Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixing test for prometheus pr #2

Open
wants to merge 6 commits into
base: fix/11315-remove-filter-aggregated-queue-metrics-pattern
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions deps/rabbit/src/rabbit_core_metrics_gc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,17 @@ gc_leader_data(Id, Table, GbSet) ->
gc_global_queues() ->
GbSet = gb_sets:from_list(rabbit_amqqueue:list_names()),
gc_process_and_entity(channel_queue_metrics, GbSet),
gc_entity(queue_counter_metrics, GbSet),
gc_process_and_entity(consumer_created, GbSet),
ExchangeGbSet = gb_sets:from_list(rabbit_exchange:list_names()),
gc_process_and_entities(channel_queue_exchange_metrics, GbSet, ExchangeGbSet).
gc_process_and_entities(channel_queue_exchange_metrics, GbSet, ExchangeGbSet),
gc_entities(queue_exchange_metrics, GbSet, ExchangeGbSet).

gc_exchanges() ->
Exchanges = rabbit_exchange:list_names(),
GbSet = gb_sets:from_list(Exchanges),
gc_process_and_entity(channel_exchange_metrics, GbSet).
gc_process_and_entity(channel_exchange_metrics, GbSet),
gc_entity(exchange_metrics, GbSet).

gc_nodes() ->
Nodes = rabbit_nodes:list_members(),
Expand Down Expand Up @@ -153,6 +156,12 @@ gc_entity(Table, GbSet) ->
({Id = Key, _, _}, none) ->
gc_entity(Id, Table, Key, GbSet);
({Id = Key, _, _, _, _}, none) ->
gc_entity(Id, Table, Key, GbSet);
({Id = Key, _, _, _, _, _}, none)
when Table == exchange_metrics ->
gc_entity(Id, Table, Key, GbSet);
({Id = Key, _, _, _, _, _, _, _, _}, none)
when Table == queue_counter_metrics ->
gc_entity(Id, Table, Key, GbSet)
end, none, Table).

Expand Down Expand Up @@ -188,6 +197,13 @@ gc_process_and_entity(Id, Pid, Table, Key, GbSet) ->
none
end.

gc_entities(Table, QueueGbSet, ExchangeGbSet) ->
ets:foldl(fun({{QueueId, ExchangeId} = Key, _, _}, none)
when Table == queue_exchange_metrics ->
gc_entity(QueueId, Table, Key, QueueGbSet),
gc_entity(ExchangeId, Table, Key, ExchangeGbSet)
end, none, Table).

gc_process_and_entities(Table, QueueGbSet, ExchangeGbSet) ->
ets:foldl(fun({{Pid, {Q, X}} = Key, _, _}, none) ->
gc_process(Pid, Table, Key),
Expand Down
8 changes: 8 additions & 0 deletions deps/rabbit_common/include/rabbit_core_metrics.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,14 @@
{auth_attempt_metrics, set},
{auth_attempt_detailed_metrics, set}]).

% `CORE_NON_CHANNEL_TABLES` are tables that store counters representing the
% same info as some of the channel_queue_metrics, channel_exchange_metrics and
% channel_queue_exchange_metrics but without including the channel ID in the
% key.
-define(CORE_NON_CHANNEL_TABLES, [{queue_counter_metrics, set},
{exchange_metrics, set},
{queue_exchange_metrics, set}]).

-define(CONNECTION_CHURN_METRICS, {node(), 0, 0, 0, 0, 0, 0, 0}).

%% connection_created :: {connection_id, proplist}
Expand Down
44 changes: 29 additions & 15 deletions deps/rabbit_common/src/rabbit_core_metrics.erl
Original file line number Diff line number Diff line change
Expand Up @@ -111,13 +111,15 @@ create_table({Table, Type}) ->
{read_concurrency, true}]).

init() ->
_ = [create_table({Table, Type})
|| {Table, Type} <- ?CORE_TABLES ++ ?CORE_EXTRA_TABLES],
Tables = ?CORE_TABLES ++ ?CORE_EXTRA_TABLES ++ ?CORE_NON_CHANNEL_TABLES,
_ = [create_table({Table, Type})
|| {Table, Type} <- Tables],
ok.

terminate() ->
Tables = ?CORE_TABLES ++ ?CORE_EXTRA_TABLES ++ ?CORE_NON_CHANNEL_TABLES,
[ets:delete(Table)
|| {Table, _Type} <- ?CORE_TABLES ++ ?CORE_EXTRA_TABLES],
|| {Table, _Type} <- Tables],
ok.

