diff --git a/deps/rabbitmq_prometheus/Makefile b/deps/rabbitmq_prometheus/Makefile
index 8380e81b9a7b..d4605a4368ec 100644
--- a/deps/rabbitmq_prometheus/Makefile
+++ b/deps/rabbitmq_prometheus/Makefile
@@ -11,7 +11,7 @@ PROJECT_DESCRIPTION = Prometheus metrics for RabbitMQ
 PROJECT_MOD := rabbit_prometheus_app
 
 DEPS = accept cowboy rabbit rabbitmq_management_agent prometheus rabbitmq_web_dispatch
 BUILD_DEPS = amqp_client rabbit_common rabbitmq_management
-TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers eunit_formatters
+TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers eunit_formatters rabbitmq_stream
 
 EUNIT_OPTS = no_tty, {report, {eunit_progress, [colored, profile]}}
@@ -26,6 +26,12 @@ endif
 include ../../rabbitmq-components.mk
 include ../../erlang.mk
 
+# We are using stream_test_utils from rabbitmq_stream.
+CT_OPTS += -pa ../rabbitmq_stream/test/
+
+test-build::
+	$(verbose) $(MAKE) -C ../rabbitmq_stream test-dir
+
 .PHONY: readme
 readme: # Preview README & live reload on edit
 	@docker run --interactive --tty --rm --name changelog_md \
diff --git a/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl b/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl
index cd66b0e226be..1c47a68c5cfb 100644
--- a/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl
+++ b/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl
@@ -11,6 +11,7 @@
 -include_lib("common_test/include/ct.hrl").
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("rabbitmq_ct_helpers/include/rabbit_mgmt_test.hrl").
+-include_lib("amqp10_common/include/amqp10_framing.hrl").
 
 -compile(export_all).
 
@@ -70,7 +71,8 @@ groups() ->
        queue_consumer_count_and_queue_metrics_mutually_exclusive_test,
        vhost_status_metric,
        exchange_bindings_metric,
-       exchange_names_metric
+       exchange_names_metric,
+       stream_pub_sub_metrics
       ]},
     {special_chars, [], [core_metrics_special_chars]},
     {authentication, [], [basic_auth]}
@@ -739,6 +741,104 @@ exchange_names_metric(Config) ->
       }, Names),
     ok.
 
