diff --git a/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl b/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl index c6dfd43e2ffa..35328449c9f1 100644 --- a/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl +++ b/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl @@ -200,6 +200,20 @@ {4, undefined, connection_process_reductions_total, counter, "Total number of connection process reductions"} ]}, + %% the family name for these metrics is stream_publisher_metrics but the real table used for data is rabbit_stream_publisher_created. + {stream_publisher_metrics, [ + {2, undefined, stream_publisher_published_total, counter, "Total number of messages published", published}, + {2, undefined, stream_publisher_confirmed_total, counter, "Total number of messages confirmed", confirmed}, + {2, undefined, stream_publisher_error_messages, counter, "Total number of messages errored", errored} + ]}, + + %% the family name for these metrics is stream_consumer_metrics but the real table used for data is rabbit_stream_consumer_created. + {stream_consumer_metrics, [ + {2, undefined, stream_consumer_consumed_total, counter, "Total number of messages consumed", consumed}, + {2, undefined, stream_consumer_offset, counter, "Total offset", offset}, + {2, undefined, stream_consumer_offset_lag, counter, "Total offset lag", offset_lag} + ]}, + {connection_metrics, [ {2, undefined, connection_incoming_packets_total, counter, "Total number of packets received on a connection", recv_cnt}, {2, undefined, connection_outgoing_packets_total, counter, "Total number of packets sent on a connection", send_cnt}, @@ -471,6 +485,19 @@ label(M) when is_map(M) -> label(#resource{virtual_host = VHost, kind = exchange, name = Name}) -> <<"vhost=\"", (escape_label_value(VHost))/binary, "\",", "exchange=\"", (escape_label_value(Name))/binary, "\"">>; + +label({stream_publisher, {#resource{virtual_host = VHost, kind = queue, name = Name}, P, PubId}}) -> + %% stream_consumer_metrics {queue_id, connection_pid, subscription_id} + <<"vhost=\"", (escape_label_value(VHost))/binary, "\",", + "queue=\"", (escape_label_value(Name))/binary, "\",", + "connection=\"", (iolist_to_binary(pid_to_list(P)))/binary, "\",", + "publisher_id=\"", (integer_to_binary(PubId))/binary, "\"">>; +label({stream_consumer, {#resource{virtual_host = VHost, kind = queue, name = Name}, P, SubId}}) -> + %% stream_consumer_metrics {queue_id, connection_pid, subscription_id} + <<"vhost=\"", (escape_label_value(VHost))/binary, "\",", + "queue=\"", (escape_label_value(Name))/binary, "\",", + "connection=\"", (iolist_to_binary(pid_to_list(P)))/binary, "\",", + "subscription_id=\"", (integer_to_binary(SubId))/binary, "\"">>; label(#resource{virtual_host = VHost, kind = queue, name = Name}) -> <<"vhost=\"", (escape_label_value(VHost))/binary, "\",", "queue=\"", (escape_label_value(Name))/binary, "\"">>; @@ -578,6 +605,36 @@ get_data(channel_metrics = Table, false, _) -> [{Table, [{consumer_count, A1}, {messages_unacknowledged, A2}, {messages_unconfirmed, A3}, {messages_uncommitted, A4}, {acks_uncommitted, A5}, {prefetch_count, A6}, {global_prefetch_count, A7}]}]; +get_data(stream_publisher_metrics = Table, false, _) -> + RealTable = rabbit_stream_publisher_created, %% real table name + try ets:foldl(fun({_, Props}, {T, A1, A2, A3}) -> + {T, + sum(proplists:get_value(published, Props), A1), + sum(proplists:get_value(confirmed, Props), A2), + sum(proplists:get_value(errored, Props), A3) + } + end, empty(Table), RealTable) of + {Table, A1, A2, A3} -> + [{Table, [{published, A1}, {confirmed, A2}, {errored, A3}]}] + catch error:badarg -> + %% rabbitmq_stream plugin is not enabled + [] + end; +get_data(stream_consumer_metrics = Table, false, _) -> + RealTable = rabbit_stream_consumer_created, %% real table name + try ets:foldl(fun({_, Props}, {T, A1, A2, A3}) -> + {T, + sum(proplists:get_value(consumed, Props), A1), + sum(proplists:get_value(offset, Props), A2), + sum(proplists:get_value(offset_lag, Props), A3) + } + end, empty(Table), RealTable) of + {Table, A1, A2, A3} -> + [{Table, [{consumed, A1}, {offset, A2}, {offset_lag, A3}]}] + catch error:badarg -> + %% rabbitmq_stream plugin is not enabled + [] + end; get_data(queue_consumer_count = MF, false, VHostsFilter) -> Table = queue_metrics, %% Real table name {_, A1} = ets:foldl(fun @@ -708,6 +765,22 @@ get_data(MF, true, VHostsFilter) when is_map(VHostsFilter), MF == queue_metrics end, [], Table); get_data(queue_consumer_count, true, _) -> ets:tab2list(queue_metrics); +get_data(stream_publisher_metrics, true, _) -> + try + ets:select(rabbit_stream_publisher_created, + [{{'$1', '$2'}, [], [{{ {{stream_publisher, '$1'}}, '$2' }}]}]) + catch error:badarg -> + %% rabbitmq_stream plugin is not enabled + [] + end; +get_data(stream_consumer_metrics, true, _) -> + try + ets:select(rabbit_stream_consumer_created, + [{{'$1', '$2'}, [], [{{ {{stream_consumer, '$1'}}, '$2' }}]}]) + catch error:badarg -> + %% rabbitmq_stream plugin is not enabled + [] + end; get_data(vhost_status, _, _) -> [ { #{<<"vhost">> => VHost}, case rabbit_vhost_sup_sup:is_vhost_alive(VHost) of @@ -817,7 +890,11 @@ accumulate_count_and_sum(Value, {Count, Sum}) -> empty(T) when T == channel_queue_exchange_metrics; T == queue_exchange_metrics; T == channel_process_metrics; T == queue_consumer_count -> {T, 0}; -empty(T) when T == connection_coarse_metrics; T == auth_attempt_metrics; T == auth_attempt_detailed_metrics -> +empty(T) when T == connection_coarse_metrics; + T == stream_publisher_metrics; + T == stream_consumer_metrics; + T == auth_attempt_metrics; + T == auth_attempt_detailed_metrics -> {T, 0, 0, 0}; empty(T) when T == channel_exchange_metrics; T == exchange_metrics; T == queue_coarse_metrics; T == connection_metrics -> {T, 0, 0, 0, 0};