Skip to content

Commit

Permalink
feat: Add channel indep. queue & exchange metrics
Browse files Browse the repository at this point in the history
Add channel independent queue & exchange metrics.
  • Loading branch information
LoisSotoLopez committed May 23, 2024
1 parent eed86cb commit a9ae940
Show file tree
Hide file tree
Showing 5 changed files with 174 additions and 19 deletions.
3 changes: 3 additions & 0 deletions deps/rabbit_common/include/rabbit_core_metrics.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@
{channel_exchange_metrics, set},
{channel_process_metrics, set},
{consumer_created, set},
{exchange_metrics, set},
{queue_metrics, set},
{queue_counter_metrics, set},
{queue_coarse_metrics, set},
{queue_exchange_metrics, set},
{node_persister_metrics, set},
{node_coarse_metrics, set},
{node_metrics, set},
Expand Down
36 changes: 24 additions & 12 deletions deps/rabbit_common/src/rabbit_core_metrics.erl
Original file line number Diff line number Diff line change
Expand Up @@ -166,53 +166,65 @@ channel_stats(reductions, Id, Value) ->
ets:insert(channel_process_metrics, {Id, Value}),
ok.

channel_stats(exchange_stats, publish, Id, Value) ->
channel_stats(exchange_stats, publish, {_ChannelPid, XName} = Id, Value) ->
%% Includes delete marker
_ = ets:update_counter(channel_exchange_metrics, Id, {2, Value}, {Id, 0, 0, 0, 0, 0}),
_ = ets:update_counter(exchange_metrics, XName, {2, Value}, {XName, 0, 0, 0, 0, 0}),
ok;
channel_stats(exchange_stats, confirm, Id, Value) ->
channel_stats(exchange_stats, confirm, {_ChannelPid, XName} = Id, Value) ->
%% Includes delete marker
_ = ets:update_counter(channel_exchange_metrics, Id, {3, Value}, {Id, 0, 0, 0, 0, 0}),
_ = ets:update_counter(exchange_metrics, XName, {3, Value}, {XName, 0, 0, 0, 0, 0}),
ok;
channel_stats(exchange_stats, return_unroutable, Id, Value) ->
channel_stats(exchange_stats, return_unroutable, {_ChannelPid, XName} = Id, Value) ->
%% Includes delete marker
_ = ets:update_counter(channel_exchange_metrics, Id, {4, Value}, {Id, 0, 0, 0, 0, 0}),
_ = ets:update_counter(exchange_metrics, XName, {4, Value}, {XName, 0, 0, 0, 0, 0}),
ok;
channel_stats(exchange_stats, drop_unroutable, Id, Value) ->
channel_stats(exchange_stats, drop_unroutable, {_ChannelPid, XName} = Id, Value) ->
%% Includes delete marker
_ = ets:update_counter(channel_exchange_metrics, Id, {5, Value}, {Id, 0, 0, 0, 0, 0}),
_ = ets:update_counter(exchange_metrics, XName, {5, Value}, {XName, 0, 0, 0, 0, 0}),
ok;
channel_stats(queue_exchange_stats, publish, Id, Value) ->
channel_stats(queue_exchange_stats, publish, {_ChannelPid, QueueExchange} = Id, Value) ->
%% Includes delete marker
_ = ets:update_counter(channel_queue_exchange_metrics, Id, Value, {Id, 0, 0}),
_ = ets:update_counter(queue_exchange_metrics, QueueExchange, Value, {QueueExchange, 0, 0}),
ok;
channel_stats(queue_stats, get, Id, Value) ->
channel_stats(queue_stats, get, {_ChannelPid, QName} = Id, Value) ->
%% Includes delete marker
_ = ets:update_counter(channel_queue_metrics, Id, {2, Value}, {Id, 0, 0, 0, 0, 0, 0, 0, 0}),
_ = ets:update_counter(queue_counter_metrics, QName, {2, Value}, {QName, 0, 0, 0, 0, 0, 0, 0, 0}),
ok;
channel_stats(queue_stats, get_no_ack, Id, Value) ->
channel_stats(queue_stats, get_no_ack, {_ChannelPid, QName} = Id, Value) ->
%% Includes delete marker
_ = ets:update_counter(channel_queue_metrics, Id, {3, Value}, {Id, 0, 0, 0, 0, 0, 0, 0, 0}),
_ = ets:update_counter(queue_counter_metrics, QName, {3, Value}, {QName, 0, 0, 0, 0, 0, 0, 0, 0}),
ok;
channel_stats(queue_stats, deliver, Id, Value) ->
channel_stats(queue_stats, deliver, {_ChannelPid, QName} = Id, Value) ->
%% Includes delete marker
_ = ets:update_counter(channel_queue_metrics, Id, {4, Value}, {Id, 0, 0, 0, 0, 0, 0, 0, 0}),
_ = ets:update_counter(queue_counter_metrics, QName, {4, Value}, {QName, 0, 0, 0, 0, 0, 0, 0, 0}),
ok;
channel_stats(queue_stats, deliver_no_ack, Id, Value) ->
channel_stats(queue_stats, deliver_no_ack, {_ChannelPid, QName} = Id, Value) ->
%% Includes delete marker
_ = ets:update_counter(channel_queue_metrics, Id, {5, Value}, {Id, 0, 0, 0, 0, 0, 0, 0, 0}),
_ = ets:update_counter(queue_counter_metrics, QName, {5, Value}, {QName, 0, 0, 0, 0, 0, 0, 0, 0}),
ok;
channel_stats(queue_stats, redeliver, Id, Value) ->
channel_stats(queue_stats, redeliver, {_ChannelPid, QName} = Id, Value) ->
%% Includes delete marker
_ = ets:update_counter(channel_queue_metrics, Id, {6, Value}, {Id, 0, 0, 0, 0, 0, 0, 0, 0}),
_ = ets:update_counter(queue_counter_metrics, QName, {6, Value}, {QName, 0, 0, 0, 0, 0, 0, 0, 0}),
ok;
channel_stats(queue_stats, ack, Id, Value) ->
channel_stats(queue_stats, ack, {_ChannelPid, QName} = Id, Value) ->
%% Includes delete marker
_ = ets:update_counter(channel_queue_metrics, Id, {7, Value}, {Id, 0, 0, 0, 0, 0, 0, 0, 0}),
_ = ets:update_counter(queue_counter_metrics, QName, {7, Value}, {QName, 0, 0, 0, 0, 0, 0, 0, 0}),
ok;
channel_stats(queue_stats, get_empty, Id, Value) ->
channel_stats(queue_stats, get_empty, {_ChannelPid, QName} = Id, Value) ->
%% Includes delete marker
_ = ets:update_counter(channel_queue_metrics, Id, {8, Value}, {Id, 0, 0, 0, 0, 0, 0, 0, 0}),
_ = ets:update_counter(queue_counter_metrics, QName, {8, Value}, {QName, 0, 0, 0, 0, 0, 0, 0, 0}),
ok.