+stream_pub_sub_metrics(Config) ->
+    Stream = atom_to_binary(?FUNCTION_NAME),
+    publish_via_stream_protocol(Stream, Config),
+
+    %% wait for the stream to emit stats and the counters to become non-zero
+    %% (collect_statistics_interval set to 100ms in this test group)
+    rabbit_ct_helpers:await_condition(
+      fun() ->
+              {_, Body0} = http_get_with_pal(Config, "/metrics", [], 200),
+              #{rabbitmq_stream_publisher_published_total := #{undefined := [P]}}
+                  = parse_response(Body0),
+              P > 0
+      end,
+      100),
+
+    %% aggregated metrics
+    {_, Body1} = http_get_with_pal(Config, "/metrics", [], 200),
+    ParsedBody1 = parse_response(Body1),
+    ?assertEqual(#{rabbitmq_stream_publisher_published_total => #{undefined => [5]},
+                   rabbitmq_stream_publisher_confirmed_total => #{undefined => [5]},
+                   rabbitmq_stream_publisher_error_messages => #{undefined => [0]},
+                   rabbitmq_stream_consumer_consumed_total => #{undefined => [2]},
+                   rabbitmq_stream_consumer_offset => #{undefined => [0]},
+                   rabbitmq_stream_consumer_offset_lag => #{undefined => [2]}
+                  },
+                 maps:with([rabbitmq_stream_publisher_published_total,
+                            rabbitmq_stream_publisher_confirmed_total,
+                            rabbitmq_stream_publisher_error_messages,
+                            rabbitmq_stream_consumer_consumed_total,
+                            rabbitmq_stream_consumer_offset,
+                            rabbitmq_stream_consumer_offset_lag],
+                           ParsedBody1)),
+
+    %% per-object metrics
+    {_, Body2} = http_get_with_pal(Config, "/metrics/detailed?family=stream_publisher_metrics",
+                                   [], 200),
+    ParsedBody2 = parse_response(Body2),
+
+    #{rabbitmq_detailed_stream_publisher_published_total := Published} = ParsedBody2,
+    ?assertMatch([{#{vhost := "/",
+                     queue := "stream_pub_sub_metrics",
+                     publisher_id := "98",
+                     connection := _}, [2]},
+                  {#{vhost := "/",
+                     queue := "stream_pub_sub_metrics",
+                     publisher_id := "99",
+                     connection := _}, [3]}],
+                 lists:sort(maps:to_list(Published))),
+
+    #{rabbitmq_detailed_stream_publisher_confirmed_total := PubConfirmed} = ParsedBody2,
+    ?assertMatch([{#{vhost := "/",
+                     queue := "stream_pub_sub_metrics",
+                     publisher_id := "98",
+                     connection := _}, [2]},
+                  {#{vhost := "/",
+                     queue := "stream_pub_sub_metrics",
+                     publisher_id := "99",
+                     connection := _}, [3]}],
+                 lists:sort(maps:to_list(PubConfirmed))),
+
+    #{rabbitmq_detailed_stream_publisher_error_messages := PubError} = ParsedBody2,
+    ?assertMatch([{#{vhost := "/",
+                     queue := "stream_pub_sub_metrics",
+                     publisher_id := "98",
+                     connection := _}, [0]},
+                  {#{vhost := "/",
+                     queue := "stream_pub_sub_metrics",
+                     publisher_id := "99",
+                     connection := _}, [0]}],
+                 lists:sort(maps:to_list(PubError))),
+
+    {_, Body3} = http_get_with_pal(Config, "/metrics/detailed?family=stream_consumer_metrics",
+                                   [], 200),
+    ParsedBody3 = parse_response(Body3),
+
+    #{rabbitmq_detailed_stream_consumer_consumed_total := Consumed} = ParsedBody3,
+    ?assertMatch([{#{vhost := "/",
+                     queue := "stream_pub_sub_metrics",
+                     subscription_id := "97",
+                     connection := _}, [2]}],
+                 lists:sort(maps:to_list(Consumed))),
+
+    #{rabbitmq_detailed_stream_consumer_offset := Offset} = ParsedBody3,
+    ?assertMatch([{#{vhost := "/",
+                     queue := "stream_pub_sub_metrics",
+                     subscription_id := "97",
+                     connection := _}, [0]}],
+                 lists:sort(maps:to_list(Offset))),
+
+    #{rabbitmq_detailed_stream_consumer_offset_lag := OffsetLag} = ParsedBody3,
+    ?assertMatch([{#{vhost := "/",
+                     queue := "stream_pub_sub_metrics",
+                     subscription_id := "97",
+                     connection := _}, [2]}],
+                 lists:sort(maps:to_list(OffsetLag))),
+
+    ok.
+
 core_metrics_special_chars(Config) ->
     {_, Body1} = http_get_with_pal(Config, "/metrics/detailed?family=queue_coarse_metrics", [], 200),
     ?assertMatch(#{rabbitmq_detailed_queue_messages :=
@@ -784,6 +884,28 @@ basic_auth(Config) ->
     rabbit_ct_broker_helpers:delete_user(Config, <<"monitor">>),
     rabbit_ct_broker_helpers:delete_user(Config, <<"management">>).
 
+%% -------------------------------------------------------------------
+%% Helpers
+%% -------------------------------------------------------------------
+
+publish_via_stream_protocol(Stream, Config) ->
+    {ok, S, C0} = stream_test_utils:connect(Config, 0),
+    {ok, C1} = stream_test_utils:create_stream(S, C0, Stream),
+    PublisherId = 98,
+    {ok, C2} = stream_test_utils:declare_publisher(S, C1, Stream, PublisherId),
+    Payloads = [<<"m1">>, <<"m2">>],
+    {ok, C3} = stream_test_utils:publish(S, C2, PublisherId, _SequenceFrom0 = 1, Payloads),
+
+    PublisherId2 = 99,
+    {ok, C4} = stream_test_utils:declare_publisher(S, C3, Stream, PublisherId2),
+    Payloads2 = [<<"m3">>, <<"m4">>, <<"m5">>],
+    {ok, C5} = stream_test_utils:publish(S, C4, PublisherId2, _SequenceFrom1 = 3, Payloads2),
+
+    SubscriptionId = 97,
+    {ok, C6} = stream_test_utils:subscribe(S, C5, Stream, SubscriptionId, _InitialCredit = 1),
+    %% delivery of <<"m1">> and <<"m2">>
+    {{deliver, SubscriptionId, _Bin1}, _C7} = stream_test_utils:receive_stream_commands(S, C6),
+    ok.
 
 http_get(Config, ReqHeaders, CodeExp) ->
     Path = proplists:get_value(prometheus_path, Config, "/metrics"),