diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index 10c3a8a02ad1..8cdb18dc045c 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -1307,154 +1307,159 @@ force_vhost_queues_shrink_member_to_current_member(Config) -> % that affects such queue, when the process is made available again, the policy % will eventually get applied. (https://github.com/rabbitmq/rabbitmq-server/issues/7863) policy_repair(Config) -> - [Server0, _Server1, _Server2] = Servers = - rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - Ch = rabbit_ct_client_helpers:open_channel(Config, Server0), - #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), - - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - RaName = ra_name(QQ), - ExpectedMaxLength1 = 10, - Priority1 = 1, - ok = rabbit_ct_broker_helpers:rpc( - Config, - 0, - rabbit_policy, - set, - [ - <<"/">>, - <>, - QQ, - [{<<"max-length">>, ExpectedMaxLength1}, {<<"overflow">>, <<"reject-publish">>}], - Priority1, - <<"quorum_queues">>, - <<"acting-user">> - ]), - - % Wait for the policy to apply - QueryFun = fun rabbit_fifo:overview/1, - ?awaitMatch({ok, {_, #{config := #{max_length := ExpectedMaxLength1}}}, _}, - rpc:call(Server0, ra, local_query, [RaName, QueryFun]), - ?DEFAULT_AWAIT), - - % Check the policy has been applied - % Insert MaxLength1 + some messages but after consuming all messages only - % MaxLength1 are retrieved. - % Checking twice to ensure consistency - publish_confirm_many(Ch, QQ, ExpectedMaxLength1 + 1), - % +1 because QQs let one pass - wait_for_messages_ready(Servers, RaName, ExpectedMaxLength1 + 1), - fail = publish_confirm(Ch, QQ), - fail = publish_confirm(Ch, QQ), - consume_all(Ch, QQ), - - % Set higher priority policy, allowing more messages - ExpectedMaxLength2 = 20, - Priority2 = 2, - ok = rabbit_ct_broker_helpers:rpc( - Config, - 0, - rabbit_policy, - set, - [ - <<"/">>, - <>, - QQ, - [{<<"max-length">>, ExpectedMaxLength2}, {<<"overflow">>, <<"reject-publish">>}], - Priority2, - <<"quorum_queues">>, - <<"acting-user">> - ]), - - % Wait for the policy to apply - ?awaitMatch({ok, {_, #{config := #{max_length := ExpectedMaxLength2}}}, _}, - rpc:call(Server0, ra, local_query, [RaName, QueryFun]), - ?DEFAULT_AWAIT), - - % Check the policy has been applied - % Insert MaxLength2 + some messages but after consuming all messages only - % MaxLength2 are retrieved. - % Checking twice to ensure consistency. - % + 1 because QQs let one pass - publish_confirm_many(Ch, QQ, ExpectedMaxLength2 + 1), - wait_for_messages_ready(Servers, RaName, ExpectedMaxLength2 + 1), - fail = publish_confirm(Ch, QQ), - fail = publish_confirm(Ch, QQ), - consume_all(Ch, QQ), - - % Ensure the queue process is unavailable - lists:foreach(fun(Srv) -> ensure_qq_proc_dead(Config, Srv, RaName) end, Servers), - - % Add policy with higher priority, allowing even more messages. - ExpectedMaxLength3 = 30, - Priority3 = 3, - ok = rabbit_ct_broker_helpers:rpc( - Config, - 0, - rabbit_policy, - set, - [ - <<"/">>, - <>, - QQ, - [{<<"max-length">>, ExpectedMaxLength3}, {<<"overflow">>, <<"reject-publish">>}], - Priority3, - <<"quorum_queues">>, - <<"acting-user">> - ]), - - % Restart the queue process. - {ok, Queue} = - rabbit_ct_broker_helpers:rpc( - Config, - 0, - rabbit_amqqueue, - lookup, - [{resource, <<"/">>, queue, QQ}]), - lists:foreach( - fun(Srv) -> - rabbit_ct_broker_helpers:rpc( + case rabbit_ct_helpers:is_mixed_versions() of + true -> + {skip, "Should not run in mixed version environments"}; + _ -> + [Server0, _Server1, _Server2] = Servers = + rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server0), + #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), + + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + RaName = ra_name(QQ), + ExpectedMaxLength1 = 10, + Priority1 = 1, + ok = rabbit_ct_broker_helpers:rpc( Config, - Srv, - rabbit_quorum_queue, - recover, - [foo, [Queue]] - ) - end, - Servers), - - % Wait for the queue to be available again. - lists:foreach(fun(Srv) -> - rabbit_ct_helpers:await_condition( - fun () -> - is_pid( + 0, + rabbit_policy, + set, + [ + <<"/">>, + <>, + QQ, + [{<<"max-length">>, ExpectedMaxLength1}, {<<"overflow">>, <<"reject-publish">>}], + Priority1, + <<"quorum_queues">>, + <<"acting-user">> + ]), + + % Wait for the policy to apply + QueryFun = fun rabbit_fifo:overview/1, + ?awaitMatch({ok, {_, #{config := #{max_length := ExpectedMaxLength1}}}, _}, + rpc:call(Server0, ra, local_query, [RaName, QueryFun]), + ?DEFAULT_AWAIT), + + % Check the policy has been applied + % Insert MaxLength1 + some messages but after consuming all messages only + % MaxLength1 are retrieved. + % Checking twice to ensure consistency + publish_confirm_many(Ch, QQ, ExpectedMaxLength1 + 1), + % +1 because QQs let one pass + wait_for_messages_ready(Servers, RaName, ExpectedMaxLength1 + 1), + fail = publish_confirm(Ch, QQ), + fail = publish_confirm(Ch, QQ), + consume_all(Ch, QQ), + + % Set higher priority policy, allowing more messages + ExpectedMaxLength2 = 20, + Priority2 = 2, + ok = rabbit_ct_broker_helpers:rpc( + Config, + 0, + rabbit_policy, + set, + [ + <<"/">>, + <>, + QQ, + [{<<"max-length">>, ExpectedMaxLength2}, {<<"overflow">>, <<"reject-publish">>}], + Priority2, + <<"quorum_queues">>, + <<"acting-user">> + ]), + + % Wait for the policy to apply + ?awaitMatch({ok, {_, #{config := #{max_length := ExpectedMaxLength2}}}, _}, + rpc:call(Server0, ra, local_query, [RaName, QueryFun]), + ?DEFAULT_AWAIT), + + % Check the policy has been applied + % Insert MaxLength2 + some messages but after consuming all messages only + % MaxLength2 are retrieved. + % Checking twice to ensure consistency. + % + 1 because QQs let one pass + publish_confirm_many(Ch, QQ, ExpectedMaxLength2 + 1), + wait_for_messages_ready(Servers, RaName, ExpectedMaxLength2 + 1), + fail = publish_confirm(Ch, QQ), + fail = publish_confirm(Ch, QQ), + consume_all(Ch, QQ), + + % Ensure the queue process is unavailable + lists:foreach(fun(Srv) -> ensure_qq_proc_dead(Config, Srv, RaName) end, Servers), + + % Add policy with higher priority, allowing even more messages. + ExpectedMaxLength3 = 30, + Priority3 = 3, + ok = rabbit_ct_broker_helpers:rpc( + Config, + 0, + rabbit_policy, + set, + [ + <<"/">>, + <>, + QQ, + [{<<"max-length">>, ExpectedMaxLength3}, {<<"overflow">>, <<"reject-publish">>}], + Priority3, + <<"quorum_queues">>, + <<"acting-user">> + ]), + + % Restart the queue process. + {ok, Queue} = + rabbit_ct_broker_helpers:rpc( + Config, + 0, + rabbit_amqqueue, + lookup, + [{resource, <<"/">>, queue, QQ}]), + lists:foreach( + fun(Srv) -> rabbit_ct_broker_helpers:rpc( Config, Srv, - erlang, - whereis, - [RaName])) - end) - end, - Servers), - - % Wait for the policy to apply - ?awaitMatch({ok, {_, #{config := #{max_length := ExpectedMaxLength3}}}, _}, - rpc:call(Server0, ra, local_query, [RaName, QueryFun]), - ?DEFAULT_AWAIT), - - % Check the policy has been applied - % Insert MaxLength3 + some messages but after consuming all messages only - % MaxLength3 are retrieved. - % Checking twice to ensure consistency. - % + 1 because QQs let one pass - publish_confirm_many(Ch, QQ, ExpectedMaxLength3 + 1), - wait_for_messages_ready(Servers, RaName, ExpectedMaxLength3 + 1), - fail = publish_confirm(Ch, QQ), - fail = publish_confirm(Ch, QQ), - consume_all(Ch, QQ). + rabbit_quorum_queue, + recover, + [foo, [Queue]] + ) + end, + Servers), + + % Wait for the queue to be available again. + lists:foreach(fun(Srv) -> + rabbit_ct_helpers:await_condition( + fun () -> + is_pid( + rabbit_ct_broker_helpers:rpc( + Config, + Srv, + erlang, + whereis, + [RaName])) + end) + end, + Servers), + + % Wait for the policy to apply + ?awaitMatch({ok, {_, #{config := #{max_length := ExpectedMaxLength3}}}, _}, + rpc:call(Server0, ra, local_query, [RaName, QueryFun]), + ?DEFAULT_AWAIT), + + % Check the policy has been applied + % Insert MaxLength3 + some messages but after consuming all messages only + % MaxLength3 are retrieved. + % Checking twice to ensure consistency. + % + 1 because QQs let one pass + publish_confirm_many(Ch, QQ, ExpectedMaxLength3 + 1), + wait_for_messages_ready(Servers, RaName, ExpectedMaxLength3 + 1), + fail = publish_confirm(Ch, QQ), + fail = publish_confirm(Ch, QQ), + consume_all(Ch, QQ) + end. gh_12635(Config) ->