Skip to content

Commit

Permalink
Add tests for stream publisher/consumer prometheus metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
gomoripeti committed Nov 4, 2024
1 parent 52e63c7 commit 142a219
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 2 deletions.
8 changes: 7 additions & 1 deletion deps/rabbitmq_prometheus/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ PROJECT_DESCRIPTION = Prometheus metrics for RabbitMQ
PROJECT_MOD := rabbit_prometheus_app
DEPS = accept cowboy rabbit rabbitmq_management_agent prometheus rabbitmq_web_dispatch
BUILD_DEPS = amqp_client rabbit_common rabbitmq_management
TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers eunit_formatters
TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers eunit_formatters rabbitmq_stream

EUNIT_OPTS = no_tty, {report, {eunit_progress, [colored, profile]}}

Expand All @@ -26,6 +26,12 @@ endif
include ../../rabbitmq-components.mk
include ../../erlang.mk

# We are using stream_test_utils from rabbitmq_stream.
CT_OPTS += -pa ../rabbitmq_stream/test/

test-build::
$(verbose) $(MAKE) -C ../rabbitmq_stream test-dir

.PHONY: readme
readme: # Preview README & live reload on edit
@docker run --interactive --tty --rm --name changelog_md \
Expand Down
124 changes: 123 additions & 1 deletion deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("rabbitmq_ct_helpers/include/rabbit_mgmt_test.hrl").
-include_lib("amqp10_common/include/amqp10_framing.hrl").

-compile(export_all).

Expand Down Expand Up @@ -70,7 +71,8 @@ groups() ->
queue_consumer_count_and_queue_metrics_mutually_exclusive_test,
vhost_status_metric,
exchange_bindings_metric,
exchange_names_metric
exchange_names_metric,
stream_pub_sub_metrics
]},
{special_chars, [], [core_metrics_special_chars]},
{authentication, [], [basic_auth]}
Expand Down Expand Up @@ -739,6 +741,104 @@ exchange_names_metric(Config) ->
}, Names),
ok.

stream_pub_sub_metrics(Config) ->
Stream = atom_to_binary(?FUNCTION_NAME),
publish_via_stream_protocol(Stream, Config),

%% wait for the stream to emit stats and the counters to become non-zero
%% (collect_statistics_interval set to 100ms in this test group)
rabbit_ct_helpers:await_condition(
fun() ->
{_, Body0} = http_get_with_pal(Config, "/metrics", [], 200),
#{rabbitmq_stream_publisher_published_total := #{undefined := [P]}}
= parse_response(Body0),
P > 0
end,
100),

%% aggregated metrics
{_, Body1} = http_get_with_pal(Config, "/metrics", [], 200),
ParsedBody1 = parse_response(Body1),
?assertEqual(#{rabbitmq_stream_publisher_published_total => #{undefined => [5]},
rabbitmq_stream_publisher_confirmed_total => #{undefined => [5]},
rabbitmq_stream_publisher_error_messages => #{undefined => [0]},
rabbitmq_stream_consumer_consumed_total => #{undefined => [2]},
rabbitmq_stream_consumer_offset => #{undefined => [0]},
rabbitmq_stream_consumer_offset_lag => #{undefined => [2]}
},
maps:with([rabbitmq_stream_publisher_published_total,
rabbitmq_stream_publisher_confirmed_total,
rabbitmq_stream_publisher_error_messages,
rabbitmq_stream_consumer_consumed_total,
rabbitmq_stream_consumer_offset,
rabbitmq_stream_consumer_offset_lag],
ParsedBody1)),

%% per-object metrics
{_, Body2} = http_get_with_pal(Config, "/metrics/detailed?family=stream_publisher_metrics",
[], 200),
ParsedBody2 = parse_response(Body2),

#{rabbitmq_detailed_stream_publisher_published_total := Published} = ParsedBody2,
?assertMatch([{#{vhost := "/",
queue := "stream_pub_sub_metrics",
publisher_id := "98",
connection := _}, [2]},
{#{vhost := "/",
queue := "stream_pub_sub_metrics",
publisher_id := "99",
connection := _}, [3]}],
lists:sort(maps:to_list(Published))),

