From c453bdcac8270816ae8249d994d59c0f8005b264 Mon Sep 17 00:00:00 2001 From: Andrew Varner Date: Sun, 22 May 2022 20:18:35 -0500 Subject: [PATCH] Collect metrics on webhook calls. --- .../{src => include}/vmq_metrics.hrl | 0 apps/vmq_server/priv/vmq_server.schema | 2 +- apps/vmq_webhooks/changelog.md | 1 + apps/vmq_webhooks/priv/vmq_webhooks.schema | 2 +- .../vmq_webhooks/src/vmq_webhooks_metrics.erl | 265 ++++++++++++++++++ apps/vmq_webhooks/src/vmq_webhooks_plugin.erl | 5 + apps/vmq_webhooks/test/vmq_webhooks_SUITE.erl | 27 +- changelog.md | 1 + 8 files changed, 300 insertions(+), 3 deletions(-) rename apps/vmq_server/{src => include}/vmq_metrics.hrl (100%) create mode 100644 apps/vmq_webhooks/src/vmq_webhooks_metrics.erl diff --git a/apps/vmq_server/src/vmq_metrics.hrl b/apps/vmq_server/include/vmq_metrics.hrl similarity index 100% rename from apps/vmq_server/src/vmq_metrics.hrl rename to apps/vmq_server/include/vmq_metrics.hrl diff --git a/apps/vmq_server/priv/vmq_server.schema b/apps/vmq_server/priv/vmq_server.schema index 5aae1ea64..b4159f905 100644 --- a/apps/vmq_server/priv/vmq_server.schema +++ b/apps/vmq_server/priv/vmq_server.schema @@ -1614,7 +1614,7 @@ {datatype, string} ]}. -%% @doc genenerate extra metrics from metrics with labels. +%% @doc generate extra metrics from metrics with labels. {mapping, "graphite_include_labels", "vmq_server.graphite_include_labels", [ {default, off}, {datatype, flag}, diff --git a/apps/vmq_webhooks/changelog.md b/apps/vmq_webhooks/changelog.md index e1cd3a6ef..e4f8d9082 100644 --- a/apps/vmq_webhooks/changelog.md +++ b/apps/vmq_webhooks/changelog.md @@ -1,5 +1,6 @@ # Changelog + - Collect per webhook type (e.g. `on_publish_m5_requests`) metrics. - Move persistence of webhooks to the `vernemq.conf` main file. This means adding hooks using the `vmq-admin` tool no longer persists the webhooks and they have to be manually added to the `vernemq.conf` file. diff --git a/apps/vmq_webhooks/priv/vmq_webhooks.schema b/apps/vmq_webhooks/priv/vmq_webhooks.schema index 9ab9c429d..be22f62db 100644 --- a/apps/vmq_webhooks/priv/vmq_webhooks.schema +++ b/apps/vmq_webhooks/priv/vmq_webhooks.schema @@ -6,7 +6,7 @@ %% a name. vmq_webhooks..hook = associates the hook %% with the name . Webhooks are registered in the order %% of the name given to it. Therefore a webhook with name 'webhook1' -%% is regisered before a webhook with the name 'webhook2'. +%% is registered before a webhook with the name 'webhook2'. {mapping, "vmq_webhooks.$name.hook", "vmq_webhooks.user_webhooks", [ {datatype, {enum, diff --git a/apps/vmq_webhooks/src/vmq_webhooks_metrics.erl b/apps/vmq_webhooks/src/vmq_webhooks_metrics.erl new file mode 100644 index 000000000..214656908 --- /dev/null +++ b/apps/vmq_webhooks/src/vmq_webhooks_metrics.erl @@ -0,0 +1,265 @@ +-module(vmq_webhooks_metrics). + +-export([init/0, incr/2, incr/3, metrics/0]). + +-include_lib("vmq_server/include/vmq_metrics.hrl"). + +-type counter_type() :: requests | errors | bytes_sent. +-type hook() :: + on_register | + on_register_m5 | + auth_on_publish | + auth_on_publish_m5 | + auth_on_register | + auth_on_register_m5 | + auth_on_subscribe | + auth_on_subscribe_m5 | + on_auth_m5 | + on_deliver | + on_deliver_m5 | + on_unsubscribe | + on_unsubscribe_m5 | + on_publish | + on_publish_m5 | + on_subscribe | + on_subscribe_m5 | + on_message_drop | + on_topic_unsubscribed | + on_session_expired | + on_offline_message | + on_config_change | + on_client_wakeup | + on_client_offline | + on_client_gone. + +-spec metrics() -> [{Type, Labels, Id, Description, Name, Value}] + when Type :: atom(), + Labels :: [metric_label()], + Id :: metric_id(), + Description :: undefined | binary(), + Name :: binary(), + Value :: term(). +metrics() -> + MetricKeys = [{H, CT} || H <- all_hooks(), CT <- all_counter_types()], + CounterRef = persistent_term:get(?MODULE), + [hook_and_counter_type_to_metric(CounterRef, Hook, CounterType) + || {Hook, CounterType} <- MetricKeys]. + +hook_and_counter_type_to_metric(CounterRef, Hook, CounterType) -> + Index = met2idx(Hook, CounterType), + HookBin = atom_to_binary(Hook), + CounterTypeBin = atom_to_binary(CounterType), + Name = <<"webhooks_", HookBin/binary, "_", CounterTypeBin/binary>>, + Id = binary_to_atom(Name), + Description = + case CounterType of + requests -> + <<"Number of requests to ", HookBin/binary, " webhooks">>; + errors -> + <<"Number of errors calling ", HookBin/binary, " webhooks">>; + bytes_sent -> + <<"Number of bytes sent to ", HookBin/binary, " webhooks">> + end, + Value = counters:get(CounterRef, Index), + {counter, [], Id, Description, Name, Value}. + +-spec init() -> ok. +init() -> + NumCounters = length(all_hooks()) * length(all_counter_types()), + CounterRef = counters:new(NumCounters, [write_concurrency]), + persistent_term:put(?MODULE, CounterRef), + application:set_env(vmq_webhooks, vmq_metrics_mfa, {?MODULE, metrics, []}). + +-spec incr(hook(), counter_type()) -> ok. +incr(Hook, CounterType) -> + incr(Hook, CounterType, 1). + +-spec incr(hook(), counter_type(), non_neg_integer()) -> ok. +incr(Hook, CounterType, Value) -> + CounterRef = persistent_term:get(?MODULE), + Index = met2idx(Hook, CounterType), + counters:add(CounterRef, Index, Value). + +-spec all_hooks() -> [hook()]. +all_hooks() -> + [on_register, + on_register_m5, + auth_on_publish, + auth_on_publish_m5, + auth_on_register, + auth_on_register_m5, + auth_on_subscribe, + auth_on_subscribe_m5, + on_auth_m5, + on_deliver, + on_deliver_m5, + on_unsubscribe, + on_unsubscribe_m5, + on_publish, + on_publish_m5, + on_subscribe, + on_subscribe_m5, + on_message_drop, + on_topic_unsubscribed, + on_session_expired, + on_offline_message, + on_config_change, + on_client_wakeup, + on_client_offline, + on_client_gone]. + +-spec all_counter_types() -> [counter_type()]. +all_counter_types() -> + [requests, errors, bytes_sent]. + +-spec met2idx(hook(), counter_type()) -> non_neg_integer(). +met2idx(on_register, requests) -> + 1; +met2idx(on_register_m5, requests) -> + 2; +met2idx(auth_on_publish, requests) -> + 3; +met2idx(auth_on_publish_m5, requests) -> + 4; +met2idx(auth_on_register, requests) -> + 5; +met2idx(auth_on_register_m5, requests) -> + 6; +met2idx(auth_on_subscribe, requests) -> + 7; +met2idx(auth_on_subscribe_m5, requests) -> + 8; +met2idx(on_auth_m5, requests) -> + 9; +met2idx(on_deliver, requests) -> + 10; +met2idx(on_deliver_m5, requests) -> + 11; +met2idx(on_unsubscribe, requests) -> + 12; +met2idx(on_unsubscribe_m5, requests) -> + 13; +met2idx(on_publish, requests) -> + 14; +met2idx(on_publish_m5, requests) -> + 15; +met2idx(on_subscribe, requests) -> + 16; +met2idx(on_subscribe_m5, requests) -> + 17; +met2idx(on_message_drop, requests) -> + 18; +met2idx(on_topic_unsubscribed, requests) -> + 19; +met2idx(on_session_expired, requests) -> + 20; +met2idx(on_offline_message, requests) -> + 21; +met2idx(on_config_change, requests) -> + 22; +met2idx(on_client_wakeup, requests) -> + 23; +met2idx(on_client_offline, requests) -> + 24; +met2idx(on_client_gone, requests) -> + 25; +met2idx(on_register, errors) -> + 26; +met2idx(on_register_m5, errors) -> + 27; +met2idx(auth_on_publish, errors) -> + 28; +met2idx(auth_on_publish_m5, errors) -> + 29; +met2idx(auth_on_register, errors) -> + 30; +met2idx(auth_on_register_m5, errors) -> + 31; +met2idx(auth_on_subscribe, errors) -> + 32; +met2idx(auth_on_subscribe_m5, errors) -> + 33; +met2idx(on_auth_m5, errors) -> + 34; +met2idx(on_deliver, errors) -> + 35; +met2idx(on_deliver_m5, errors) -> + 36; +met2idx(on_unsubscribe, errors) -> + 37; +met2idx(on_unsubscribe_m5, errors) -> + 38; +met2idx(on_publish, errors) -> + 39; +met2idx(on_publish_m5, errors) -> + 40; +met2idx(on_subscribe, errors) -> + 41; +met2idx(on_subscribe_m5, errors) -> + 42; +met2idx(on_message_drop, errors) -> + 43; +met2idx(on_topic_unsubscribed, errors) -> + 44; +met2idx(on_session_expired, errors) -> + 45; +met2idx(on_offline_message, errors) -> + 46; +met2idx(on_config_change, errors) -> + 47; +met2idx(on_client_wakeup, errors) -> + 48; +met2idx(on_client_offline, errors) -> + 49; +met2idx(on_client_gone, errors) -> + 50; +met2idx(on_register, bytes_sent) -> + 51; +met2idx(on_register_m5, bytes_sent) -> + 52; +met2idx(auth_on_publish, bytes_sent) -> + 53; +met2idx(auth_on_publish_m5, bytes_sent) -> + 54; +met2idx(auth_on_register, bytes_sent) -> + 55; +met2idx(auth_on_register_m5, bytes_sent) -> + 56; +met2idx(auth_on_subscribe, bytes_sent) -> + 57; +met2idx(auth_on_subscribe_m5, bytes_sent) -> + 58; +met2idx(on_auth_m5, bytes_sent) -> + 59; +met2idx(on_deliver, bytes_sent) -> + 60; +met2idx(on_deliver_m5, bytes_sent) -> + 61; +met2idx(on_unsubscribe, bytes_sent) -> + 62; +met2idx(on_unsubscribe_m5, bytes_sent) -> + 63; +met2idx(on_publish, bytes_sent) -> + 64; +met2idx(on_publish_m5, bytes_sent) -> + 65; +met2idx(on_subscribe, bytes_sent) -> + 66; +met2idx(on_subscribe_m5, bytes_sent) -> + 67; +met2idx(on_message_drop, bytes_sent) -> + 68; +met2idx(on_topic_unsubscribed, bytes_sent) -> + 69; +met2idx(on_session_expired, bytes_sent) -> + 70; +met2idx(on_offline_message, bytes_sent) -> + 71; +met2idx(on_config_change, bytes_sent) -> + 72; +met2idx(on_client_wakeup, bytes_sent) -> + 73; +met2idx(on_client_offline, bytes_sent) -> + 74; +met2idx(on_client_gone, bytes_sent) -> + 75. diff --git a/apps/vmq_webhooks/src/vmq_webhooks_plugin.erl b/apps/vmq_webhooks/src/vmq_webhooks_plugin.erl index 070b6db17..f2b1408b6 100644 --- a/apps/vmq_webhooks/src/vmq_webhooks_plugin.erl +++ b/apps/vmq_webhooks/src/vmq_webhooks_plugin.erl @@ -134,6 +134,7 @@ init([]) -> process_flag(trap_exit, true), ets:new(?TBL, [public, ordered_set, named_table, {read_concurrency, true}]), ok = vmq_webhooks_cache:new(), + vmq_webhooks_metrics:init(), {ok, #state{}}. %%-------------------------------------------------------------------- @@ -731,11 +732,15 @@ call_endpoint(Endpoint, EOpts, Hook, Args0) -> {error, _} = E -> E end, + vmq_webhooks_metrics:incr(Hook, requests), + vmq_webhooks_metrics:incr(Hook, bytes_sent, size(Payload)), case Res of {decoded_error, Reason} -> + vmq_webhooks_metrics:incr(Hook, errors), lager:debug("calling endpoint received error due to ~p", [Reason]), {error, Reason}; {error, Reason} -> + vmq_webhooks_metrics:incr(Hook, errors), lager:error("calling endpoint failed due to ~p", [Reason]), {error, Reason}; Res -> diff --git a/apps/vmq_webhooks/test/vmq_webhooks_SUITE.erl b/apps/vmq_webhooks/test/vmq_webhooks_SUITE.erl index e687d01c0..3e0fce0f5 100644 --- a/apps/vmq_webhooks/test/vmq_webhooks_SUITE.erl +++ b/apps/vmq_webhooks/test/vmq_webhooks_SUITE.erl @@ -1,6 +1,7 @@ -module(vmq_webhooks_SUITE). -include_lib("common_test/include/ct.hrl"). -include_lib("vernemq_dev/include/vernemq_dev.hrl"). +-include_lib("vmq_server/include/vmq_metrics.hrl"). -include("vmq_webhooks_test.hrl"). @@ -104,7 +105,8 @@ http() -> cache_auth_on_publish, cache_auth_on_subscribe, cache_expired_entry, - cli_allow_query_parameters_test + cli_allow_query_parameters_test, + metrics_test ]. https() -> @@ -540,6 +542,29 @@ cli_allow_query_parameters_test(_) -> ok = register_hook(auth_on_register, EndpointWithParams), [] = deregister_hook(auth_on_register, EndpointWithParams). +metrics_test(_) -> + register_hook(on_session_expired, ?ENDPOINT), + StartReqCntr = find_metric_value(webhooks_on_session_expired_requests), + StartErrCntr = find_metric_value(webhooks_on_session_expired_errors), + StartSentBytesCntr = find_metric_value(webhooks_on_session_expired_bytes_sent), + Self = pid_to_bin(self()), + [next] = vmq_plugin:all(on_session_expired, [{?MOUNTPOINT, Self}]), + ok = exp_response(on_session_expired_ok), + EndReqCntr = find_metric_value(webhooks_on_session_expired_requests), + EndErrCntr = find_metric_value(webhooks_on_session_expired_errors), + EndSentBytesCntr = find_metric_value(webhooks_on_session_expired_bytes_sent), + 1 = EndReqCntr - StartReqCntr, + 0 = EndErrCntr - StartErrCntr, + true = (EndSentBytesCntr - StartSentBytesCntr) > 0, + deregister_hook(on_session_expired, ?ENDPOINT). + +find_metric_value(Id) -> + Metrics = vmq_metrics:metrics(), + {value, {_MetricDef, Value}} = lists:search(fun({Metric, V}) -> + Metric#metric_def.id =:= Id + end, Metrics), + Value. + %% HTTPS Tests %% Given a CA that signed the endpoint's server certificate, the webhook works diff --git a/changelog.md b/changelog.md index 03812f1d7..665243e54 100644 --- a/changelog.md +++ b/changelog.md @@ -1,3 +1,4 @@ +- Collect per webhook type (e.g. `on_publish_m5_requests`) metrics. - Fix vmqs (SSL) inter-node communication. - Store waiting pubrec packets in queue and add to waiting acks on reconnect. - Fix PostGres (`epgsql` 4.6.0) response format in `vmq_diversity`.