Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add web hook metrics #9

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/vmq_server/priv/vmq_server.schema
Original file line number Diff line number Diff line change
Expand Up @@ -1614,7 +1614,7 @@
{datatype, string}
]}.

%% @doc genenerate extra metrics from metrics with labels.
%% @doc generate extra metrics from metrics with labels.
{mapping, "graphite_include_labels", "vmq_server.graphite_include_labels", [
{default, off},
{datatype, flag},
Expand Down
1 change: 1 addition & 0 deletions apps/vmq_webhooks/changelog.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Changelog

- Collect per webhook type (e.g. `on_publish_m5_requests`) metrics.
- Move persistence of webhooks to the `vernemq.conf` main file. This means
adding hooks using the `vmq-admin` tool no longer persists the webhooks and
they have to be manually added to the `vernemq.conf` file.
Expand Down
2 changes: 1 addition & 1 deletion apps/vmq_webhooks/priv/vmq_webhooks.schema
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
%% a name. vmq_webhooks.<name>.hook = <hook> associates the hook
%% <hook> with the name <name>. Webhooks are registered in the order
%% of the name given to it. Therefore a webhook with name 'webhook1'
%% is regisered before a webhook with the name 'webhook2'.
%% is registered before a webhook with the name 'webhook2'.
{mapping, "vmq_webhooks.$name.hook", "vmq_webhooks.user_webhooks",
[
{datatype, {enum,
Expand Down
265 changes: 265 additions & 0 deletions apps/vmq_webhooks/src/vmq_webhooks_metrics.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,265 @@
-module(vmq_webhooks_metrics).

-export([init/0, incr/2, incr/3, metrics/0]).

-include_lib("vmq_server/include/vmq_metrics.hrl").

-type counter_type() :: requests | errors | bytes_sent.
-type hook() ::
on_register |
on_register_m5 |
auth_on_publish |
auth_on_publish_m5 |
auth_on_register |
auth_on_register_m5 |
auth_on_subscribe |
auth_on_subscribe_m5 |
on_auth_m5 |
on_deliver |
on_deliver_m5 |
on_unsubscribe |
on_unsubscribe_m5 |
on_publish |
on_publish_m5 |
on_subscribe |
on_subscribe_m5 |
on_message_drop |
on_topic_unsubscribed |
on_session_expired |
on_offline_message |
on_config_change |
on_client_wakeup |
on_client_offline |
on_client_gone.

-spec metrics() -> [{Type, Labels, Id, Description, Name, Value}]
when Type :: atom(),
Labels :: [metric_label()],
Id :: metric_id(),
Description :: undefined | binary(),
Name :: binary(),
Value :: term().
metrics() ->
MetricKeys = [{H, CT} || H <- all_hooks(), CT <- all_counter_types()],
CounterRef = persistent_term:get(?MODULE),
[hook_and_counter_type_to_metric(CounterRef, Hook, CounterType)
|| {Hook, CounterType} <- MetricKeys].

hook_and_counter_type_to_metric(CounterRef, Hook, CounterType) ->
Index = met2idx(Hook, CounterType),
HookBin = atom_to_binary(Hook),
CounterTypeBin = atom_to_binary(CounterType),
Name = <<"webhooks_", HookBin/binary, "_", CounterTypeBin/binary>>,
Id = binary_to_atom(Name),
Description =
case CounterType of
requests ->
<<"Number of requests to ", HookBin/binary, " webhooks">>;
errors ->
<<"Number of errors calling ", HookBin/binary, " webhooks">>;
bytes_sent ->
<<"Number of bytes sent to ", HookBin/binary, " webhooks">>
end,
Value = counters:get(CounterRef, Index),
{counter, [], Id, Description, Name, Value}.

-spec init() -> ok.
init() ->
NumCounters = length(all_hooks()) * length(all_counter_types()),
CounterRef = counters:new(NumCounters, [write_concurrency]),
persistent_term:put(?MODULE, CounterRef),
application:set_env(vmq_webhooks, vmq_metrics_mfa, {?MODULE, metrics, []}).

-spec incr(hook(), counter_type()) -> ok.
incr(Hook, CounterType) ->
incr(Hook, CounterType, 1).

-spec incr(hook(), counter_type(), non_neg_integer()) -> ok.
incr(Hook, CounterType, Value) ->
CounterRef = persistent_term:get(?MODULE),
Index = met2idx(Hook, CounterType),
counters:add(CounterRef, Index, Value).

-spec all_hooks() -> [hook()].
all_hooks() ->
[on_register,
on_register_m5,
auth_on_publish,
auth_on_publish_m5,
auth_on_register,
auth_on_register_m5,
auth_on_subscribe,
auth_on_subscribe_m5,
on_auth_m5,
on_deliver,
on_deliver_m5,
on_unsubscribe,
on_unsubscribe_m5,
on_publish,
on_publish_m5,
on_subscribe,
on_subscribe_m5,
on_message_drop,
on_topic_unsubscribed,
on_session_expired,
on_offline_message,
on_config_change,
on_client_wakeup,
on_client_offline,
on_client_gone].

-spec all_counter_types() -> [counter_type()].
all_counter_types() ->
[requests, errors, bytes_sent].

-spec met2idx(hook(), counter_type()) -> non_neg_integer().
met2idx(on_register, requests) ->
1;
met2idx(on_register_m5, requests) ->
2;
met2idx(auth_on_publish, requests) ->
3;
met2idx(auth_on_publish_m5, requests) ->
4;
met2idx(auth_on_register, requests) ->
5;
met2idx(auth_on_register_m5, requests) ->
6;
met2idx(auth_on_subscribe, requests) ->
7;
met2idx(auth_on_subscribe_m5, requests) ->
8;
met2idx(on_auth_m5, requests) ->
9;
met2idx(on_deliver, requests) ->
10;
met2idx(on_deliver_m5, requests) ->
11;
met2idx(on_unsubscribe, requests) ->
12;
met2idx(on_unsubscribe_m5, requests) ->
13;
met2idx(on_publish, requests) ->
14;
met2idx(on_publish_m5, requests) ->
15;
met2idx(on_subscribe, requests) ->
16;
met2idx(on_subscribe_m5, requests) ->
17;
met2idx(on_message_drop, requests) ->
18;
met2idx(on_topic_unsubscribed, requests) ->
19;
met2idx(on_session_expired, requests) ->
20;
met2idx(on_offline_message, requests) ->
21;
met2idx(on_config_change, requests) ->
22;
met2idx(on_client_wakeup, requests) ->
23;
met2idx(on_client_offline, requests) ->
24;
met2idx(on_client_gone, requests) ->
25;
met2idx(on_register, errors) ->
26;
met2idx(on_register_m5, errors) ->
27;
met2idx(auth_on_publish, errors) ->
28;
met2idx(auth_on_publish_m5, errors) ->
29;
met2idx(auth_on_register, errors) ->
30;
met2idx(auth_on_register_m5, errors) ->
31;
met2idx(auth_on_subscribe, errors) ->
32;
met2idx(auth_on_subscribe_m5, errors) ->
33;
met2idx(on_auth_m5, errors) ->
34;
met2idx(on_deliver, errors) ->
35;
met2idx(on_deliver_m5, errors) ->
36;
met2idx(on_unsubscribe, errors) ->
37;
met2idx(on_unsubscribe_m5, errors) ->
38;
met2idx(on_publish, errors) ->
39;
met2idx(on_publish_m5, errors) ->
40;
met2idx(on_subscribe, errors) ->
41;
met2idx(on_subscribe_m5, errors) ->
42;
met2idx(on_message_drop, errors) ->
43;
met2idx(on_topic_unsubscribed, errors) ->
44;
met2idx(on_session_expired, errors) ->
45;
met2idx(on_offline_message, errors) ->
46;
met2idx(on_config_change, errors) ->
47;
met2idx(on_client_wakeup, errors) ->
48;
met2idx(on_client_offline, errors) ->
49;
met2idx(on_client_gone, errors) ->
50;
met2idx(on_register, bytes_sent) ->
51;
met2idx(on_register_m5, bytes_sent) ->
52;
met2idx(auth_on_publish, bytes_sent) ->
53;
met2idx(auth_on_publish_m5, bytes_sent) ->
54;
met2idx(auth_on_register, bytes_sent) ->
55;
met2idx(auth_on_register_m5, bytes_sent) ->
56;
met2idx(auth_on_subscribe, bytes_sent) ->
57;
met2idx(auth_on_subscribe_m5, bytes_sent) ->
58;
met2idx(on_auth_m5, bytes_sent) ->
59;
met2idx(on_deliver, bytes_sent) ->
60;
met2idx(on_deliver_m5, bytes_sent) ->
61;
met2idx(on_unsubscribe, bytes_sent) ->
62;
met2idx(on_unsubscribe_m5, bytes_sent) ->
63;
met2idx(on_publish, bytes_sent) ->
64;
met2idx(on_publish_m5, bytes_sent) ->
65;
met2idx(on_subscribe, bytes_sent) ->
66;
met2idx(on_subscribe_m5, bytes_sent) ->
67;
met2idx(on_message_drop, bytes_sent) ->
68;
met2idx(on_topic_unsubscribed, bytes_sent) ->
69;
met2idx(on_session_expired, bytes_sent) ->
70;
met2idx(on_offline_message, bytes_sent) ->
71;
met2idx(on_config_change, bytes_sent) ->
72;
met2idx(on_client_wakeup, bytes_sent) ->
73;
met2idx(on_client_offline, bytes_sent) ->
74;
met2idx(on_client_gone, bytes_sent) ->
75.
5 changes: 5 additions & 0 deletions apps/vmq_webhooks/src/vmq_webhooks_plugin.erl
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ init([]) ->
process_flag(trap_exit, true),
ets:new(?TBL, [public, ordered_set, named_table, {read_concurrency, true}]),
ok = vmq_webhooks_cache:new(),
vmq_webhooks_metrics:init(),
{ok, #state{}}.

%%--------------------------------------------------------------------
Expand Down Expand Up @@ -731,11 +732,15 @@ call_endpoint(Endpoint, EOpts, Hook, Args0) ->
{error, _} = E ->
E
end,
vmq_webhooks_metrics:incr(Hook, requests),
vmq_webhooks_metrics:incr(Hook, bytes_sent, size(Payload)),
case Res of
{decoded_error, Reason} ->
vmq_webhooks_metrics:incr(Hook, errors),
lager:debug("calling endpoint received error due to ~p", [Reason]),
{error, Reason};
{error, Reason} ->
vmq_webhooks_metrics:incr(Hook, errors),
lager:error("calling endpoint failed due to ~p", [Reason]),
{error, Reason};
Res ->
Expand Down
27 changes: 26 additions & 1 deletion apps/vmq_webhooks/test/vmq_webhooks_SUITE.erl
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
-module(vmq_webhooks_SUITE).
-include_lib("common_test/include/ct.hrl").
-include_lib("vernemq_dev/include/vernemq_dev.hrl").
-include_lib("vmq_server/include/vmq_metrics.hrl").
-include("vmq_webhooks_test.hrl").


Expand Down Expand Up @@ -104,7 +105,8 @@ http() ->
cache_auth_on_publish,
cache_auth_on_subscribe,
cache_expired_entry,
cli_allow_query_parameters_test
cli_allow_query_parameters_test,
metrics_test
].

https() ->
Expand Down Expand Up @@ -540,6 +542,29 @@ cli_allow_query_parameters_test(_) ->
ok = register_hook(auth_on_register, EndpointWithParams),
[] = deregister_hook(auth_on_register, EndpointWithParams).

metrics_test(_) ->
register_hook(on_session_expired, ?ENDPOINT),
StartReqCntr = find_metric_value(webhooks_on_session_expired_requests),
StartErrCntr = find_metric_value(webhooks_on_session_expired_errors),
StartSentBytesCntr = find_metric_value(webhooks_on_session_expired_bytes_sent),
Self = pid_to_bin(self()),
[next] = vmq_plugin:all(on_session_expired, [{?MOUNTPOINT, Self}]),
ok = exp_response(on_session_expired_ok),
EndReqCntr = find_metric_value(webhooks_on_session_expired_requests),
EndErrCntr = find_metric_value(webhooks_on_session_expired_errors),
EndSentBytesCntr = find_metric_value(webhooks_on_session_expired_bytes_sent),
1 = EndReqCntr - StartReqCntr,
0 = EndErrCntr - StartErrCntr,
true = (EndSentBytesCntr - StartSentBytesCntr) > 0,
deregister_hook(on_session_expired, ?ENDPOINT).

find_metric_value(Id) ->
Metrics = vmq_metrics:metrics(),
{value, {_MetricDef, Value}} = lists:search(fun({Metric, V}) ->
Metric#metric_def.id =:= Id
end, Metrics),
Value.

%% HTTPS Tests

%% Given a CA that signed the endpoint's server certificate, the webhook works
Expand Down
1 change: 1 addition & 0 deletions changelog.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
- Collect per webhook type (e.g. `on_publish_m5_requests`) metrics.
- Fix vmqs (SSL) inter-node communication.
- Store waiting pubrec packets in queue and add to waiting acks on reconnect.
- Fix PostGres (`epgsql` 4.6.0) response format in `vmq_diversity`.
Expand Down