Add traffic plugin
Add traffic plugin
Michal Slaski authored Mar 23, 2017
2 parents fac2cb8 + 399da55 commit cc649eb
Expand Up @@ -55,6 +55,9 @@ start(_StartType, _StartArgs) ->
%% Start EPL Dashboard
{ok, _} = epl_sup:start_child(epl_dashboard, []),

%% Start EPL Dashboard
{ok, _} = epl_sup:start_child(epl_traffic, []),

%% load plugins
PluginApps = plugins(Args),

%%% Copyright (c) 2017,
%%% @doc
%%% gen_server listening to events from epl_tracer
%%% and constructing a #map{} representation of a graph,
%%% where verticies represent Erlang nodes or processes,
%%% and edges represent inter-node traffic or message passing.
%%% @end


%% API

%% gen_server callbacks

-record(state, {subscribers = [],
traffic = [],
msg_pass = #{}}).

%%% API functions

start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

subscribe() ->
gen_server:cast(?MODULE, {subscribe, self()}).

unsubscribe() ->
gen_server:cast(?MODULE, {unsubscribe, self()}).

%%% gen_server callbacks

init([]) ->
%% Subscribe to all events from the observed node
ok = epl:subscribe(),

%% Initialise counters, so that later we can calculate deltas
TrafficCounters = get_traffic_counters(),
{ok, #state{traffic = TrafficCounters}}.

handle_call(Request, _From, _State) ->
exit({not_implemented, Request}).

handle_cast({subscribe, Pid}, State = #state{subscribers = Subs}) ->
{noreply, State#state{subscribers = [Pid|Subs]}};
handle_cast({unsubscribe, Pid}, State = #state{subscribers = Subs}) ->
{noreply, State#state{subscribers = lists:delete(Pid, Subs)}};
handle_cast(Request, _State) ->
exit({not_implemented, Request}).

handle_info({data, {_Node, _Timestamp}, Proplist},
State = #state{subscribers = Subs,
traffic = OldTraffic,
msg_pass = OldMsgPass}) ->

{Viz1, NewTraffic} = update_traffic_graph(OldTraffic, new()),

%% By convention we use <<"INTERNET">> as a name of the region,
%% which represents the observed node
Viz2 = get_message_passing_counters(<<"INTERNET">>, Proplist,
Viz1, OldMsgPass),

%% push an update to all subscribed WebSockets
JSON = epl_json:encode(Viz2, <<"traffic-info">>),

[Pid ! {data, JSON} || Pid <- Subs],
{noreply, State#state{traffic = NewTraffic}};
handle_info(Request, _State) ->
exit({not_implemented, Request}).

terminate(_Reason, _State) ->

code_change(_OldVsn, State, _Extra) ->
{ok, State}.

%%% Internal functions
get_message_passing_counters(Node, Proplist, Vizceral, OldMsgPass) ->
fun({send, Send}, V) ->
%% Examples of send trace:
%% {{global_name_server,<13104.13.0>},0,1}
%% {#Port<13104.431>,<13104.28.0>},0,72}
%% {{<13104.12.0>,{alias,'[email protected]'}},2,0}
update_message_passing_graph(Node, Send, V, OldMsgPass);
(_, V) ->

update_message_passing_graph(Node, Send, Vizceral, OldMsgPass) ->
%% the INTERNET node represents the source of ingress traffic
Vizceral1 = push_focused(<<"INTERNET">>, Node, Vizceral),

fun({{ID1, ID2}, Count1, Count2}, V) ->
P1 = get_PID_from_trace_event(ID1),
P2 = get_PID_from_trace_event(ID2),
update_messge_passing_graph(Node, P1, P2, Count1, Count2, V)
end, Vizceral1, Send).

get_PID_from_trace_event({P,_}) -> P;
get_PID_from_trace_event(P) -> P.

update_messge_passing_graph(Node, P1, P2, Count1, Count2, V) ->
%% Add verticies representing processes and ports
%% and edges representing message passing
V1 = update_msg_pass_processes(Node, P1, P2, Count1, Count2, V),

%% Add edges between <<"INTERNET">> and ports
%% to represent ingress traffic
update_msg_pass_ports(Node, P1, P2, Count1, Count2, V1).

update_msg_pass_processes(Node, P1, P2, C1, C2, V) ->
V1 = push_focused(P1, Node, V),
V2 = push_focused(P2, Node, V1),
V3 = push_focused_connection(P1, P2, Node, {C1,0,0}, V2),
push_focused_connection(P2, P1, Node, {C2,0,0}, V3).

update_msg_pass_ports(Node, P1, P2, C1, C2, V) ->
V1 = if is_port(P1) ->
push_focused_connection(<<"INTERNET">>, P1,
Node, {C1,0,0}, V);
true ->

if is_port(P2) ->
push_focused_connection(<<"INTERNET">>, P2,
Node, {C2,0,0}, V1);
true ->

get_traffic_counters() ->
{ok, NodesInfo} = command(fun net_kernel:nodes_info/0),

%% map results of calling net_kernel:nodes_info/0
%% to a list of 3-element tuples {Node, Input, Output}
[{NodeName, NodeIn, NodeOut} ||
{NodeName, [{owner,_}, {state,up}, {address, _}, {type,normal},
{in,NodeIn}, {out,NodeOut}]} <- NodesInfo].

update_traffic_graph(OldCounters, Vizceral) ->
%% traverse all connected nodes and read their net_kernel counters
NewCounters = get_traffic_counters(),

%% start creating a map, which represents the Vizceral JSON document
%% region named <<"INTERNET">> represents the observed node
V1 = push_region(<<"INTERNET">>, Vizceral),

%% add as many regions as there are nodes in the cluster
V2 = lists:foldl(fun({Node,_,_}, V) ->
push_region(binarify(Node), V)
end, V1, NewCounters),

%% add links between "INTERNET" and all nodes
%% and compute delta between old and new net_kernel counters
V3 = lists:foldl(
fun({Node, NewIn, NewOut}, V) ->
{OldIn, OldOut} = get_in_out(Node, OldCounters),
push_region_connection(<<"INTERNET">>, binarify(Node),
{NewOut-OldOut, NewIn-OldIn, 0},
#{}, V)
end, V2, NewCounters),

{V3, NewCounters}.

get_in_out(Node, Counters) ->
case lists:keyfind(Node, 1, Counters) of
{Node, In, Out} ->
{In, Out};
false ->
{0, 0}

command(Fun) ->
command(Fun, []).

command(Fun, Args) ->
{ok, Result} = epl_tracer:command(Fun, Args),

%%% functions manipulating Vizceral map
new() ->
entity(global, "edge", [], #{connections => []}).

entity(Renderer, Name, Nodes, Additional) ->
Map = #{
renderer => Renderer,
name => namify(Name),
displayName => binarify(Name),
nodes => Nodes
maps:merge(Map, Additional).

binarify(Name) when is_list(Name) ->
binarify(Name) when is_atom(Name) ->
atom_to_binary(Name, latin1);
binarify(Name) when is_pid(Name) ->
binarify(Name) when is_port(Name) ->
binarify(Name) when is_binary(Name) ->

namify(Name) when is_binary(Name) ->
Name1 = binary:replace(binarify(Name), <<"@">>, <<"_at_">>),
Name2 = binary:replace(binarify(Name1), <<"<">>, <<"">>),
Name3 = binary:replace(binarify(Name2), <<">">>, <<"">>),
binary:replace(binarify(Name3), <<".">>, <<"_">>, [global]);
namify(Name) ->

%% ---------------------- Regions ---------------------
push_region(Name, Vizceral) ->
push_region(Name, #{}, Vizceral).

push_region(Name, Additional, Vizceral) ->
A = maps:merge(#{
connections => [],
maxVolume => 5000
push_node(region, Name, A, Vizceral).

pull_region(Name, Vizceral) ->
{Region, Newlist} = pull_node(Name, Vizceral),
{Region, maps:merge(Vizceral, #{nodes => Newlist})}.

%% ---------------------- Nodes -----------------------
push_node(Renderer, Name, Additional, Entity) ->
#{nodes := Nodes} = Entity,
Newnode = entity(Renderer, Name, [], Additional),
maps:merge(Entity, #{nodes => [Newnode | Nodes]}).

pull_node(Name, Entity) ->
#{nodes := Nodes} = Entity,
{[Node], Rest} = lists:partition(
fun(A) ->
maps:get(name, A) == namify(Name)
end, Nodes),
{Node, Rest}.

%% ------------------- Connections --------------------
push_connection(Source, Target, {N, W, D} , Additional, To) ->
#{connections := Connections} = To,
New = maps:merge(Additional,
#{source => namify(Source),
target => namify(Target),
metrics => #{normal => N,
danger => D,
warning => W}
maps:merge(To, #{connections => [New | Connections]}).

push_region_connection(Source, Target, {N, W, D}, Additional, Vizceral) ->
%% Will crash on nonexisting
pull_region(Source, Vizceral),
pull_region(Target, Vizceral),
push_connection(Source, Target, {N, W, D}, Additional, Vizceral).

push_focused_connection(S, T, RN, NWD, Vizceral) ->
push_focused_connection(S, T, RN, NWD, #{}, Vizceral).

push_focused_connection(Source, Target, RegionName, {N, W, D}, A, Vizceral) ->
{Region, NewV} = pull_region(RegionName, Vizceral),
NewR = push_connection(Source, Target, {N,W,D}, A, Region),
push_region(RegionName, NewR, NewV).

%% ---------------------- Focused ---------------------
push_focused(Name, Region, Vizceral) ->
push_focused(Name, Region, #{} ,Vizceral).

push_focused(Name, RegionName, Additional, Vizceral) ->
#{nodes := Nodes} = Vizceral,
{[Region], Rest} = lists:partition(
fun(A) ->
maps:get(name, A) == namify(RegionName)
end, Nodes),
NewRegion = push_node(focused, Name, Additional, Region),
maps:merge(Vizceral, #{nodes => [NewRegion | Rest]}).
%%% Copyright (c) 2017,
%%% @doc
%%% WebSocket handler returning Vizceral JSON representing traffic
%%% @end

%% EPL plugin callbacks

%% cowboy_websocket_handler callbacks

%%% EPL plugin callbacks
start_link(_Options) ->
{ok, spawn_link(fun() -> receive _ -> ok end end)}.

init(_Options) ->
{ok, [{menu_item, <<"">>}, {author, <<"">>}]}.

%%% cowboy_websocket_handler callbacks
init({tcp, http}, _Req, _Opts) ->
{upgrade, protocol, cowboy_websocket}.

websocket_init(_TransportName, Req, _Opts) ->
{ok, Req, undefined_state}.

websocket_handle({text, Id}, Req, State) ->
Data = epl_traffic:node_info(Id),
{reply, {text, Data}, Req, State};
websocket_handle(Data, _Req, _State) ->
exit({not_implemented, Data}).

websocket_info({data, Data}, Req, State) ->
{reply, {text, Data}, Req, State};
websocket_info(Info, _Req, _State) ->
exit({not_implemented, Info}).

websocket_terminate(_Reason, _Req, _State) ->

