diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl index 0d7bd5bf45d7..835f6582e2f0 100644 --- a/deps/rabbit/src/rabbit_channel.erl +++ b/deps/rabbit/src/rabbit_channel.erl @@ -159,7 +159,8 @@ rejected, %% used by "one shot RPC" (amq. reply_consumer :: none | {rabbit_types:ctag(), binary(), binary()}, - delivery_flow, %% Deprecated since removal of CMQ in 4.0 + %% see rabbitmq-server#114 + delivery_flow :: flow | noflow, interceptor_state, queue_states, tick_timer, @@ -489,6 +490,10 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, ?LG_PROCESS_TYPE(channel), ?store_proc_name({ConnName, Channel}), ok = pg_local:join(rabbit_channels, self()), + Flow = case rabbit_misc:get_env(rabbit, classic_queue_flow_control, false) of + true -> flow; + false -> noflow + end, {ok, {Global0, Prefetch}} = application:get_env(rabbit, default_consumer_prefetch), Limiter0 = rabbit_limiter:new(LimiterPid), Global = Global0 andalso is_global_qos_permitted(), @@ -537,6 +542,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, rejected = [], confirmed = [], reply_consumer = none, + delivery_flow = Flow, interceptor_state = undefined, queue_states = rabbit_queue_type:init() }, diff --git a/deps/rabbit/src/rabbit_classic_queue.erl b/deps/rabbit/src/rabbit_classic_queue.erl index b7ed084ac0a3..0f92f863bf6f 100644 --- a/deps/rabbit/src/rabbit_classic_queue.erl +++ b/deps/rabbit/src/rabbit_classic_queue.erl @@ -449,7 +449,7 @@ deliver(Qs0, Msg0, Options) -> Confirm = MsgSeqNo /= undefined, {MPids, Qs} = qpids(Qs0, Confirm, MsgSeqNo), - Delivery = rabbit_basic:delivery(Mandatory, Confirm, Msg, MsgSeqNo), + Delivery = rabbit_basic:delivery(Mandatory, Confirm, Msg, MsgSeqNo, Flow), case Flow of %% Here we are tracking messages sent by the rabbit_channel