connection_created(Pid, Infos) ->
Expand Down Expand Up @@ -166,53 +168,65 @@ channel_stats(reductions, Id, Value) ->
ets:insert(channel_process_metrics, {Id, Value}),
ok.

channel_stats(exchange_stats, publish, Id, Value) ->
channel_stats(exchange_stats, publish, {_ChannelPid, XName} = Id, Value) ->
%% Includes delete marker
_ = ets:update_counter(channel_exchange_metrics, Id, {2, Value}, {Id, 0, 0, 0, 0, 0}),
_ = ets:update_counter(exchange_metrics, XName, {2, Value}, {XName, 0, 0, 0, 0, 0}),
ok;
channel_stats(exchange_stats, confirm, Id, Value) ->
channel_stats(exchange_stats, confirm, {_ChannelPid, XName} = Id, Value) ->
%% Includes delete marker
_ = ets:update_counter(channel_exchange_metrics, Id, {3, Value}, {Id, 0, 0, 0, 0, 0}),
_ = ets:update_counter(exchange_metrics, XName, {3, Value}, {XName, 0, 0, 0, 0, 0}),
ok;
channel_stats(exchange_stats, return_unroutable, Id, Value) ->
channel_stats(exchange_stats, return_unroutable, {_ChannelPid, XName} = Id, Value) ->
%% Includes delete marker
_ = ets:update_counter(channel_exchange_metrics, Id, {4, Value}, {Id, 0, 0, 0, 0, 0}),
_ = ets:update_counter(exchange_metrics, XName, {4, Value}, {XName, 0, 0, 0, 0, 0}),
ok;
channel_stats(exchange_stats, drop_unroutable, Id, Value) ->
channel_stats(exchange_stats, drop_unroutable, {_ChannelPid, XName} = Id, Value) ->
%% Includes delete marker
_ = ets:update_counter(channel_exchange_metrics, Id, {5, Value}, {Id, 0, 0, 0, 0, 0}),
_ = ets:update_counter(exchange_metrics, XName, {5, Value}, {XName, 0, 0, 0, 0, 0}),
ok;
channel_stats(queue_exchange_stats, publish, Id, Value) ->
channel_stats(queue_exchange_stats, publish, {_ChannelPid, QueueExchange} = Id, Value) ->
%% Includes delete marker
_ = ets:update_counter(channel_queue_exchange_metrics, Id, Value, {Id, 0, 0}),
_ = ets:update_counter(queue_exchange_metrics, QueueExchange, Value, {QueueExchange, 0, 0}),
ok;
channel_stats(queue_stats, get, Id, Value) ->
channel_stats(queue_stats, get, {_ChannelPid, QName} = Id, Value) ->
%% Includes delete marker
_ = ets:update_counter(channel_queue_metrics, Id, {2, Value}, {Id, 0, 0, 0, 0, 0, 0, 0, 0}),
_ = ets:update_counter(queue_counter_metrics, QName, {2, Value}, {QName, 0, 0, 0, 0, 0, 0, 0, 0}),
ok;
channel_stats(queue_stats, get_no_ack, Id, Value) ->
channel_stats(queue_stats, get_no_ack, {_ChannelPid, QName} = Id, Value) ->
%% Includes delete marker
_ = ets:update_counter(channel_queue_metrics, Id, {3, Value}, {Id, 0, 0, 0, 0, 0, 0, 0, 0}),
_ = ets:update_counter(queue_counter_metrics, QName, {3, Value}, {QName, 0, 0, 0, 0, 0, 0, 0, 0}),
ok;
channel_stats(queue_stats, deliver, Id, Value) ->
channel_stats(queue_stats, deliver, {_ChannelPid, QName} = Id, Value) ->
%% Includes delete marker
_ = ets:update_counter(channel_queue_metrics, Id, {4, Value}, {Id, 0, 0, 0, 0, 0, 0, 0, 0}),
_ = ets:update_counter(queue_counter_metrics, QName, {4, Value}, {QName, 0, 0, 0, 0, 0, 0, 0, 0}),
ok;
channel_stats(queue_stats, deliver_no_ack, Id, Value) ->
channel_stats(queue_stats, deliver_no_ack, {_ChannelPid, QName} = Id, Value) ->
%% Includes delete marker
_ = ets:update_counter(channel_queue_metrics, Id, {5, Value}, {Id, 0, 0, 0, 0, 0, 0, 0, 0}),
_ = ets:update_counter(queue_counter_metrics, QName, {5, Value}, {QName, 0, 0, 0, 0, 0, 0, 0, 0}),
ok;
channel_stats(queue_stats, redeliver, Id, Value) ->
channel_stats(queue_stats, redeliver, {_ChannelPid, QName} = Id, Value) ->
%% Includes delete marker
_ = ets:update_counter(channel_queue_metrics, Id, {6, Value}, {Id, 0, 0, 0, 0, 0, 0, 0, 0}),
_ = ets:update_counter(queue_counter_metrics, QName, {6, Value}, {QName, 0, 0, 0, 0, 0, 0, 0, 0}),
ok;
channel_stats(queue_stats, ack, Id, Value) ->
channel_stats(queue_stats, ack, {_ChannelPid, QName} = Id, Value) ->
%% Includes delete marker
_ = ets:update_counter(channel_queue_metrics, Id, {7, Value}, {Id, 0, 0, 0, 0, 0, 0, 0, 0}),
_ = ets:update_counter(queue_counter_metrics, QName, {7, Value}, {QName, 0, 0, 0, 0, 0, 0, 0, 0}),
ok;
channel_stats(queue_stats, get_empty, Id, Value) ->
channel_stats(queue_stats, get_empty, {_ChannelPid, QName} = Id, Value) ->
%% Includes delete marker
_ = ets:update_counter(channel_queue_metrics, Id, {8, Value}, {Id, 0, 0, 0, 0, 0, 0, 0, 0}),
_ = ets:update_counter(queue_counter_metrics, QName, {8, Value}, {QName, 0, 0, 0, 0, 0, 0, 0, 0}),
ok.

