From 61a8cc8b7616a4fa650201592dbdcb1a964d5f15 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20G=C3=B6m=C3=B6ri?= Date: Tue, 3 Dec 2024 17:34:32 +0100 Subject: [PATCH] WIP Optionally restore credit_flow between channel -> CQ processes The credit_flow between publishing AMQP 0.9.1 channel and (non-mirrored) classic queue processes was unintentionally removed in 4.0 together with anything else related to CQ mirroring. By default we keep the 4.0 behaviour of no flow-control, but with the new env `classic_queue_flow_control` it is possible to restore the 3.x behaviour for non-mirored classic queues. --- deps/rabbit/src/rabbit_channel.erl | 8 +++++++- deps/rabbit/src/rabbit_classic_queue.erl | 2 +- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl index 0d7bd5bf45d7..835f6582e2f0 100644 --- a/deps/rabbit/src/rabbit_channel.erl +++ b/deps/rabbit/src/rabbit_channel.erl @@ -159,7 +159,8 @@ rejected, %% used by "one shot RPC" (amq. reply_consumer :: none | {rabbit_types:ctag(), binary(), binary()}, - delivery_flow, %% Deprecated since removal of CMQ in 4.0 + %% see rabbitmq-server#114 + delivery_flow :: flow | noflow, interceptor_state, queue_states, tick_timer, @@ -489,6 +490,10 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, ?LG_PROCESS_TYPE(channel), ?store_proc_name({ConnName, Channel}), ok = pg_local:join(rabbit_channels, self()), + Flow = case rabbit_misc:get_env(rabbit, classic_queue_flow_control, false) of + true -> flow; + false -> noflow + end, {ok, {Global0, Prefetch}} = application:get_env(rabbit, default_consumer_prefetch), Limiter0 = rabbit_limiter:new(LimiterPid), Global = Global0 andalso is_global_qos_permitted(), @@ -537,6 +542,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, rejected = [], confirmed = [], reply_consumer = none, + delivery_flow = Flow, interceptor_state = undefined, queue_states = rabbit_queue_type:init() }, diff --git a/deps/rabbit/src/rabbit_classic_queue.erl b/deps/rabbit/src/rabbit_classic_queue.erl index b7ed084ac0a3..0f92f863bf6f 100644 --- a/deps/rabbit/src/rabbit_classic_queue.erl +++ b/deps/rabbit/src/rabbit_classic_queue.erl @@ -449,7 +449,7 @@ deliver(Qs0, Msg0, Options) -> Confirm = MsgSeqNo /= undefined, {MPids, Qs} = qpids(Qs0, Confirm, MsgSeqNo), - Delivery = rabbit_basic:delivery(Mandatory, Confirm, Msg, MsgSeqNo), + Delivery = rabbit_basic:delivery(Mandatory, Confirm, Msg, MsgSeqNo, Flow), case Flow of %% Here we are tracking messages sent by the rabbit_channel