Skip to content

Commit

Permalink
WIP Optionally restore credit_flow between channel -> CQ processes
Browse files Browse the repository at this point in the history
The credit_flow between publishing AMQP 0.9.1 channel and
(non-mirrored) classic queue processes was unintentionally removed in
4.0 together with anything else related to CQ mirroring.

By default we keep the 4.0 behaviour of no flow-control, but with the
new env `classic_queue_flow_control` it is possible to restore the 3.x
behaviour for non-mirored classic queues.
  • Loading branch information
gomoripeti committed Dec 3, 2024
1 parent bdb8ff2 commit e80ec7d
Showing 1 changed file with 7 additions and 1 deletion.
8 changes: 7 additions & 1 deletion deps/rabbit/src/rabbit_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,8 @@
rejected,
%% used by "one shot RPC" (amq.
reply_consumer :: none | {rabbit_types:ctag(), binary(), binary()},
delivery_flow, %% Deprecated since removal of CMQ in 4.0
%% see rabbitmq-server#114
delivery_flow :: flow | noflow,
interceptor_state,
queue_states,
tick_timer,
Expand Down Expand Up @@ -489,6 +490,10 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
?LG_PROCESS_TYPE(channel),
?store_proc_name({ConnName, Channel}),
ok = pg_local:join(rabbit_channels, self()),
Flow = case rabbit_misc:get_env(rabbit, classic_queue_flow_control, false) of
true -> flow;
false -> noflow
end,
{ok, {Global0, Prefetch}} = application:get_env(rabbit, default_consumer_prefetch),
Limiter0 = rabbit_limiter:new(LimiterPid),
Global = Global0 andalso is_global_qos_permitted(),
Expand Down Expand Up @@ -537,6 +542,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
rejected = [],
confirmed = [],
reply_consumer = none,
delivery_flow = Flow,
interceptor_state = undefined,
queue_states = rabbit_queue_type:init()
},
Expand Down

0 comments on commit e80ec7d

Please sign in to comment.