delete(Table, Key) ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,6 @@
{2, undefined, queue_disk_writes_total, counter, "Total number of times queue wrote messages to disk", disk_writes},
{2, undefined, stream_segments, counter, "Total number of stream segment files", segments}
]},

%%% Metrics that contain reference to a channel. Some of them also have
%%% a queue name, but in this case filtering on it doesn't make any
%%% sense, as the queue is not an object of interest here.
Expand Down Expand Up @@ -211,9 +210,26 @@
]},

{channel_queue_exchange_metrics, [
{2, undefined, queue_messages_published_total, counter, "Total number of messages published to queues"}
]}
]).
{2, undefined, queue_messages_published_total, counter, "Total number of messages published into a queue through a exchange on a channel"}
]},
{queue_exchange_metrics, [
{2, undefined, queue_exchange_messages_published_total, counter, "Total number of messages published into a queue through an exchange"}
]},
{exchange_metrics, [
{2, undefined, exchange_messages_published_total, counter, "Total number of messages published into an exchange"},
{3, undefined, exchange_messages_confirmed_total, counter, "Total number of messages published into an exchange and confirmed"},
{4, undefined, exchange_messages_unroutable_returned_total, counter, "Total number of messages published as mandatory into an exchange and returned to the publisher as unroutable"},
{5, undefined, exchange_messages_unroutable_dropped_total, counter, "Total number of messages published as non-mandatory into an exchange and dropped as unroutable"}
]},
{queue_counter_metrics, [
{2, undefined, queue_get_ack_total, counter, "Total number of messages fetched from a queue with basic.get in manual acknowledgement mode"},
{3, undefined, queue_get_total, counter, "Total number of messages fetched from a queue with basic.get in automatic acknowledgement mode"},
{4, undefined, queue_messages_delivered_ack_total, counter, "Total number of messages delivered from a queue to consumers in manual acknowledgement mode"},
{5, undefined, queue_messages_delivered_total, counter, "Total number of messages delivered from a queue to consumers in automatic acknowledgement mode"},
{6, undefined, queue_messages_redelivered_total, counter, "Total number of messages redelivered from a queue to consumers"},
{7, undefined, queue_messages_acked_total, counter, "Total number of messages acknowledged by consumers on a queue"},
{8, undefined, queue_get_empty_total, counter, "Total number of times basic.get operations fetched no message on a queue"}
]}]).

