Skip to content

Commit

Permalink
WIP Optionally restore credit_flow between channel -> CQ processes
Browse files Browse the repository at this point in the history
The credit_flow between publishing AMQP 0.9.1 channel and
(non-mirrored) classic queue processes was unintentionally removed in
4.0 together with anything else related to CQ mirroring.

By default we keep the 4.0 behaviour of no flow-control, but with the
new env `classic_queue_flow_control` it is possible to restore the 3.x
behaviour for non-mirored classic queues.
  • Loading branch information
gomoripeti committed Dec 5, 2024
1 parent bdb8ff2 commit 61a8cc8
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 2 deletions.
8 changes: 7 additions & 1 deletion deps/rabbit/src/rabbit_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,8 @@
rejected,
%% used by "one shot RPC" (amq.
reply_consumer :: none | {rabbit_types:ctag(), binary(), binary()},
delivery_flow, %% Deprecated since removal of CMQ in 4.0
%% see rabbitmq-server#114
delivery_flow :: flow | noflow,
interceptor_state,
queue_states,
tick_timer,
Expand Down Expand Up @@ -489,6 +490,10 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
?LG_PROCESS_TYPE(channel),
?store_proc_name({ConnName, Channel}),
ok = pg_local:join(rabbit_channels, self()),
Flow = case rabbit_misc:get_env(rabbit, classic_queue_flow_control, false) of
true -> flow;
false -> noflow
end,
{ok, {Global0, Prefetch}} = application:get_env(rabbit, default_consumer_prefetch),
Limiter0 = rabbit_limiter:new(LimiterPid),
Global = Global0 andalso is_global_qos_permitted(),
Expand Down Expand Up @@ -537,6 +542,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
rejected = [],
confirmed = [],
reply_consumer = none,
delivery_flow = Flow,
interceptor_state = undefined,
queue_states = rabbit_queue_type:init()
},
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_classic_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ deliver(Qs0, Msg0, Options) ->
Confirm = MsgSeqNo /= undefined,

{MPids, Qs} = qpids(Qs0, Confirm, MsgSeqNo),
Delivery = rabbit_basic:delivery(Mandatory, Confirm, Msg, MsgSeqNo),
Delivery = rabbit_basic:delivery(Mandatory, Confirm, Msg, MsgSeqNo, Flow),

case Flow of
%% Here we are tracking messages sent by the rabbit_channel
Expand Down

0 comments on commit 61a8cc8

Please sign in to comment.