#{rabbitmq_detailed_stream_publisher_confirmed_total := PubConfirmed} = ParsedBody2,
?assertMatch([{#{vhost := "/",
queue := "stream_pub_sub_metrics",
publisher_id := "98",
connection := _}, [2]},
{#{vhost := "/",
queue := "stream_pub_sub_metrics",
publisher_id := "99",
connection := _}, [3]}],
lists:sort(maps:to_list(PubConfirmed))),

#{rabbitmq_detailed_stream_publisher_error_messages := PubError} = ParsedBody2,
?assertMatch([{#{vhost := "/",
queue := "stream_pub_sub_metrics",
publisher_id := "98",
connection := _}, [0]},
{#{vhost := "/",
queue := "stream_pub_sub_metrics",
publisher_id := "99",
connection := _}, [0]}],
lists:sort(maps:to_list(PubError))),

{_, Body3} = http_get_with_pal(Config, "/metrics/detailed?family=stream_consumer_metrics",
[], 200),
ParsedBody3 = parse_response(Body3),

#{rabbitmq_detailed_stream_consumer_consumed_total := Consumed} = ParsedBody3,
?assertMatch([{#{vhost := "/",
queue := "stream_pub_sub_metrics",
subscription_id := "97",
connection := _}, [2]}],
lists:sort(maps:to_list(Consumed))),

#{rabbitmq_detailed_stream_consumer_offset := Offset} = ParsedBody3,
?assertMatch([{#{vhost := "/",
queue := "stream_pub_sub_metrics",
subscription_id := "97",
connection := _}, [0]}],
lists:sort(maps:to_list(Offset))),

#{rabbitmq_detailed_stream_consumer_offset_lag := OffsetLag} = ParsedBody3,
?assertMatch([{#{vhost := "/",
queue := "stream_pub_sub_metrics",
subscription_id := "97",
connection := _}, [2]}],
lists:sort(maps:to_list(OffsetLag))),

ok.

core_metrics_special_chars(Config) ->
{_, Body1} = http_get_with_pal(Config, "/metrics/detailed?family=queue_coarse_metrics", [], 200),
?assertMatch(#{rabbitmq_detailed_queue_messages :=
Expand Down Expand Up @@ -784,6 +884,28 @@ basic_auth(Config) ->
rabbit_ct_broker_helpers:delete_user(Config, <<"monitor">>),
rabbit_ct_broker_helpers:delete_user(Config, <<"management">>).

%% -------------------------------------------------------------------
%% Helpers
%% -------------------------------------------------------------------

publish_via_stream_protocol(Stream, Config) ->
{ok, S, C0} = stream_test_utils:connect(Config, 0),
{ok, C1} = stream_test_utils:create_stream(S, C0, Stream),
PublisherId = 98,
{ok, C2} = stream_test_utils:declare_publisher(S, C1, Stream, PublisherId),
Payloads = [<<"m1">>, <<"m2">>],
{ok, C3} = stream_test_utils:publish(S, C2, PublisherId, _SequenceFrom0 = 1, Payloads),

PublisherId2 = 99,
{ok, C4} = stream_test_utils:declare_publisher(S, C3, Stream, PublisherId2),
Payloads2 = [<<"m3">>, <<"m4">>, <<"m5">>],
{ok, C5} = stream_test_utils:publish(S, C4, PublisherId2, _SequenceFrom1 = 3, Payloads2),

SubscriptionId = 97,
{ok, C6} = stream_test_utils:subscribe(S, C5, Stream, SubscriptionId, _InitialCredit = 1),
%% delivery of <<"m1">> and <<"m2">>
{{deliver, SubscriptionId, _Bin1}, _C7} = stream_test_utils:receive_stream_commands(S, C6),
ok.

http_get(Config, ReqHeaders, CodeExp) ->
Path = proplists:get_value(prometheus_path, Config, "/metrics"),
Expand Down

0 comments on commit 142a219

Please sign in to comment.