%% Metrics that can be only requested through `/metrics/detailed`
-define(METRICS_CLUSTER,[
Expand Down Expand Up @@ -544,15 +560,22 @@ get_data(queue_metrics = Table, false, VHostsFilter) ->
{disk_reads, A15}, {disk_writes, A16}, {segments, A17}]}];
get_data(Table, false, VHostsFilter) when Table == channel_exchange_metrics;
Table == queue_coarse_metrics;
Table == queue_counter_metrics;
Table == channel_queue_metrics;
Table == connection_coarse_metrics;
Table == exchange_metrics;
Table == queue_exchange_metrics;
Table == channel_queue_exchange_metrics;
Table == ra_metrics;
Table == channel_process_metrics ->
Result = ets:foldl(fun
%% For queue_coarse_metrics
({#resource{kind = queue, virtual_host = VHost}, _, _, _, _}, Acc) when is_map(VHostsFilter), map_get(VHost, VHostsFilter) == false ->
Acc;
({#resource{kind = queue, virtual_host = VHost}, _, _, _, _, _, _, _, _}, Acc) when is_map(VHostsFilter), map_get(VHost, VHostsFilter) == false ->
Acc;
({{#resource{kind = queue, virtual_host = VHost}, #resource{kind = exchange}}, _, _}, Acc) when is_map(VHostsFilter), map_get(VHost, VHostsFilter) == false ->
Acc;
({_, V1}, {T, A1}) ->
{T, V1 + A1};
({_, V1, _}, {T, A1}) ->
Expand All @@ -579,6 +602,36 @@ get_data(Table, false, VHostsFilter) when Table == channel_exchange_metrics;
_ ->
[Result]
end;
get_data(exchange_metrics = Table, true, VHostsFilter) when is_map(VHostsFilter)->
ets:foldl(fun
({#resource{kind = exchange, virtual_host = VHost}, _, _, _, _, _} = Row, Acc) when
map_get(VHost, VHostsFilter)
->
[Row | Acc];
(_Row, Acc) ->
Acc
end, [], Table);
get_data(queue_counter_metrics = Table, true, VHostsFilter) when is_map(VHostsFilter) ->
ets:foldl(fun
({#resource{kind = queue, virtual_host = VHost}, _, _, _, _, _, _, _, _} = Row, Acc) when
map_get(VHost, VHostsFilter)
->
[Row | Acc];
(_Row, Acc) ->
Acc
end, [], Table);
get_data(queue_exchange_metrics = Table, true, VHostsFilter) when is_map(VHostsFilter) ->
ets:foldl(fun
({{
#resource{kind = queue, virtual_host = VHost},
#resource{kind = exchange, virtual_host = VHost}
}, _, _} = Row, Acc) when
map_get(VHost, VHostsFilter)
->
[Row | Acc];
(_Row, Acc) ->
Acc
end, [], Table);
get_data(queue_coarse_metrics = Table, true, VHostsFilter) when is_map(VHostsFilter) ->
ets:foldl(fun
({#resource{kind = queue, virtual_host = VHost}, _, _, _, _} = Row, Acc) when map_get(VHost, VHostsFilter) ->
Expand Down Expand Up @@ -671,15 +724,15 @@ division(A, B) ->
accumulate_count_and_sum(Value, {Count, Sum}) ->
{Count + 1, Sum + Value}.

empty(T) when T == channel_queue_exchange_metrics; T == channel_process_metrics; T == queue_consumer_count ->
empty(T) when T == channel_queue_exchange_metrics; T == queue_exchange_metrics; T == channel_process_metrics; T == queue_consumer_count ->
{T, 0};
empty(T) when T == connection_coarse_metrics; T == auth_attempt_metrics; T == auth_attempt_detailed_metrics ->
{T, 0, 0, 0};
empty(T) when T == channel_exchange_metrics; T == queue_coarse_metrics; T == connection_metrics ->
empty(T) when T == channel_exchange_metrics; T == exchange_metrics; T == queue_coarse_metrics; T == connection_metrics ->
{T, 0, 0, 0, 0};
empty(T) when T == ra_metrics ->
{T, 0, 0, 0, 0, 0, {0, 0}};
empty(T) when T == channel_queue_metrics; T == channel_metrics ->
empty(T) when T == channel_queue_metrics; T == queue_counter_metrics; T == channel_metrics ->
{T, 0, 0, 0, 0, 0, 0, 0};
empty(queue_metrics = T) ->
{T, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}.
Expand Down
Loading
Loading