Skip to content

Commit

Permalink
Exclude policy_repair QQ test on mixed versions
Browse files Browse the repository at this point in the history
  • Loading branch information
LoisSotoLopez committed Nov 5, 2024
1 parent 033a793 commit 4819801
Showing 1 changed file with 149 additions and 144 deletions.
293 changes: 149 additions & 144 deletions deps/rabbit/test/quorum_queue_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1307,154 +1307,159 @@ force_vhost_queues_shrink_member_to_current_member(Config) ->
% that affects such queue, when the process is made available again, the policy
% will eventually get applied. (https://github.com/rabbitmq/rabbitmq-server/issues/7863)
policy_repair(Config) ->
[Server0, _Server1, _Server2] = Servers =
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server0),
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),

QQ = ?config(queue_name, Config),
?assertEqual({'queue.declare_ok', QQ, 0, 0},
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
RaName = ra_name(QQ),
ExpectedMaxLength1 = 10,
Priority1 = 1,
ok = rabbit_ct_broker_helpers:rpc(
Config,
0,
rabbit_policy,
set,
[
<<"/">>,
<<QQ/binary, "_1">>,
QQ,
[{<<"max-length">>, ExpectedMaxLength1}, {<<"overflow">>, <<"reject-publish">>}],
Priority1,
<<"quorum_queues">>,
<<"acting-user">>
]),

% Wait for the policy to apply
QueryFun = fun rabbit_fifo:overview/1,
?awaitMatch({ok, {_, #{config := #{max_length := ExpectedMaxLength1}}}, _},
rpc:call(Server0, ra, local_query, [RaName, QueryFun]),
?DEFAULT_AWAIT),

% Check the policy has been applied
% Insert MaxLength1 + some messages but after consuming all messages only
% MaxLength1 are retrieved.
% Checking twice to ensure consistency
publish_confirm_many(Ch, QQ, ExpectedMaxLength1 + 1),
% +1 because QQs let one pass
wait_for_messages_ready(Servers, RaName, ExpectedMaxLength1 + 1),
fail = publish_confirm(Ch, QQ),
fail = publish_confirm(Ch, QQ),
consume_all(Ch, QQ),

% Set higher priority policy, allowing more messages
ExpectedMaxLength2 = 20,
Priority2 = 2,
ok = rabbit_ct_broker_helpers:rpc(
Config,
0,
rabbit_policy,
set,
[
<<"/">>,
<<QQ/binary, "_2">>,
QQ,
[{<<"max-length">>, ExpectedMaxLength2}, {<<"overflow">>, <<"reject-publish">>}],
Priority2,
<<"quorum_queues">>,
<<"acting-user">>
]),

% Wait for the policy to apply
?awaitMatch({ok, {_, #{config := #{max_length := ExpectedMaxLength2}}}, _},
rpc:call(Server0, ra, local_query, [RaName, QueryFun]),
?DEFAULT_AWAIT),

% Check the policy has been applied
% Insert MaxLength2 + some messages but after consuming all messages only
% MaxLength2 are retrieved.
% Checking twice to ensure consistency.
% + 1 because QQs let one pass
publish_confirm_many(Ch, QQ, ExpectedMaxLength2 + 1),
wait_for_messages_ready(Servers, RaName, ExpectedMaxLength2 + 1),
fail = publish_confirm(Ch, QQ),
fail = publish_confirm(Ch, QQ),
consume_all(Ch, QQ),

% Ensure the queue process is unavailable
lists:foreach(fun(Srv) -> ensure_qq_proc_dead(Config, Srv, RaName) end, Servers),

% Add policy with higher priority, allowing even more messages.
ExpectedMaxLength3 = 30,
Priority3 = 3,
ok = rabbit_ct_broker_helpers:rpc(
Config,
0,
rabbit_policy,
set,
[
<<"/">>,
<<QQ/binary, "_3">>,
QQ,
[{<<"max-length">>, ExpectedMaxLength3}, {<<"overflow">>, <<"reject-publish">>}],
Priority3,
<<"quorum_queues">>,
<<"acting-user">>
]),

% Restart the queue process.
{ok, Queue} =
rabbit_ct_broker_helpers:rpc(
Config,
0,
rabbit_amqqueue,
lookup,
[{resource, <<"/">>, queue, QQ}]),
lists:foreach(
fun(Srv) ->
rabbit_ct_broker_helpers:rpc(
case rabbit_ct_helpers:is_mixed_versions() of
true ->
{skip, "Should not run in mixed version environments"};
_ ->
[Server0, _Server1, _Server2] = Servers =
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server0),
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),

QQ = ?config(queue_name, Config),
?assertEqual({'queue.declare_ok', QQ, 0, 0},
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
RaName = ra_name(QQ),
ExpectedMaxLength1 = 10,
Priority1 = 1,
ok = rabbit_ct_broker_helpers:rpc(
Config,
Srv,
rabbit_quorum_queue,
recover,
[foo, [Queue]]
)
end,
Servers),

% Wait for the queue to be available again.
lists:foreach(fun(Srv) ->
rabbit_ct_helpers:await_condition(
fun () ->
is_pid(
0,
rabbit_policy,
set,
[
<<"/">>,
<<QQ/binary, "_1">>,
QQ,
[{<<"max-length">>, ExpectedMaxLength1}, {<<"overflow">>, <<"reject-publish">>}],
Priority1,
<<"quorum_queues">>,
<<"acting-user">>
]),

% Wait for the policy to apply
QueryFun = fun rabbit_fifo:overview/1,
?awaitMatch({ok, {_, #{config := #{max_length := ExpectedMaxLength1}}}, _},
rpc:call(Server0, ra, local_query, [RaName, QueryFun]),
?DEFAULT_AWAIT),

% Check the policy has been applied
% Insert MaxLength1 + some messages but after consuming all messages only
% MaxLength1 are retrieved.
% Checking twice to ensure consistency
publish_confirm_many(Ch, QQ, ExpectedMaxLength1 + 1),
% +1 because QQs let one pass
wait_for_messages_ready(Servers, RaName, ExpectedMaxLength1 + 1),
fail = publish_confirm(Ch, QQ),
fail = publish_confirm(Ch, QQ),
consume_all(Ch, QQ),

% Set higher priority policy, allowing more messages
ExpectedMaxLength2 = 20,
Priority2 = 2,
ok = rabbit_ct_broker_helpers:rpc(
Config,
0,
rabbit_policy,
set,
[
<<"/">>,
<<QQ/binary, "_2">>,
QQ,
[{<<"max-length">>, ExpectedMaxLength2}, {<<"overflow">>, <<"reject-publish">>}],
Priority2,
<<"quorum_queues">>,
<<"acting-user">>
]),

% Wait for the policy to apply
?awaitMatch({ok, {_, #{config := #{max_length := ExpectedMaxLength2}}}, _},
rpc:call(Server0, ra, local_query, [RaName, QueryFun]),
?DEFAULT_AWAIT),

% Check the policy has been applied
% Insert MaxLength2 + some messages but after consuming all messages only
% MaxLength2 are retrieved.
% Checking twice to ensure consistency.
% + 1 because QQs let one pass
publish_confirm_many(Ch, QQ, ExpectedMaxLength2 + 1),
wait_for_messages_ready(Servers, RaName, ExpectedMaxLength2 + 1),
fail = publish_confirm(Ch, QQ),
fail = publish_confirm(Ch, QQ),
consume_all(Ch, QQ),

% Ensure the queue process is unavailable
lists:foreach(fun(Srv) -> ensure_qq_proc_dead(Config, Srv, RaName) end, Servers),

% Add policy with higher priority, allowing even more messages.
ExpectedMaxLength3 = 30,
Priority3 = 3,
ok = rabbit_ct_broker_helpers:rpc(
Config,
0,
rabbit_policy,
set,
[
<<"/">>,
<<QQ/binary, "_3">>,
QQ,
[{<<"max-length">>, ExpectedMaxLength3}, {<<"overflow">>, <<"reject-publish">>}],
Priority3,
<<"quorum_queues">>,
<<"acting-user">>
]),

% Restart the queue process.
{ok, Queue} =
rabbit_ct_broker_helpers:rpc(
Config,
0,
rabbit_amqqueue,
lookup,
[{resource, <<"/">>, queue, QQ}]),
lists:foreach(
fun(Srv) ->
rabbit_ct_broker_helpers:rpc(
Config,
Srv,
erlang,
whereis,
[RaName]))
end)
end,
Servers),

% Wait for the policy to apply
?awaitMatch({ok, {_, #{config := #{max_length := ExpectedMaxLength3}}}, _},
rpc:call(Server0, ra, local_query, [RaName, QueryFun]),
?DEFAULT_AWAIT),

% Check the policy has been applied
% Insert MaxLength3 + some messages but after consuming all messages only
% MaxLength3 are retrieved.
% Checking twice to ensure consistency.
% + 1 because QQs let one pass
publish_confirm_many(Ch, QQ, ExpectedMaxLength3 + 1),
wait_for_messages_ready(Servers, RaName, ExpectedMaxLength3 + 1),
fail = publish_confirm(Ch, QQ),
fail = publish_confirm(Ch, QQ),
consume_all(Ch, QQ).
rabbit_quorum_queue,
recover,
[foo, [Queue]]
)
end,
Servers),

% Wait for the queue to be available again.
lists:foreach(fun(Srv) ->
rabbit_ct_helpers:await_condition(
fun () ->
is_pid(
rabbit_ct_broker_helpers:rpc(
Config,
Srv,
erlang,
whereis,
[RaName]))
end)
end,
Servers),

% Wait for the policy to apply
?awaitMatch({ok, {_, #{config := #{max_length := ExpectedMaxLength3}}}, _},
rpc:call(Server0, ra, local_query, [RaName, QueryFun]),
?DEFAULT_AWAIT),

% Check the policy has been applied
% Insert MaxLength3 + some messages but after consuming all messages only
% MaxLength3 are retrieved.
% Checking twice to ensure consistency.
% + 1 because QQs let one pass
publish_confirm_many(Ch, QQ, ExpectedMaxLength3 + 1),
wait_for_messages_ready(Servers, RaName, ExpectedMaxLength3 + 1),
fail = publish_confirm(Ch, QQ),
fail = publish_confirm(Ch, QQ),
consume_all(Ch, QQ)
end.


gh_12635(Config) ->
Expand Down

0 comments on commit 4819801

Please sign in to comment.