Skip to content

Commit

Permalink
Merge pull request rabbitmq#9953 from rabbitmq/fix-amqp-connection-block
Browse files Browse the repository at this point in the history
Fix AMQP 1.0 connection throttling
  • Loading branch information
michaelklishin authored Nov 17, 2023
2 parents f118d2f + 5a870df commit 232069f
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 3 deletions.
2 changes: 1 addition & 1 deletion deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ handle_1_0_session_frame(Channel, Frame, State) ->
case Frame of
#'v1_0.end'{} ->
untrack_channel(Channel, State);
#'v1_0.transfer'{} ->
{#'v1_0.transfer'{}, _MsgPart} ->
case (State#v1.connection_state =:= blocking) of
true ->
ok = rabbit_heartbeat:pause_monitor(
Expand Down
57 changes: 55 additions & 2 deletions deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ groups() ->
roundtrip_quorum_queue_with_drain,
roundtrip_stream_queue_with_drain,
amqp_stream_amqpl,
message_headers_conversion
message_headers_conversion,
resource_alarm
]},
{metrics, [], [
auth_attempt_metrics
Expand Down Expand Up @@ -385,6 +386,58 @@ message_headers_conversion(Config) ->
ok = amqp10_client:close_connection(Connection),
ok.

resource_alarm(Config) ->
Host = ?config(rmq_hostname, Config),
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
QName = atom_to_binary(?FUNCTION_NAME, utf8),
Address = <<"/amq/queue/", QName/binary>>,
Ch = rabbit_ct_client_helpers:open_channel(Config),
#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName}),

OpnConf = #{address => Host,
port => Port,
container_id => atom_to_binary(?FUNCTION_NAME),
sasl => {plain, <<"guest">>, <<"guest">>}},
{ok, Connection} = amqp10_client:open_connection(OpnConf),
{ok, Session} = amqp10_client:begin_session(Connection),
{ok, Sender} = create_amqp10_sender(Session, Address),

M1 = amqp10_msg:new(<<"t1">>, <<"m1">>, false),
M2 = amqp10_msg:new(<<"t2">>, <<"m2">>, false),
M3 = amqp10_msg:new(<<"t3">>, <<"m3">>, false),
M4 = amqp10_msg:new(<<"t4">>, <<"m4">>, false),

ok = amqp10_client:send_msg(Sender, M1),
ok = wait_for_settlement(<<"t1">>),

ok = rabbit_ct_broker_helpers:rpc(Config, vm_memory_monitor, set_vm_memory_high_watermark, [0]),
%% Let connection block.
timer:sleep(100),

ok = amqp10_client:send_msg(Sender, M2),
ok = amqp10_client:send_msg(Sender, M3),
ok = amqp10_client:send_msg(Sender, M4),

%% M2 still goes through, but M3 should get blocked. (Server is off by one message.)
receive {amqp10_disposition, {accepted, <<"t2">>}} -> ok
after 300 -> ct:fail({accepted_timeout, ?LINE})
end,
receive {amqp10_disposition, {accepted, <<"t3">>}} -> ct:fail("expected connection to be blocked")
after 300 -> ok
end,

%% Unblock connection.
ok = rabbit_ct_broker_helpers:rpc(Config, vm_memory_monitor, set_vm_memory_high_watermark, [0.4]),

%% Previously sent M3 and M4 should now be processed on the server.
receive {amqp10_disposition, {accepted, <<"t3">>}} -> ok
after 5000 -> ct:fail({accepted_timeout, ?LINE})
end,
ok = wait_for_settlement(<<"t4">>),

delete_queue(Config, QName),
ok = amqp10_client:close_connection(Connection).

amqp10_to_amqp091_header_conversion(Session,Ch, QName, Address) ->
{ok, Sender} = create_amqp10_sender(Session, Address),

Expand Down Expand Up @@ -507,7 +560,7 @@ wait_for_settlement(Tag, State) ->
ok
after 5000 ->
flush("wait_for_settlement timed out"),
ct:fail(settled_timeout)
ct:fail({settled_timeout, Tag})
end.

wait_for_accepts(0) -> ok;
Expand Down

0 comments on commit 232069f

Please sign in to comment.