From 6d780789b6a9e69dfe73fa9690d3d20a72f066f0 Mon Sep 17 00:00:00 2001 From: Lois Soto Lopez Date: Mon, 30 Sep 2024 13:26:25 +0200 Subject: [PATCH] Consider QQs may let pass 1st overflowing msg --- deps/rabbit/test/quorum_queue_SUITE.erl | 82 +++++++++++++++---------- 1 file changed, 49 insertions(+), 33 deletions(-) diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index 555eb479c8b0..e195594ecca4 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -1176,7 +1176,7 @@ policy_repair(Config) -> <<"/">>, <>, QQ, - [{<<"max-length">>, ExpectedMaxLength1}], + [{<<"max-length">>, ExpectedMaxLength1}, {<<"overflow">>, <<"reject-publish">>}], Priority1, <<"quorum_queues">>, <<"acting-user">> @@ -1194,13 +1194,17 @@ policy_repair(Config) -> % Checking twice to ensure consistency % % Once - publish_many(Ch, QQ, ExpectedMaxLength1 + 1, call), - timer:sleep(100), - ExpectedMaxLength1 = length(consume_all(Ch, QQ)), + {GottenOks1, GottenFails1} = publish_confirm_many(Ch, QQ, ExpectedMaxLength1 + 1), + ct:pal("GottenOks1: ~p, GottenFails1: ~p", [GottenOks1, GottenFails1]), + ?assert((GottenOks1 =:= ExpectedMaxLength1) or (GottenOks1 =:= ExpectedMaxLength1 + 1)), + ?assert((GottenFails1 =:= 1) or (GottenFails1 =:= 0)), + consume_all(Ch, QQ), % Twice - publish_many(Ch, QQ, ExpectedMaxLength1 + 10, call), - timer:sleep(100), - ExpectedMaxLength1 = length(consume_all(Ch, QQ)), + {GottenOks2, GottenFails2} = publish_confirm_many(Ch, QQ, ExpectedMaxLength1 + 10), + ct:pal("GottenOks2: ~p, GottenFails2: ~p", [GottenOks2, GottenFails2]), + ?assert((GottenOks2 =:= ExpectedMaxLength1) or (GottenOks2 =:= ExpectedMaxLength1 + 1)), + ?assert((GottenFails2 =:= 10) or (GottenFails2 =:= 9)), + consume_all(Ch, QQ), % Set higher priority policy, allowing more messages ExpectedMaxLength2 = 20, @@ -1214,7 +1218,7 @@ policy_repair(Config) -> <<"/">>, <>, QQ, - [{<<"max-length">>, ExpectedMaxLength2}], + [{<<"max-length">>, ExpectedMaxLength2}, {<<"overflow">>, <<"reject-publish">>}], Priority2, <<"quorum_queues">>, <<"acting-user">> @@ -1231,13 +1235,17 @@ policy_repair(Config) -> % Checking twice to ensure consistency. % % Once - publish_many(Ch, QQ, ExpectedMaxLength2 + 1), - timer:sleep(100), - ExpectedMaxLength2 = length(consume_all(Ch, QQ)), + {GottenOks3, GottenFails3} = publish_confirm_many(Ch, QQ, ExpectedMaxLength2 + 1), + ct:pal("GottenOks3: ~p, GottenFails3: ~p", [GottenOks3, GottenFails3]), + ?assert((GottenOks3 =:= ExpectedMaxLength2) or (GottenOks3 =:= ExpectedMaxLength2 + 1)), + ?assert((GottenFails3 =:= 1) or (GottenFails3 =:= 0)), + consume_all(Ch, QQ), % Twice - publish_many(Ch, QQ, ExpectedMaxLength2 + 10), - timer:sleep(100), - ExpectedMaxLength2 = length(consume_all(Ch, QQ)), + {GottenOks4, GottenFails4} = publish_confirm_many(Ch, QQ, ExpectedMaxLength2 + 10), + ct:pal("GottenOks4: ~p, GottenFails4: ~p", [GottenOks4, GottenFails4]), + ?assert((GottenOks4 =:= ExpectedMaxLength2) or (GottenOks4 =:= ExpectedMaxLength2 + 1)), + ?assert((GottenFails4 =:= 10) or (GottenFails4 =:= 9)), + consume_all(Ch, QQ), % Make the queue process unavailable. % Kill the process multiple times until its supervisor stops restarting it. @@ -1282,7 +1290,7 @@ policy_repair(Config) -> <<"/">>, <>, QQ, - [{<<"max-length">>, ExpectedMaxLength3}], + [{<<"max-length">>, ExpectedMaxLength3}, {<<"overflow">>, <<"reject-publish">>}], Priority3, <<"quorum_queues">>, <<"acting-user">> @@ -1329,8 +1337,11 @@ policy_repair(Config) -> GetPidUntil() end, Servers), - - timer:sleep(1000), + + % Wait for the policy to apply + ?awaitMatch({ok, {_, #{config := #{max_length := ExpectedMaxLength3}}}, _}, + rpc:call(Server0, ra, local_query, [RaName, QueryFun]), + ?DEFAULT_AWAIT), % Check the policy has been applied % Insert MaxLength3 + some messages but after consuming all messages only @@ -1338,13 +1349,16 @@ policy_repair(Config) -> % Checking twice to ensure consistency. % % Once - publish_many(Ch, QQ, ExpectedMaxLength3 + 1, call), - timer:sleep(100), - ExpectedMaxLength3 = length(consume_all(Ch, QQ)), + {GottenOks5, GottenFails5} = publish_confirm_many(Ch, QQ, ExpectedMaxLength3 + 1), + ct:pal("GottenOks5: ~p, GottenFails5: ~p", [GottenOks5, GottenFails5]), + ?assert((GottenOks5 =:= ExpectedMaxLength3) or (GottenOks5 =:= ExpectedMaxLength3 + 1)), + ?assert((GottenFails5 =:= 1) or (GottenFails5 =:= 0)), + consume_all(Ch, QQ), % Twice - publish_many(Ch, QQ, ExpectedMaxLength3 + 10, call), - timer:sleep(100), - ExpectedMaxLength3 = length(consume_all(Ch, QQ)). + {GottenOks6, GottenFails6} = publish_confirm_many(Ch, QQ, ExpectedMaxLength3 + 10), + ct:pal("GottenOks6: ~p, GottenFails6: ~p", [GottenOks6, GottenFails6]), + ?assert((GottenOks6 =:= ExpectedMaxLength3) or (GottenOks6 =:= ExpectedMaxLength3 + 1)), + ?assert((GottenFails6 =:= 10) or (GottenFails6 =:= 9)). priority_queue_fifo(Config) -> %% testing: if hi priority messages are published before lo priority @@ -4183,19 +4197,13 @@ count_online_nodes(Server, VHost, Q0) -> length(proplists:get_value(online, Info, [])). publish_many(Ch, Queue, Count) -> - publish_many(Ch, Queue, Count, cast). - -publish_many(Ch, Queue, Count, Method) -> - [publish(Ch, Queue, Method) || _ <- lists:seq(1, Count)]. + [publish(Ch, Queue) || _ <- lists:seq(1, Count)]. publish(Ch, Queue) -> - publish(Ch, Queue, cast). + publish(Ch, Queue, <<"msg">>). -publish(Ch, Queue, Method) -> - publish(Ch, Queue, <<"msg">>, Method). - -publish(Ch, Queue, Msg, Method) when Method =:= cast; Method =:= call -> - ok = amqp_channel:Method(Ch, +publish(Ch, Queue, Msg) -> + ok = amqp_channel:cast(Ch, #'basic.publish'{routing_key = Queue}, #amqp_msg{props = #'P_basic'{delivery_mode = 2}, payload = Msg}). @@ -4354,6 +4362,14 @@ lists_interleave([Item | Items], List) {Left, Right} = lists:split(2, List), Left ++ [Item | lists_interleave(Items, Right)]. +publish_confirm_many(Ch, Queue, Count) -> + lists:foldl(fun(_, {Oks, Fails}) -> + case publish_confirm(Ch, Queue) of + ok -> {Oks + 1, Fails}; + _ -> {Oks, Fails + 1} + end + end, {0,0}, lists:seq(1, Count)). + consume_all(Ch, QQ) -> Consume = fun C(Acc) -> case amqp_channel:call(Ch, #'basic.get'{queue = QQ}) of