delete(Table, Key) ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,10 @@ retention_policy(channel_queue_metrics) -> detailed;
retention_policy(channel_process_metrics) -> basic;
retention_policy(consumer_created) -> basic;
retention_policy(queue_metrics) -> basic;
retention_policy(queue_counter_metrics) -> detailed;
retention_policy(queue_coarse_metrics) -> basic;
retention_policy(exchange_metrics) -> detailed;
retention_policy(queue_exchange_metrics) -> detailed;
retention_policy(node_persister_metrics) -> global;
retention_policy(node_coarse_metrics) -> global;
retention_policy(node_metrics) -> basic;
Expand Down Expand Up @@ -276,6 +279,10 @@ aggregate_entry({{Ch, X} = Id, Publish0, Confirm, ReturnUnroutable, DropUnroutab
Ops2
end,
{insert_old_aggr_stats(NextStats, Id, Stats), Ops3, State};
aggregate_entry({_E, _Publish0, _Confirm, _ReturnUnroutable, _DropUnroutable, _N}, NextStats, Ops0,
#state{table = exchange_metrics} = State) ->
% Do nothing since it's already done by the channel aggregate entry below
{NextStats, Ops0, State};
aggregate_entry({{_Ch, X} = Id, Publish0, Confirm, ReturnUnroutable, DropUnroutable, 1},
NextStats, Ops0,
#state{table = channel_exchange_metrics,
Expand All @@ -295,6 +302,11 @@ aggregate_entry({{_Ch, X} = Id, Publish0, Confirm, ReturnUnroutable, DropUnrouta
end,
rabbit_core_metrics:delete(channel_exchange_metrics, Id),
{NextStats, Ops2, State};
aggregate_entry({_Q, _Get, _GetNoAck, _Deliver, _DeliverNoAck,
_Redeliver, _Ack, _GetEmpty, _N}, NextStats, Ops0,
#state{table = queue_counter_metrics} = State) ->
% Do nothing since it's already done by the channel aggregate entry below
{NextStats, Ops0, State};
aggregate_entry({{Ch, Q} = Id, Get, GetNoAck, Deliver, DeliverNoAck,
Redeliver, Ack, GetEmpty, 0}, NextStats, Ops0,
#state{table = channel_queue_metrics,
Expand Down Expand Up @@ -348,6 +360,10 @@ aggregate_entry({{_, Q} = Id, Get, GetNoAck, Deliver, DeliverNoAck,
end,
rabbit_core_metrics:delete(channel_queue_metrics, Id),
{NextStats, Ops2, State};
aggregate_entry({_QE, _Publish, _ToDelete}, NextStats, Ops0,
#state{table = queue_exchange_metrics} = State) ->
% Do nothing since it's already done by the channel aggregate entry below
{NextStats, Ops0, State};
aggregate_entry({{_Ch, {Q, X}} = Id, Publish, ToDelete}, NextStats, Ops0,
#state{table = channel_queue_exchange_metrics,
policies = {BPolicies, _, _},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,18 @@
{2, undefined, queue_messages_paged_out_bytes, gauge, "Size in bytes of messages paged out to disk", message_bytes_paged_out},
{2, undefined, queue_head_message_timestamp, gauge, "Timestamp of the first message in the queue, if any", head_message_timestamp},
{2, undefined, queue_disk_reads_total, counter, "Total number of times queue read messages from disk", disk_reads},
{2, undefined, queue_disk_writes_total, counter, "Total number of times queue wrote messages to disk", disk_writes}
{2, undefined, queue_disk_writes_total, counter, "Total number of times
queue wrote messages to disk", disk_writes}
]},
{queue_counter_metrics, [
{2, undefined, queue_get_ack_total, counter, "Total number of messages fetched with basic.get in manual acknowledgement mode"},
{3, undefined, queue_get_total, counter, "Total number of messages fetched with basic.get in automatic acknowledgement mode"},
{4, undefined, queue_messages_delivered_ack_total, counter, "Total number of messages delivered to consumers in manual acknowledgement mode"},
{5, undefined, queue_messages_delivered_total, counter, "Total number of messages delivered to consumers in automatic acknowledgement mode"},
{6, undefined, queue_messages_redelivered_total, counter, "Total number of messages redelivered to consumers"},
{7, undefined, queue_messages_acked_total, counter, "Total number of messages acknowledged by consumers"},
{8, undefined, queue_get_empty_total, counter, "Total number of times basic.get operations fetched no message"}
]},

%%% Metrics that contain reference to a channel. Some of them also have
%%% a queue name, but in this case filtering on it doesn't make any
%%% sense, as the queue is not an object of interest here.
Expand All @@ -176,6 +185,13 @@
{2, undefined, channel_prefetch, gauge, "Total limit of unacknowledged messages for all consumers on a channel", global_prefetch_count}
]},

{exchange_metrics, [
{2, undefined, exchange_messages_published_total, counter, "Total number of messages published into an exchange on a channel"},
{3, undefined, exchange_messages_confirmed_total, counter, "Total number of messages published into an exchange and confirmed on the channel"},
{4, undefined, exchange_messages_unroutable_returned_total, counter, "Total number of messages published as mandatory into an exchange and returned to the publisher as unroutable"},
{5, undefined, exchange_messages_unroutable_dropped_total, counter, "Total number of messages published as non-mandatory into an exchange and dropped as unroutable"}
]},

{channel_exchange_metrics, [
{2, undefined, channel_messages_published_total, counter, "Total number of messages published into an exchange on a channel"},
{3, undefined, channel_messages_confirmed_total, counter, "Total number of messages published into an exchange and confirmed on the channel"},
Expand Down Expand Up @@ -210,6 +226,10 @@
{2, undefined, connection_channels, gauge, "Channels on a connection", channels}
]},

{queue_exchange_metrics, [
{2, undefined, queue_exchange_messages_published_total, counter, "Total number of messages published to queues"}
]},

{channel_queue_exchange_metrics, [
{2, undefined, queue_messages_published_total, counter, "Total number of messages published to queues"}
]}
Expand Down Expand Up @@ -562,22 +582,36 @@ get_data(queue_metrics = Table, false, VHostsFilter, QueuesFilter) ->
{disk_reads, A15}, {disk_writes, A16}]}];
get_data(Table, false, VHostsFilter, QueuesFilter) when Table == channel_exchange_metrics;
Table == queue_coarse_metrics;
Table == queue_counter_metrics;
Table == channel_queue_metrics;
Table == connection_coarse_metrics;
Table == exchange_metrics;
Table == queue_exchange_metrics;
Table == channel_queue_exchange_metrics;
Table == ra_metrics;
Table == channel_process_metrics ->
Result = ets:foldl(fun
%% For queue_coarse_metrics
({#resource{kind = queue, virtual_host = VHost}, _, _, _, _}, Acc) when is_map(VHostsFilter), map_get(VHost, VHostsFilter) == false ->
Acc;
({#resource{kind = queue, virtual_host = VHost}, _, _, _, _, _, _, _, _}, Acc) when is_map(VHostsFilter), map_get(VHost, VHostsFilter) == false ->
Acc;
({#resource{kind = queue, name = Name}, V1, V2, V3, V4}, {T, A1, A2, A3, A4} = Acc)
when is_list(QueuesFilter) ->
case re:run(Name, QueuesFilter, [{capture, none}]) of
match ->
Acc;
nomatch ->
{T, V1 + A1, V2 + A2, V3 + A3, V4 + A4}
end;
({#resource{kind = queue, name = Name}, V1, V2, V3, V4, V5, V6, V7, _}, {T, A1, A2, A3, A4, A5, A6, A7} = Acc)
when is_list(QueuesFilter) ->
case re:run(Name, QueuesFilter, [{capture, none}]) of
match ->
Acc;
nomatch ->
{T, V1 + A1, V2 + A2, V3 + A3, V4 + A4}
{T, V1 + A1, V2 + A2, V3 + A3, V4 + A4,
V5 + A5, V6 + A6, V7 + A7}
end;
({_, V1}, {T, A1}) ->
{T, V1 + A1};
Expand Down Expand Up @@ -605,6 +639,50 @@ get_data(Table, false, VHostsFilter, QueuesFilter) when Table == channel_exchang
_ ->
[Result]
end;
get_data(exchange_metrics = Table, true, VHostsFilter, _QueuesFilter) ->
ets:foldl(fun
({#resource{kind = exchange, virtual_host = VHost}, _, _, _, _, _} = Row, Acc) when
map_get(VHost, VHostsFilter)
->
[Row | Acc];
(_Row, Acc) ->
Acc
end, [], Table);
get_data(queue_counter_metrics = Table, true, VHostsFilter, QueuesFilter) ->
ets:foldl(fun
({#resource{kind = queue, virtual_host = VHost, name = QueueName}, _, _, _, _, _, _, _, _} = Row, Acc) when
map_get(VHost, VHostsFilter)
->
IgnoreQueue = is_list(QueuesFilter)
andalso re:run(QueueName, QueuesFilter, [{capture, none}]),
case IgnoreQueue of
true ->
Acc;
false ->
[Row | Acc]
end;
(_Row, Acc) ->
Acc
end, [], Table);
get_data(queue_exchange_metrics = Table, true, VHostsFilter, QueuesFilter) ->
ets:foldl(fun
({{
#resource{kind = queue, virtual_host = VHost, name = QueueName},
#resource{kind = exchange, virtual_host = VHost}
}, _, _} = Row, Acc) when
map_get(VHost, VHostsFilter)
->
IgnoreQueue = is_list(QueuesFilter)
andalso re:run(QueueName, QueuesFilter, [{capture, none}]),
case IgnoreQueue of
true ->
Acc;
false ->
[Row | Acc]
end;
(_Row, Acc) ->
Acc
end, [], Table);
get_data(queue_coarse_metrics = Table, true, VHostsFilter, _) when is_map(VHostsFilter) ->
ets:foldl(fun
({#resource{kind = queue, virtual_host = VHost}, _, _, _, _} = Row, Acc) when map_get(VHost, VHostsFilter) ->
Expand Down Expand Up @@ -696,15 +774,15 @@ division(A, B) ->
accumulate_count_and_sum(Value, {Count, Sum}) ->
{Count + 1, Sum + Value}.

empty(T) when T == channel_queue_exchange_metrics; T == channel_process_metrics; T == queue_consumer_count ->
empty(T) when T == channel_queue_exchange_metrics; T == queue_exchange_metrics; T == channel_process_metrics; T == queue_consumer_count ->
{T, 0};
empty(T) when T == connection_coarse_metrics; T == auth_attempt_metrics; T == auth_attempt_detailed_metrics ->
{T, 0, 0, 0};
empty(T) when T == channel_exchange_metrics; T == queue_coarse_metrics; T == connection_metrics ->
empty(T) when T == channel_exchange_metrics; T == exchange_metrics; T == queue_coarse_metrics; T == connection_metrics ->
{T, 0, 0, 0, 0};
empty(T) when T == ra_metrics ->
{T, 0, 0, 0, 0, 0, {0, 0}};
empty(T) when T == channel_queue_metrics; T == channel_metrics ->
empty(T) when T == channel_queue_metrics; T == queue_counter_metrics; T == channel_metrics ->
{T, 0, 0, 0, 0, 0, 0, 0};
empty(queue_metrics = T) ->
{T, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}.
Expand Down
Loading

0 comments on commit a9ae940

Please sign in to comment.