Skip to content

Commit

Permalink
Expose Stream Connection metrics via Prometheus
Browse files Browse the repository at this point in the history
Supports both per connection (detailed) and aggregated total (metrics) values.
  • Loading branch information
markus812498 authored and gomoripeti committed Nov 4, 2024
1 parent 9db4317 commit 52e63c7
Showing 1 changed file with 78 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,20 @@
{4, undefined, connection_process_reductions_total, counter, "Total number of connection process reductions"}
]},

%% the family name for these metrics is stream_publisher_metrics but the real table used for data is rabbit_stream_publisher_created.
{stream_publisher_metrics, [
{2, undefined, stream_publisher_published_total, counter, "Total number of messages published", published},
{2, undefined, stream_publisher_confirmed_total, counter, "Total number of messages confirmed", confirmed},
{2, undefined, stream_publisher_error_messages, counter, "Total number of messages errored", errored}
]},

%% the family name for these metrics is stream_consumer_metrics but the real table used for data is rabbit_stream_consumer_created.
{stream_consumer_metrics, [
{2, undefined, stream_consumer_consumed_total, counter, "Total number of messages consumed", consumed},
{2, undefined, stream_consumer_offset, counter, "Total offset", offset},
{2, undefined, stream_consumer_offset_lag, counter, "Total offset lag", offset_lag}
]},

{connection_metrics, [
{2, undefined, connection_incoming_packets_total, counter, "Total number of packets received on a connection", recv_cnt},
{2, undefined, connection_outgoing_packets_total, counter, "Total number of packets sent on a connection", send_cnt},
Expand Down Expand Up @@ -471,6 +485,19 @@ label(M) when is_map(M) ->
label(#resource{virtual_host = VHost, kind = exchange, name = Name}) ->
<<"vhost=\"", (escape_label_value(VHost))/binary, "\",",
"exchange=\"", (escape_label_value(Name))/binary, "\"">>;

label({stream_publisher, {#resource{virtual_host = VHost, kind = queue, name = Name}, P, PubId}}) ->
%% stream_consumer_metrics {queue_id, connection_pid, subscription_id}
<<"vhost=\"", (escape_label_value(VHost))/binary, "\",",
"queue=\"", (escape_label_value(Name))/binary, "\",",
"connection=\"", (iolist_to_binary(pid_to_list(P)))/binary, "\",",
"publisher_id=\"", (integer_to_binary(PubId))/binary, "\"">>;
label({stream_consumer, {#resource{virtual_host = VHost, kind = queue, name = Name}, P, SubId}}) ->
%% stream_consumer_metrics {queue_id, connection_pid, subscription_id}
<<"vhost=\"", (escape_label_value(VHost))/binary, "\",",
"queue=\"", (escape_label_value(Name))/binary, "\",",
"connection=\"", (iolist_to_binary(pid_to_list(P)))/binary, "\",",
"subscription_id=\"", (integer_to_binary(SubId))/binary, "\"">>;
label(#resource{virtual_host = VHost, kind = queue, name = Name}) ->
<<"vhost=\"", (escape_label_value(VHost))/binary, "\",",
"queue=\"", (escape_label_value(Name))/binary, "\"">>;
Expand Down Expand Up @@ -578,6 +605,36 @@ get_data(channel_metrics = Table, false, _) ->
[{Table, [{consumer_count, A1}, {messages_unacknowledged, A2}, {messages_unconfirmed, A3},
{messages_uncommitted, A4}, {acks_uncommitted, A5}, {prefetch_count, A6},
{global_prefetch_count, A7}]}];
get_data(stream_publisher_metrics = Table, false, _) ->
RealTable = rabbit_stream_publisher_created, %% real table name
try ets:foldl(fun({_, Props}, {T, A1, A2, A3}) ->
{T,
sum(proplists:get_value(published, Props), A1),
sum(proplists:get_value(confirmed, Props), A2),
sum(proplists:get_value(errored, Props), A3)
}
end, empty(Table), RealTable) of
{Table, A1, A2, A3} ->
[{Table, [{published, A1}, {confirmed, A2}, {errored, A3}]}]
catch error:badarg ->
%% rabbitmq_stream plugin is not enabled
[]
end;
get_data(stream_consumer_metrics = Table, false, _) ->
RealTable = rabbit_stream_consumer_created, %% real table name
try ets:foldl(fun({_, Props}, {T, A1, A2, A3}) ->
{T,
sum(proplists:get_value(consumed, Props), A1),
sum(proplists:get_value(offset, Props), A2),
sum(proplists:get_value(offset_lag, Props), A3)
}
end, empty(Table), RealTable) of
{Table, A1, A2, A3} ->
[{Table, [{consumed, A1}, {offset, A2}, {offset_lag, A3}]}]
catch error:badarg ->
%% rabbitmq_stream plugin is not enabled
[]
end;
get_data(queue_consumer_count = MF, false, VHostsFilter) ->
Table = queue_metrics, %% Real table name
{_, A1} = ets:foldl(fun
Expand Down Expand Up @@ -708,6 +765,22 @@ get_data(MF, true, VHostsFilter) when is_map(VHostsFilter), MF == queue_metrics
end, [], Table);
get_data(queue_consumer_count, true, _) ->
ets:tab2list(queue_metrics);
get_data(stream_publisher_metrics, true, _) ->
try
ets:select(rabbit_stream_publisher_created,
[{{'$1', '$2'}, [], [{{ {{stream_publisher, '$1'}}, '$2' }}]}])
catch error:badarg ->
%% rabbitmq_stream plugin is not enabled
[]
end;
get_data(stream_consumer_metrics, true, _) ->
try
ets:select(rabbit_stream_consumer_created,
[{{'$1', '$2'}, [], [{{ {{stream_consumer, '$1'}}, '$2' }}]}])
catch error:badarg ->
%% rabbitmq_stream plugin is not enabled
[]
end;
get_data(vhost_status, _, _) ->
[ { #{<<"vhost">> => VHost},
case rabbit_vhost_sup_sup:is_vhost_alive(VHost) of
Expand Down Expand Up @@ -817,7 +890,11 @@ accumulate_count_and_sum(Value, {Count, Sum}) ->

empty(T) when T == channel_queue_exchange_metrics; T == queue_exchange_metrics; T == channel_process_metrics; T == queue_consumer_count ->
{T, 0};
empty(T) when T == connection_coarse_metrics; T == auth_attempt_metrics; T == auth_attempt_detailed_metrics ->
empty(T) when T == connection_coarse_metrics;
T == stream_publisher_metrics;
T == stream_consumer_metrics;
T == auth_attempt_metrics;
T == auth_attempt_detailed_metrics ->
{T, 0, 0, 0};
empty(T) when T == channel_exchange_metrics; T == exchange_metrics; T == queue_coarse_metrics; T == connection_metrics ->
{T, 0, 0, 0, 0};
Expand Down

0 comments on commit 52e63c7

Please sign in to comment.