diff --git a/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_reader.erl b/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_reader.erl index be985e2adc6e..5f4b1e591f2c 100644 --- a/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_reader.erl +++ b/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_reader.erl @@ -459,7 +459,7 @@ handle_1_0_session_frame(Channel, Frame, State) -> case Frame of #'v1_0.end'{} -> untrack_channel(Channel, State); - #'v1_0.transfer'{} -> + {#'v1_0.transfer'{}, _MsgPart} -> case (State#v1.connection_state =:= blocking) of true -> ok = rabbit_heartbeat:pause_monitor( diff --git a/deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl b/deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl index 72d5c5f2e8b7..849cec47e680 100644 --- a/deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl +++ b/deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl @@ -30,7 +30,8 @@ groups() -> roundtrip_quorum_queue_with_drain, roundtrip_stream_queue_with_drain, amqp_stream_amqpl, - message_headers_conversion + message_headers_conversion, + resource_alarm ]}, {metrics, [], [ auth_attempt_metrics @@ -385,6 +386,58 @@ message_headers_conversion(Config) -> ok = amqp10_client:close_connection(Connection), ok. +resource_alarm(Config) -> + Host = ?config(rmq_hostname, Config), + Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp), + QName = atom_to_binary(?FUNCTION_NAME, utf8), + Address = <<"/amq/queue/", QName/binary>>, + Ch = rabbit_ct_client_helpers:open_channel(Config), + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName}), + + OpnConf = #{address => Host, + port => Port, + container_id => atom_to_binary(?FUNCTION_NAME), + sasl => {plain, <<"guest">>, <<"guest">>}}, + {ok, Connection} = amqp10_client:open_connection(OpnConf), + {ok, Session} = amqp10_client:begin_session(Connection), + {ok, Sender} = create_amqp10_sender(Session, Address), + + M1 = amqp10_msg:new(<<"t1">>, <<"m1">>, false), + M2 = amqp10_msg:new(<<"t2">>, <<"m2">>, false), + M3 = amqp10_msg:new(<<"t3">>, <<"m3">>, false), + M4 = amqp10_msg:new(<<"t4">>, <<"m4">>, false), + + ok = amqp10_client:send_msg(Sender, M1), + ok = wait_for_settlement(<<"t1">>), + + ok = rabbit_ct_broker_helpers:rpc(Config, vm_memory_monitor, set_vm_memory_high_watermark, [0]), + %% Let connection block. + timer:sleep(100), + + ok = amqp10_client:send_msg(Sender, M2), + ok = amqp10_client:send_msg(Sender, M3), + ok = amqp10_client:send_msg(Sender, M4), + + %% M2 still goes through, but M3 should get blocked. (Server is off by one message.) + receive {amqp10_disposition, {accepted, <<"t2">>}} -> ok + after 300 -> ct:fail({accepted_timeout, ?LINE}) + end, + receive {amqp10_disposition, {accepted, <<"t3">>}} -> ct:fail("expected connection to be blocked") + after 300 -> ok + end, + + %% Unblock connection. + ok = rabbit_ct_broker_helpers:rpc(Config, vm_memory_monitor, set_vm_memory_high_watermark, [0.4]), + + %% Previously sent M3 and M4 should now be processed on the server. + receive {amqp10_disposition, {accepted, <<"t3">>}} -> ok + after 5000 -> ct:fail({accepted_timeout, ?LINE}) + end, + ok = wait_for_settlement(<<"t4">>), + + delete_queue(Config, QName), + ok = amqp10_client:close_connection(Connection). + amqp10_to_amqp091_header_conversion(Session,Ch, QName, Address) -> {ok, Sender} = create_amqp10_sender(Session, Address), @@ -507,7 +560,7 @@ wait_for_settlement(Tag, State) -> ok after 5000 -> flush("wait_for_settlement timed out"), - ct:fail(settled_timeout) + ct:fail({settled_timeout, Tag}) end. wait_for_accepts(0) -> ok;