Skip to content

Commit

Permalink
fix: Remove queues filter for prometh. collector
Browse files Browse the repository at this point in the history
Remove queue filters for prometheus_rabbitmq_core_metrics_collector

Closes rabbitmq#11315
  • Loading branch information
LoisSotoLopez committed Jun 13, 2024
1 parent 0875169 commit 27a0b0f
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,6 @@ end}.
{mapping, "prometheus.ssl.max_keepalive", "rabbitmq_prometheus.ssl_config.cowboy_opts.max_keepalive",
[{datatype, integer}, {validators, ["non_negative_integer"]}]}.

{mapping, "prometheus.filter_aggregated_queue_metrics_pattern", "rabbitmq_prometheus.filter_aggregated_queue_metrics_pattern", [{datatype, string}]}.

%% Authentication options ========================================================
{mapping, "prometheus.authentication.enabled", "rabbitmq_prometheus.authentication.enabled",
[{datatype, {enum, [true, false]}}]}.
Original file line number Diff line number Diff line change
Expand Up @@ -246,26 +246,26 @@ register() ->
deregister_cleanup(_) -> ok.

collect_mf('detailed', Callback) ->
collect(true, ?DETAILED_METRIC_NAME_PREFIX, vhosts_filter_from_pdict(), queues_filter_from_pdict(), enabled_mfs_from_pdict(?METRICS_RAW), Callback),
collect(true, ?CLUSTER_METRIC_NAME_PREFIX, vhosts_filter_from_pdict(), queues_filter_from_pdict(), enabled_mfs_from_pdict(?METRICS_CLUSTER), Callback),
collect(true, ?DETAILED_METRIC_NAME_PREFIX, vhosts_filter_from_pdict(), enabled_mfs_from_pdict(?METRICS_RAW), Callback),
collect(true, ?CLUSTER_METRIC_NAME_PREFIX, vhosts_filter_from_pdict(), enabled_mfs_from_pdict(?METRICS_CLUSTER), Callback),
%% identity is here to enable filtering on a cluster name (as already happens in existing dashboards)
emit_identity_info(Callback),
ok;
collect_mf('per-object', Callback) ->
collect(true, ?METRIC_NAME_PREFIX, false, queues_filter_from_pdict(), ?METRICS_RAW, Callback),
collect(true, ?METRIC_NAME_PREFIX, false, ?METRICS_RAW, Callback),
totals(Callback),
emit_identity_info(Callback),
ok;
collect_mf(_Registry, Callback) ->
PerObjectMetrics = application:get_env(rabbitmq_prometheus, return_per_object_metrics, false),
collect(PerObjectMetrics, ?METRIC_NAME_PREFIX, false, queues_filter_from_pdict(), ?METRICS_RAW, Callback),
collect(PerObjectMetrics, ?METRIC_NAME_PREFIX, false, ?METRICS_RAW, Callback),
totals(Callback),
emit_identity_info(Callback),
ok.

collect(PerObjectMetrics, Prefix, VHostsFilter, QueuesFilter, IncludedMFs, Callback) ->
collect(PerObjectMetrics, Prefix, VHostsFilter, IncludedMFs, Callback) ->
_ = [begin
Data = get_data(Table, PerObjectMetrics, VHostsFilter, QueuesFilter),
Data = get_data(Table, PerObjectMetrics, VHostsFilter),
mf(Callback, Prefix, Contents, Data)
end || {Table, Contents} <- IncludedMFs, not mutually_exclusive_mf(PerObjectMetrics, Table, IncludedMFs)],
ok.
Expand Down Expand Up @@ -492,7 +492,7 @@ emit_gauge_metric_if_defined(Labels, Value) ->
gauge_metric(Labels, Value)
end.

