Skip to content
This repository has been archived by the owner on Mar 26, 2024. It is now read-only.

Commit

Permalink
Making it possible to push metrics in chunks rather than one at a time
Browse files Browse the repository at this point in the history
When using `dogstatde` to push a _lot_ of custom Datadog metrics, we've
observed that the overhead of checking workers out & back in of the
`dogstatsd_worker` `wpool` can become a performance bottleneck.

This patch makes it possible to submit several metrics of the same type
at one time, hence allowing for a single worker check-out/in cycle to submit
them all, for example:
```
dogstatsd:gauge([{"users", UserTypeCount, #{ user_type => UserType }}
                 || {UserTypeCount, UserType} <- UserCounts]).
```

We've deployed that in production, and found a noticeable difference in our
use case.

This patch should be entirely backward-compatible.

Updated tests, and added a few more.
  • Loading branch information
wk8 authored and JoshRagem committed Jan 13, 2017
1 parent 8fd1755 commit 1d83ac6
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 29 deletions.
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ or in an Erlang config file in all_lowercase.
dogstatsd:gauge("users.active", UserCount, #{ shard => ShardId, version => Vsn })
```

6. When pushing a lot of custom metrics, it can be beneficial to push them in chunks for efficiency, for example:
```erlang
dogstatsd:gauge([{"users", UserTypeCount, #{ user_type => UserType }}
|| {UserTypeCount, UserType} <- UserCounts]).
```

### Elixir

For more details, see the example application in (examples/elixir)[examples/elixir]
Expand Down
77 changes: 60 additions & 17 deletions src/dogstatsd.erl
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@
-type metric_sample_rate() :: number().
-type metric_tags() :: map().

-type metric_data() :: {metric_name(), metric_value()}
| {metric_name(), metric_value(), metric_sample_rate()|metric_tags()}
| {metric_name(), metric_value(), metric_sample_rate(), metric_tags()}.

-type event_title() :: iodata().
-type event_text() :: iodata().
-type event_type() :: info | error | warning | success.
Expand All @@ -18,22 +22,26 @@
,metric_type/0
,metric_sample_rate/0
,metric_tags/0
,metric_data/0
]).

-export([
gauge/2, gauge/3, gauge/4
,counter/2, counter/3, counter/4
,increment/2, increment/3, increment/4
,histogram/2, histogram/3, histogram/4
,timer/2, timer/3, timer/4
,timing/2, timing/3, timing/4
,set/2, set/3, set/4
gauge/1, gauge/2, gauge/3, gauge/4
,counter/1 ,counter/2, counter/3, counter/4
,increment/1, increment/2, increment/3, increment/4
,histogram/1, histogram/2, histogram/3, histogram/4
,timer/1, timer/2, timer/3, timer/4
,timing/1, timing/2, timing/3, timing/4
,set/1, set/2, set/3, set/4
,event/1, event/2, event/3, event/4, event/5
]).

-spec send_metric(metric_type(), metric_name(), metric_value(), metric_sample_rate(), metric_tags()) -> ok.
send_metric(Type, Name, Value, SampleRate, Tags) when is_number(Value), is_number(SampleRate) ->
send({metric, {Type, Name, Value, SampleRate, Tags}}).
-spec send_metric(metric_type(), [metric_data()]) -> ok.
send_metric(_Type, []) ->
ok;
send_metric(Type, MetricDataList) ->
NormalizedMetricDataList = [normalize_metric_data(MetricData) || MetricData <- MetricDataList],
send({metric, {Type, NormalizedMetricDataList}}).

-spec send_event(event_title(), event_text(), event_type(), event_priority(), event_tags()) -> ok.
send_event(Title, Text, Type, Priority, Tags) ->
Expand All @@ -43,65 +51,83 @@ send_event(Title, Text, Type, Priority, Tags) ->
send(Data) ->
wpool:cast(dogstatsd_worker, Data).

-define(SPEC_TYPE_1(Type), -spec Type(metric_data() | [metric_data()]) -> ok).
-define(MK_TYPE_1(Type),
Type(MetricDataList) when is_list(MetricDataList) ->
send_metric(Type, MetricDataList);
Type(MetricData) when is_tuple(MetricData) ->
send_metric(Type, [MetricData])
).
-define(SPEC_TYPE_2(Type), -spec Type(metric_name(), metric_value()) -> ok).
-define(MK_TYPE_2(Type),
Type(Name, Value) ->
send_metric(Type, Name, Value, 1.0, #{})
Type(Name, Value) when is_number(Value) ->
send_metric(Type, [{Name, Value}])
).
-define(SPEC_TYPE_3(Type), -spec Type(metric_name(), metric_value(), metric_sample_rate()|metric_tags()) -> ok).
-define(MK_TYPE_3(Type),
Type(Name, Value, SampleRate) when is_number(SampleRate) ->
send_metric(Type, Name, Value, SampleRate, #{});
Type(Name, Value, Tags) when is_map(Tags) ->
send_metric(Type, Name, Value, 1.0, Tags)
Type(Name, Value, SampleRateOrTags) when is_number(Value) andalso (is_number(SampleRateOrTags) orelse is_map(SampleRateOrTags)) ->
send_metric(Type, [{Name, Value, SampleRateOrTags}])
).
-define(SPEC_TYPE_4(Type), -spec Type(metric_name(), metric_value(), metric_sample_rate(), metric_tags()) -> ok).
-define(MK_TYPE_4(Type),
Type(Name, Value, SampleRate, Tags) when is_number(SampleRate), is_map(Tags) ->
send_metric(Type, Name, Value, SampleRate, Tags)
send_metric(Type, [{Name, Value, SampleRate, Tags}])
).

-define(ALIAS_TYPE_1(Alias, Real), Alias(A) -> Real(A)).
-define(ALIAS_TYPE_2(Alias, Real), Alias(A, B) -> Real(A, B)).
-define(ALIAS_TYPE_3(Alias, Real), Alias(A, B, C) -> Real(A, B, C)).
-define(ALIAS_TYPE_4(Alias, Real), Alias(A, B, C, D) -> Real(A, B, C, D)).

?SPEC_TYPE_1(gauge).
?SPEC_TYPE_2(gauge).
?SPEC_TYPE_3(gauge).
?SPEC_TYPE_4(gauge).
?MK_TYPE_1(gauge).
?MK_TYPE_2(gauge).
?MK_TYPE_3(gauge).
?MK_TYPE_4(gauge).

?SPEC_TYPE_1(counter).
?SPEC_TYPE_2(counter).
?SPEC_TYPE_3(counter).
?SPEC_TYPE_4(counter).
?MK_TYPE_1(counter).
?MK_TYPE_2(counter).
?MK_TYPE_3(counter).
?MK_TYPE_4(counter).
?ALIAS_TYPE_1(increment, counter).
?ALIAS_TYPE_2(increment, counter).
?ALIAS_TYPE_3(increment, counter).
?ALIAS_TYPE_4(increment, counter).

?SPEC_TYPE_1(histogram).
?SPEC_TYPE_2(histogram).
?SPEC_TYPE_3(histogram).
?SPEC_TYPE_4(histogram).
?MK_TYPE_1(histogram).
?MK_TYPE_2(histogram).
?MK_TYPE_3(histogram).
?MK_TYPE_4(histogram).

?SPEC_TYPE_1(timer).
?SPEC_TYPE_2(timer).
?SPEC_TYPE_3(timer).
?SPEC_TYPE_4(timer).
?MK_TYPE_1(timer).
?MK_TYPE_2(timer).
?MK_TYPE_3(timer).
?MK_TYPE_4(timer).
?ALIAS_TYPE_1(timing, timer).
?ALIAS_TYPE_2(timing, timer).
?ALIAS_TYPE_3(timing, timer).
?ALIAS_TYPE_4(timing, timer).

?SPEC_TYPE_1(set).
?SPEC_TYPE_2(set).
?SPEC_TYPE_3(set).
?SPEC_TYPE_4(set).
?MK_TYPE_1(set).
?MK_TYPE_2(set).
?MK_TYPE_3(set).
?MK_TYPE_4(set).
Expand All @@ -118,6 +144,19 @@ event(Title, Text, Type, Priority) -> event(Title, Text, Type, Priority, #{}).
event(Title, Text, Type, Priority, Tags) ->
send_event(Title, Text, Type, Priority, Tags).

%%%===================================================================
%%% Internal functions
%%%===================================================================

normalize_metric_data({Name, Value}) ->
{Name, Value, 1.0, #{}};
normalize_metric_data({Name, Value, SampleRate}) when is_number(SampleRate) ->
{Name, Value, SampleRate, #{}};
normalize_metric_data({Name, Value, Tags}) when is_map(Tags) ->
{Name, Value, 1.0, #{}};
normalize_metric_data({_Name, _Value, _SampleRate, _Tags} = AlreadyNormalized) ->
AlreadyNormalized.

%%% Tests
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
Expand All @@ -141,6 +180,10 @@ gauge_test_() ->
,?_assertError(function_clause, dogstatsd:gauge("foo.bar", #{baz => qux}))
,?_assertError(function_clause, dogstatsd:gauge("foo.bar", #{baz => qux}, 0.5))
,?_assertError(function_clause, dogstatsd:gauge("foo.bar", 1, "hello"))
,?_assertEqual(ok, dogstatsd:gauge([{"foo.bar", 1, 0.5, #{foo => bar}},
{"foo.bar", 1, 0.5, #{foo => bar}}]))
,?_assertError(function_clause, dogstatsd:gauge([{"foo.bar", 1, 0.5, #{foo => bar}},
{"foo.bar", 1, "hello"}]))
]}.

-endif.
35 changes: 23 additions & 12 deletions src/dogstatsd_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ handle_call(_Request, _From, State) ->
handle_cast(_Data, #state{socket = no_send} = State) ->
{noreply, State};
handle_cast(Data, State) ->
Line = build_line(Data, State),
ok = send_line(Line, State),
Lines = build_lines(Data, State),
ok = send_lines(Lines, State),
{noreply, State}.

handle_info(_Info, State) ->
Expand All @@ -67,12 +67,20 @@ code_change(_OldVsn, State, _Extra) ->
%%% Internal functions
%%%===================================================================

build_line({metric, Data}, State) ->
build_metric_line(Data, State);
build_line({event, Data}, State) ->
build_event_line(Data, State).
build_lines({metric, Data}, State) ->
build_metric_lines(Data, State);
build_lines({event, Data}, State) ->
[build_event_line(Data, State)].

build_metric_line({Type, Name, Value, SampleRate, Tags}, State) ->
build_metric_lines({Type, NormalizedMetricDataList}, State) ->
lists:map(
fun(NormalizedMetricData) ->
build_metric_line(Type, NormalizedMetricData, State)
end,
NormalizedMetricDataList
).

build_metric_line(Type, {Name, Value, SampleRate, Tags}, State) ->
LineStart = io_lib:format("~s:~.3f|~s|@~.2f", [prepend_global_prefix(Name, State), float(Value),
metric_type_to_str(Type), float(SampleRate)]),
TagLine = build_tag_line(Tags, State),
Expand Down Expand Up @@ -100,8 +108,11 @@ build_tag_line(Tags, #state{tags=GlobalTags}) ->
[],
maps:merge(GlobalTags, Tags)).

send_line(Line, #state{socket = Socket, host = Host, port = Port}) ->
ok = gen_udp:send(Socket, Host, Port, Line).
send_lines(Lines, #state{socket = Socket, host = Host, port = Port}) ->
ok = lists:foreach(
fun(Line) -> ok = gen_udp:send(Socket, Host, Port, Line) end,
Lines
).

metric_type_to_str(counter) -> "c";
metric_type_to_str(gauge) -> "g";
Expand All @@ -127,7 +138,7 @@ iodata_to_bin(IoList) -> iolist_to_binary(IoList).
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").

build_line_test_() ->
build_lines_test_() ->
State = #state{
prefix = "test_global_prefix",
tags = #{"test" => true}
Expand All @@ -142,7 +153,7 @@ build_line_test_() ->
Tags1 = #{"event" => "awesome"},

ExpectedLine1 = <<"_e{16,15}:my event's title|my event's text|t:success|p:low|#event:awesome,test:true">>,
ActualLine1 = build_line({event, {Title1, Text1, Type1, Priority1, Tags1}}, State),
[ActualLine1] = build_lines({event, {Title1, Text1, Type1, Priority1, Tags1}}, State),

?_assertEqual(ExpectedLine1, iolist_to_binary(ActualLine1))
end},
Expand All @@ -155,7 +166,7 @@ build_line_test_() ->
Tags2 = #{"version" => 42},

ExpectedLine2 = <<"test_global_prefix.mymetric_name:28.000|h|@12.00|#test:true,version:42">>,
ActualLine2 = build_line({metric, {Type2, Name2, Value2, SampleRate2, Tags2}}, State),
[ActualLine2] = build_lines({metric, {Type2, [{Name2, Value2, SampleRate2, Tags2}]}}, State),

?_assertEqual(ExpectedLine2, iolist_to_binary(ActualLine2))
end}].
Expand Down

0 comments on commit 1d83ac6

Please sign in to comment.