Skip to content

Commit

Permalink
Collect metrics on webhook calls.
Browse files Browse the repository at this point in the history
  • Loading branch information
varnerac committed Jun 13, 2022
1 parent d6644ae commit c453bdc
Show file tree
Hide file tree
Showing 8 changed files with 300 additions and 3 deletions.
File renamed without changes.
2 changes: 1 addition & 1 deletion apps/vmq_server/priv/vmq_server.schema
Original file line number Diff line number Diff line change
Expand Up @@ -1614,7 +1614,7 @@
{datatype, string}
]}.

%% @doc genenerate extra metrics from metrics with labels.
%% @doc generate extra metrics from metrics with labels.
{mapping, "graphite_include_labels", "vmq_server.graphite_include_labels", [
{default, off},
{datatype, flag},
Expand Down
1 change: 1 addition & 0 deletions apps/vmq_webhooks/changelog.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Changelog

- Collect per webhook type (e.g. `on_publish_m5_requests`) metrics.
- Move persistence of webhooks to the `vernemq.conf` main file. This means
adding hooks using the `vmq-admin` tool no longer persists the webhooks and
they have to be manually added to the `vernemq.conf` file.
Expand Down
2 changes: 1 addition & 1 deletion apps/vmq_webhooks/priv/vmq_webhooks.schema
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
%% a name. vmq_webhooks.<name>.hook = <hook> associates the hook
%% <hook> with the name <name>. Webhooks are registered in the order
%% of the name given to it. Therefore a webhook with name 'webhook1'
%% is regisered before a webhook with the name 'webhook2'.
%% is registered before a webhook with the name 'webhook2'.
{mapping, "vmq_webhooks.$name.hook", "vmq_webhooks.user_webhooks",
[
{datatype, {enum,
Expand Down
265 changes: 265 additions & 0 deletions apps/vmq_webhooks/src/vmq_webhooks_metrics.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,265 @@
-module(vmq_webhooks_metrics).

-export([init/0, incr/2, incr/3, metrics/0]).

-include_lib("vmq_server/include/vmq_metrics.hrl").

-type counter_type() :: requests | errors | bytes_sent.
-type hook() ::
on_register |
on_register_m5 |
auth_on_publish |
auth_on_publish_m5 |
auth_on_register |
auth_on_register_m5 |
auth_on_subscribe |
auth_on_subscribe_m5 |
on_auth_m5 |
on_deliver |
on_deliver_m5 |
on_unsubscribe |
on_unsubscribe_m5 |
on_publish |
on_publish_m5 |
on_subscribe |
on_subscribe_m5 |
on_message_drop |
on_topic_unsubscribed |
on_session_expired |
on_offline_message |
on_config_change |
on_client_wakeup |
on_client_offline |
on_client_gone.

-spec metrics() -> [{Type, Labels, Id, Description, Name, Value}]
when Type :: atom(),
Labels :: [metric_label()],
Id :: metric_id(),
Description :: undefined | binary(),
Name :: binary(),
Value :: term().
metrics() ->
MetricKeys = [{H, CT} || H <- all_hooks(), CT <- all_counter_types()],
CounterRef = persistent_term:get(?MODULE),
[hook_and_counter_type_to_metric(CounterRef, Hook, CounterType)
|| {Hook, CounterType} <- MetricKeys].

hook_and_counter_type_to_metric(CounterRef, Hook, CounterType) ->
Index = met2idx(Hook, CounterType),
HookBin = atom_to_binary(Hook),
CounterTypeBin = atom_to_binary(CounterType),
Name = <<"webhooks_", HookBin/binary, "_", CounterTypeBin/binary>>,
Id = binary_to_atom(Name),
Description =
case CounterType of
requests ->
<<"Number of requests to ", HookBin/binary, " webhooks">>;
errors ->
<<"Number of errors calling ", HookBin/binary, " webhooks">>;
bytes_sent ->
<<"Number of bytes sent to ", HookBin/binary, " webhooks">>
end,
Value = counters:get(CounterRef, Index),
{counter, [], Id, Description, Name, Value}.

-spec init() -> ok.
init() ->
NumCounters = length(all_hooks()) * length(all_counter_types()),
CounterRef = counters:new(NumCounters, [write_concurrency]),
persistent_term:put(?MODULE, CounterRef),
application:set_env(vmq_webhooks, vmq_metrics_mfa, {?MODULE, metrics, []}).

-spec incr(hook(), counter_type()) -> ok.
incr(Hook, CounterType) ->
incr(Hook, CounterType, 1).

-spec incr(hook(), counter_type(), non_neg_integer()) -> ok.
incr(Hook, CounterType, Value) ->
CounterRef = persistent_term:get(?MODULE),
Index = met2idx(Hook, CounterType),
counters:add(CounterRef, Index, Value).

-spec all_hooks() -> [hook()].
all_hooks() ->
[on_register,
on_register_m5,
auth_on_publish,
auth_on_publish_m5,
auth_on_register,
auth_on_register_m5,
auth_on_subscribe,
auth_on_subscribe_m5,
on_auth_m5,
on_deliver,
on_deliver_m5,
on_unsubscribe,
on_unsubscribe_m5,
on_publish,
on_publish_m5,
on_subscribe,
on_subscribe_m5,
on_message_drop,
on_topic_unsubscribed,
on_session_expired,
on_offline_message,
on_config_change,
on_client_wakeup,
on_client_offline,
on_client_gone].

-spec all_counter_types() -> [counter_type()].
all_counter_types() ->
[requests, errors, bytes_sent].

-spec met2idx(hook(), counter_type()) -> non_neg_integer().
met2idx(on_register, requests) ->
1;
met2idx(on_register_m5, requests) ->
2;
met2idx(auth_on_publish, requests) ->
3;
met2idx(auth_on_publish_m5, requests) ->
4;
met2idx(auth_on_register, requests) ->
5;
met2idx(auth_on_register_m5, requests) ->
6;
met2idx(auth_on_subscribe, requests) ->
7;
met2idx(auth_on_subscribe_m5, requests) ->
8;
met2idx(on_auth_m5, requests) ->
9;
met2idx(on_deliver, requests) ->
10;
met2idx(on_deliver_m5, requests) ->
11;
met2idx(on_unsubscribe, requests) ->
12;
met2idx(on_unsubscribe_m5, requests) ->
13;
met2idx(on_publish, requests) ->
14;
met2idx(on_publish_m5, requests) ->
15;
met2idx(on_subscribe, requests) ->
16;
met2idx(on_subscribe_m5, requests) ->
17;
met2idx(on_message_drop, requests) ->
18;
met2idx(on_topic_unsubscribed, requests) ->
19;
met2idx(on_session_expired, requests) ->
20;
met2idx(on_offline_message, requests) ->
21;
met2idx(on_config_change, requests) ->
22;
met2idx(on_client_wakeup, requests) ->
23;
met2idx(on_client_offline, requests) ->
24;
met2idx(on_client_gone, requests) ->
25;
met2idx(on_register, errors) ->
26;
met2idx(on_register_m5, errors) ->
27;
met2idx(auth_on_publish, errors) ->
28;
met2idx(auth_on_publish_m5, errors) ->
29;
met2idx(auth_on_register, errors) ->
30;
met2idx(auth_on_register_m5, errors) ->
31;
met2idx(auth_on_subscribe, errors) ->
32;
met2idx(auth_on_subscribe_m5, errors) ->
33;
met2idx(on_auth_m5, errors) ->
34;
met2idx(on_deliver, errors) ->
35;
met2idx(on_deliver_m5, errors) ->
36;
met2idx(on_unsubscribe, errors) ->
37;
met2idx(on_unsubscribe_m5, errors) ->
38;
met2idx(on_publish, errors) ->
39;
met2idx(on_publish_m5, errors) ->
40;
met2idx(on_subscribe, errors) ->
41;
met2idx(on_subscribe_m5, errors) ->
42;
met2idx(on_message_drop, errors) ->
43;
met2idx(on_topic_unsubscribed, errors) ->
44;
met2idx(on_session_expired, errors) ->
45;
met2idx(on_offline_message, errors) ->
46;
met2idx(on_config_change, errors) ->
47;
met2idx(on_client_wakeup, errors) ->
48;
met2idx(on_client_offline, errors) ->
49;
met2idx(on_client_gone, errors) ->
50;
met2idx(on_register, bytes_sent) ->
51;
met2idx(on_register_m5, bytes_sent) ->
52;
met2idx(auth_on_publish, bytes_sent) ->
53;
met2idx(auth_on_publish_m5, bytes_sent) ->
54;
met2idx(auth_on_register, bytes_sent) ->
55;
met2idx(auth_on_register_m5, bytes_sent) ->
56;
met2idx(auth_on_subscribe, bytes_sent) ->
57;
met2idx(auth_on_subscribe_m5, bytes_sent) ->
58;
met2idx(on_auth_m5, bytes_sent) ->
59;
met2idx(on_deliver, bytes_sent) ->
60;
met2idx(on_deliver_m5, bytes_sent) ->
61;
met2idx(on_unsubscribe, bytes_sent) ->
62;
met2idx(on_unsubscribe_m5, bytes_sent) ->
63;
met2idx(on_publish, bytes_sent) ->
64;
met2idx(on_publish_m5, bytes_sent) ->
65;
met2idx(on_subscribe, bytes_sent) ->
66;
met2idx(on_subscribe_m5, bytes_sent) ->
67;
met2idx(on_message_drop, bytes_sent) ->
68;
met2idx(on_topic_unsubscribed, bytes_sent) ->
69;
met2idx(on_session_expired, bytes_sent) ->
70;
met2idx(on_offline_message, bytes_sent) ->
71;
met2idx(on_config_change, bytes_sent) ->
72;
met2idx(on_client_wakeup, bytes_sent) ->
73;
met2idx(on_client_offline, bytes_sent) ->
74;
met2idx(on_client_gone, bytes_sent) ->
75.
5 changes: 5 additions & 0 deletions apps/vmq_webhooks/src/vmq_webhooks_plugin.erl
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ init([]) ->
process_flag(trap_exit, true),
ets:new(?TBL, [public, ordered_set, named_table, {read_concurrency, true}]),
ok = vmq_webhooks_cache:new(),
vmq_webhooks_metrics:init(),
{ok, #state{}}.

%%--------------------------------------------------------------------
Expand Down Expand Up @@ -731,11 +732,15 @@ call_endpoint(Endpoint, EOpts, Hook, Args0) ->
{error, _} = E ->
E
end,
vmq_webhooks_metrics:incr(Hook, requests),
vmq_webhooks_metrics:incr(Hook, bytes_sent, size(Payload)),
case Res of
{decoded_error, Reason} ->
vmq_webhooks_metrics:incr(Hook, errors),
lager:debug("calling endpoint received error due to ~p", [Reason]),
{error, Reason};
{error, Reason} ->
vmq_webhooks_metrics:incr(Hook, errors),
lager:error("calling endpoint failed due to ~p", [Reason]),
{error, Reason};
Res ->
Expand Down
27 changes: 26 additions & 1 deletion apps/vmq_webhooks/test/vmq_webhooks_SUITE.erl
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
-module(vmq_webhooks_SUITE).
-include_lib("common_test/include/ct.hrl").
-include_lib("vernemq_dev/include/vernemq_dev.hrl").
-include_lib("vmq_server/include/vmq_metrics.hrl").
-include("vmq_webhooks_test.hrl").


Expand Down Expand Up @@ -104,7 +105,8 @@ http() ->
cache_auth_on_publish,
cache_auth_on_subscribe,
cache_expired_entry,
cli_allow_query_parameters_test
cli_allow_query_parameters_test,
metrics_test
].

https() ->
Expand Down Expand Up @@ -540,6 +542,29 @@ cli_allow_query_parameters_test(_) ->
ok = register_hook(auth_on_register, EndpointWithParams),
[] = deregister_hook(auth_on_register, EndpointWithParams).

metrics_test(_) ->
register_hook(on_session_expired, ?ENDPOINT),
StartReqCntr = find_metric_value(webhooks_on_session_expired_requests),
StartErrCntr = find_metric_value(webhooks_on_session_expired_errors),
StartSentBytesCntr = find_metric_value(webhooks_on_session_expired_bytes_sent),
Self = pid_to_bin(self()),
[next] = vmq_plugin:all(on_session_expired, [{?MOUNTPOINT, Self}]),
ok = exp_response(on_session_expired_ok),
EndReqCntr = find_metric_value(webhooks_on_session_expired_requests),
EndErrCntr = find_metric_value(webhooks_on_session_expired_errors),
EndSentBytesCntr = find_metric_value(webhooks_on_session_expired_bytes_sent),
1 = EndReqCntr - StartReqCntr,
0 = EndErrCntr - StartErrCntr,
true = (EndSentBytesCntr - StartSentBytesCntr) > 0,
deregister_hook(on_session_expired, ?ENDPOINT).

find_metric_value(Id) ->
Metrics = vmq_metrics:metrics(),
{value, {_MetricDef, Value}} = lists:search(fun({Metric, V}) ->
Metric#metric_def.id =:= Id
end, Metrics),
Value.

%% HTTPS Tests

%% Given a CA that signed the endpoint's server certificate, the webhook works
Expand Down
1 change: 1 addition & 0 deletions changelog.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
- Collect per webhook type (e.g. `on_publish_m5_requests`) metrics.
- Fix vmqs (SSL) inter-node communication.
- Store waiting pubrec packets in queue and add to waiting acks on reconnect.
- Fix PostGres (`epgsql` 4.6.0) response format in `vmq_diversity`.
Expand Down

0 comments on commit c453bdc

Please sign in to comment.