get_data(connection_metrics = Table, false, _, _) ->
get_data(connection_metrics = Table, false, _) ->
{Table, A1, A2, A3, A4} = ets:foldl(fun({_, Props}, {T, A1, A2, A3, A4}) ->
{T,
sum(proplists:get_value(recv_cnt, Props), A1),
Expand All @@ -501,7 +501,7 @@ get_data(connection_metrics = Table, false, _, _) ->
sum(proplists:get_value(channels, Props), A4)}
end, empty(Table), Table),
[{Table, [{recv_cnt, A1}, {send_cnt, A2}, {send_pend, A3}, {channels, A4}]}];
get_data(channel_metrics = Table, false, _, _) ->
get_data(channel_metrics = Table, false, _) ->
{Table, A1, A2, A3, A4, A5, A6, A7} =
ets:foldl(fun({_, Props}, {T, A1, A2, A3, A4, A5, A6, A7}) ->
{T,
Expand All @@ -516,40 +516,22 @@ get_data(channel_metrics = Table, false, _, _) ->
[{Table, [{consumer_count, A1}, {messages_unacknowledged, A2}, {messages_unconfirmed, A3},
{messages_uncommitted, A4}, {acks_uncommitted, A5}, {prefetch_count, A6},
{global_prefetch_count, A7}]}];
get_data(queue_consumer_count = MF, false, VHostsFilter, QueuesFilter) ->
get_data(queue_consumer_count = MF, false, VHostsFilter) ->
Table = queue_metrics, %% Real table name
{_, A1} = ets:foldl(fun
({#resource{kind = queue, virtual_host = VHost}, _, _}, Acc) when is_map(VHostsFilter), map_get(VHost, VHostsFilter) == false ->
({#resource{kind = queue, virtual_host = VHost}, _, _}, Acc) when is_map(VHostsFilter), map_get(VHost, VHostsFilter) == false ->
Acc;
({#resource{kind = queue, name = Name}, Props, _}, {T, A1} = Acc)
when is_list(QueuesFilter) ->
case re:run(Name, QueuesFilter, [{capture, none}]) of
match ->
Acc;
nomatch ->
{T,
sum(proplists:get_value(consumers, Props), A1)
}
end;
({_, Props, _}, {T, A1}) ->
{T,
sum(proplists:get_value(consumers, Props), A1)
}
end, empty(MF), Table),
[{Table, [{consumers, A1}]}];
get_data(queue_metrics = Table, false, VHostsFilter, QueuesFilter) ->
get_data(queue_metrics = Table, false, VHostsFilter) ->
{Table, A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16, A17} =
ets:foldl(fun
({#resource{kind = queue, virtual_host = VHost}, _, _}, Acc) when is_map(VHostsFilter), map_get(VHost, VHostsFilter) == false ->
Acc;
({#resource{kind = queue, name = Name}, Props, _}, Acc)
when is_list(QueuesFilter) ->
case re:run(Name, QueuesFilter, [{capture, none}]) of
match ->
Acc;
nomatch ->
sum_queue_metrics(Props, Acc)
end;
({_, Props, _}, Acc) ->
sum_queue_metrics(Props, Acc)
end, empty(Table), Table),
Expand All @@ -560,7 +542,7 @@ get_data(queue_metrics = Table, false, VHostsFilter, QueuesFilter) ->
{message_bytes_ready, A11}, {message_bytes_unacknowledged, A12},
{messages_paged_out, A13}, {message_bytes_paged_out, A14},
{disk_reads, A15}, {disk_writes, A16}, {segments, A17}]}];
get_data(Table, false, VHostsFilter, QueuesFilter) when Table == channel_exchange_metrics;
get_data(Table, false, VHostsFilter) when Table == channel_exchange_metrics;
Table == queue_coarse_metrics;
Table == channel_queue_metrics;
Table == connection_coarse_metrics;
Expand All @@ -571,14 +553,6 @@ get_data(Table, false, VHostsFilter, QueuesFilter) when Table == channel_exchang
%% For queue_coarse_metrics
({#resource{kind = queue, virtual_host = VHost}, _, _, _, _}, Acc) when is_map(VHostsFilter), map_get(VHost, VHostsFilter) == false ->
Acc;
({#resource{kind = queue, name = Name}, V1, V2, V3, V4}, {T, A1, A2, A3, A4} = Acc)
when is_list(QueuesFilter) ->
case re:run(Name, QueuesFilter, [{capture, none}]) of
match ->
Acc;
nomatch ->
{T, V1 + A1, V2 + A2, V3 + A3, V4 + A4}
end;
({_, V1}, {T, A1}) ->
{T, V1 + A1};
({_, V1, _}, {T, A1}) ->
Expand All @@ -605,31 +579,31 @@ get_data(Table, false, VHostsFilter, QueuesFilter) when Table == channel_exchang
_ ->
[Result]
end;
get_data(queue_coarse_metrics = Table, true, VHostsFilter, _) when is_map(VHostsFilter) ->
get_data(queue_coarse_metrics = Table, true, VHostsFilter) when is_map(VHostsFilter) ->
ets:foldl(fun
({#resource{kind = queue, virtual_host = VHost}, _, _, _, _} = Row, Acc) when map_get(VHost, VHostsFilter) ->
[Row|Acc];
(_, Acc) ->
Acc
end, [], Table);
get_data(MF, true, VHostsFilter, _) when is_map(VHostsFilter), MF == queue_metrics orelse MF == queue_consumer_count ->
get_data(MF, true, VHostsFilter) when is_map(VHostsFilter), MF == queue_metrics orelse MF == queue_consumer_count ->
Table = queue_metrics,
ets:foldl(fun
({#resource{kind = queue, virtual_host = VHost}, _, _} = Row, Acc) when map_get(VHost, VHostsFilter) ->
[Row|Acc];
(_, Acc) ->
Acc
end, [], Table);
get_data(queue_consumer_count, true, _, _) ->
get_data(queue_consumer_count, true, _) ->
ets:tab2list(queue_metrics);
get_data(vhost_status, _, _, _) ->
get_data(vhost_status, _, _) ->
[ { #{<<"vhost">> => VHost},
case rabbit_vhost_sup_sup:is_vhost_alive(VHost) of
true -> 1;
false -> 0
end}
|| VHost <- rabbit_vhost:list() ];
get_data(exchange_bindings, _, _, _) ->
get_data(exchange_bindings, _, _) ->
Exchanges = lists:foldl(fun
(#exchange{internal = true}, Acc) ->
Acc;
Expand All @@ -653,7 +627,7 @@ get_data(exchange_bindings, _, _, _) ->
[{<<"vhost=\"", VHost/binary, "\",exchange=\"", Name/binary, "\",type=\"", Type/binary, "\"">>,
Bindings}|Acc]
end, [], WithCount);
get_data(exchange_names, _, _, _) ->
get_data(exchange_names, _, _) ->
lists:foldl(fun
(#exchange{internal = true}, Acc) ->
Acc;
Expand All @@ -663,7 +637,7 @@ get_data(exchange_names, _, _, _) ->
Label = <<"vhost=\"", VHost/binary, "\",exchange=\"", Name/binary, "\",type=\"", (atom_to_binary(EType))/binary, "\"">>,
[{Label, 1}|Acc]
end, [], rabbit_exchange:list());
get_data(Table, _, _, _) ->
get_data(Table, _, _) ->
ets:tab2list(Table).


Expand Down Expand Up @@ -737,10 +711,3 @@ vhosts_filter_from_pdict() ->
maps:merge(All, Enabled)
end.

queues_filter_from_pdict() ->
case get(prometheus_queue_filter) of
undefined ->
false;
Pattern ->
Pattern
end.
6 changes: 0 additions & 6 deletions deps/rabbitmq_prometheus/src/rabbit_prometheus_handler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -172,12 +172,6 @@ put_filtering_options_into_process_dictionary(Request) ->
put(prometheus_mf_filter, Fs);
_ -> ok
end,
case application:get_env(rabbitmq_prometheus, filter_aggregated_queue_metrics_pattern, undefined) of
undefined -> ok;
Pattern ->
{ok, CompiledPattern} = re:compile(Pattern),
put(prometheus_queue_filter, CompiledPattern)
end,
ok.

parse_vhosts(N) when is_binary(N) ->
Expand Down

0 comments on commit 27a0b0f

Please sign in to comment.