From d17b96d4c0adce9e907542fdd3546fdd6ffa495d Mon Sep 17 00:00:00 2001 From: Michal Slaski Date: Mon, 20 Mar 2017 18:40:22 +0100 Subject: [PATCH 1/8] Add traffic plugin --- apps/epl/src/epl_traffic.erl | 202 +++++++++++++++++++++++++++++++ apps/epl/src/epl_traffic_EPL.erl | 53 ++++++++ 2 files changed, 255 insertions(+) create mode 100644 apps/epl/src/epl_traffic.erl create mode 100644 apps/epl/src/epl_traffic_EPL.erl diff --git a/apps/epl/src/epl_traffic.erl b/apps/epl/src/epl_traffic.erl new file mode 100644 index 0000000..3b8fc07 --- /dev/null +++ b/apps/epl/src/epl_traffic.erl @@ -0,0 +1,202 @@ +%%% Copyright (c) 2017, erlang.pl +%%%------------------------------------------------------------------- +%%% @doc +%%% gen_server listening to events from epl_tracer +%%% and constructing a #map{} representation of a graph, +%%% where verticies represent Erlang nodes or processes, +%%% and edges represent inter-node traffic or message passing. +%%% @end +%%%------------------------------------------------------------------- +-module(epl_traffic). + +-behaviour(gen_server). + +%% API +-export([start_link/0, + subscribe/0, + unsubscribe/0]). + +%% gen_server callbacks +-export([init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3]). + +-record(state, {subscribers = [], + counters = []}). + +%%%=================================================================== +%%% API functions +%%%=================================================================== + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +subscribe() -> + gen_server:cast(?MODULE, {subscribe, self()}). + +unsubscribe() -> + gen_server:cast(?MODULE, {unsubscribe, self()}). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +init([]) -> + %% Subscribe to all events from the observed node + ok = epl:subscribe(), + + %% Initialise counters, so that later we can calculate deltas + Counters = get_traffic_counters(), + {ok, #state{counters = Counters}}. + +handle_call(Request, _From, _State) -> + exit({not_implemented, Request}). + +handle_cast({subscribe, Pid}, State = #state{subscribers = Subs}) -> + {noreply, State#state{subscribers = [Pid|Subs]}}; +handle_cast({unsubscribe, Pid}, State = #state{subscribers = Subs}) -> + {noreply, State#state{subscribers = lists:delete(Pid, Subs)}}; +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info({data, _, _}, State = #state{subscribers = Subs, + counters = OldCounters}) -> + %% traverse all connected nodes and read their net_kernel counters + NewCounters = get_traffic_counters(), + + %% start creating a map, which represents the Vizceral JSON document + %% region named "INTERNET" is mandatory + V1 = push_region(<<"INTERNET">>, new()), + + Nodes = nodes(connected), + %% add as many regions as there are nodes in the cluster + V2 = lists:foldl(fun(Node, V) -> + NodeBin = atom_to_binary(Node, latin1), + push_region(NodeBin, V) + end, V1, Nodes), + + %% add links between "INTERNET" and all nodes + %% and compute delta between old and new net_kernel counters + V3 = lists:foldl(fun(Node, V) -> + NodeBin = atom_to_binary(Node, latin1), + {OldIn, _OldOut} = get_in_out(Node, OldCounters), + {NewIn, _NewOut} = get_in_out(Node, NewCounters), + push_region_connection( + <<"INTERNET">>, NodeBin, {NewIn-OldIn, 0, 0}, #{}, V) + end, V2, Nodes), + + %% push an update to all subscribed WebSockets + JSON = epl_json:encode(V3, <<"traffic-info">>), + + [Pid ! {data, JSON} || Pid <- Subs], + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== +get_traffic_counters() -> + {ok, NodesInfo} = command(fun net_kernel:nodes_info/0), + + %% map results of calling net_kernel:nodes_info/0 + %% to a list of 3-element tuples {Node, Input, Output} + [{NodeName, NodeIn, NodeOut} || + {NodeName, [{owner,_}, {state,up}, {address, _}, {type,normal}, + {in,NodeIn}, {out,NodeOut}]} <- NodesInfo]. + +get_in_out(Node, Counters) -> + case lists:keyfind(Node, 1, Counters) of + {Node, In, Out} -> + {In, Out}; + false -> + {0, 0} + end. + +command(Fun) -> + command(Fun, []). + +command(Fun, Args) -> + {ok, Result} = epl_tracer:command(Fun, Args), + Result. + +%%%=================================================================== +%%% functions manipulating Vizceral map +%%%=================================================================== +new() -> + entity(global, "edge", [], #{connections => []}). + +entity(Renderer, Name, Nodes, Additional) -> + Map = #{ + renderer => Renderer, + name => namify(Name), + displayName => binarify(Name), + nodes => Nodes + }, + maps:merge(Map, Additional). + +binarify(Name) when is_list(Name) -> + list_to_binary(Name); +binarify(Name) -> Name. + +namify(Name) -> + Name1 = binary:replace(binarify(Name), <<"@">>, <<"0">>), + Name2 = binary:replace(binarify(Name1), <<"<">>, <<"">>), + Name3 = binary:replace(binarify(Name2), <<">">>, <<"">>), + binary:replace(binarify(Name3), <<".">>, <<"-">>, [global]). + + +%% ---------------------- Regions --------------------- +push_region(Name, Vizcerl) -> + push_region(Name, #{}, Vizcerl). +push_region(Name, Additional, Vizcerl) -> + A = maps:merge(#{ + connections => [], + maxVolume => 5000 + }, + Additional), + push_node(region, Name, A, Vizcerl). + +pull_region(Name, Vizcerl) -> + {Region, Newlist} = pull_node(Name, Vizcerl), + {Region, maps:merge(Vizcerl, #{nodes => Newlist})}. + +%% ---------------------- Nodes ----------------------- +push_node(Renderer, Name, Additional, Entity) -> + #{nodes := Nodes} = Entity, + Newnode = entity(Renderer, Name, [], Additional), + maps:merge(Entity, #{nodes => [Newnode | Nodes]}). +pull_node(Name, Entity) -> + #{nodes := Nodes} = Entity, + {[Node], Rest} = lists:partition( + fun(A) -> + maps:get(name, A) == namify(Name) + end, Nodes), + {Node, Rest}. + +%% ------------------- Connections -------------------- +push_connection(Source, Target, {N, W, D} , Additional, To) -> + #{connections := Connections} = To, + New = maps:merge(#{ + source => namify(Source), + target => namify(Target), + metrics => #{ + normal => N, + danger => D, + warning => W + } + }, Additional), + maps:merge(To, #{connections => [New | Connections]}). + +push_region_connection(Source, Target, {N, W, D}, Additional, Vizcerl) -> + %% Will crash on nonexisting + pull_region(Source, Vizcerl), + pull_region(Target, Vizcerl), + push_connection(Source, Target, {N, W, D}, Additional, Vizcerl). diff --git a/apps/epl/src/epl_traffic_EPL.erl b/apps/epl/src/epl_traffic_EPL.erl new file mode 100644 index 0000000..2ac6d15 --- /dev/null +++ b/apps/epl/src/epl_traffic_EPL.erl @@ -0,0 +1,53 @@ +%%% Copyright (c) 2017, erlang.pl +%%%------------------------------------------------------------------- +%%% @doc +%%% WebSocket handler returning Vizceral JSON representing traffic +%%% @end +%%%------------------------------------------------------------------- +-module(epl_traffic_EPL). +-behaviour(cowboy_websocket_handler). + +%% EPL plugin callbacks +-export([start_link/1, + init/1]). + +%% cowboy_websocket_handler callbacks +-export([init/3, + websocket_init/3, + websocket_handle/3, + websocket_info/3, + websocket_terminate/3]). + +%%%=================================================================== +%%% EPL plugin callbacks +%%%=================================================================== +start_link(_Options) -> + {ok, spawn_link(fun() -> receive _ -> ok end end)}. + +init(_Options) -> + {ok, [{menu_item, <<"">>}, {author, <<"">>}]}. + +%%%=================================================================== +%%% cowboy_websocket_handler callbacks +%%%=================================================================== +init({tcp, http}, _Req, _Opts) -> + epl_traffic:subscribe(), + {upgrade, protocol, cowboy_websocket}. + +websocket_init(_TransportName, Req, _Opts) -> + {ok, Req, undefined_state}. + +websocket_handle({text, Id}, Req, State) -> + Data = epl_traffic:node_info(Id), + {reply, {text, Data}, Req, State}; +websocket_handle(Data, _Req, _State) -> + exit({not_implemented, Data}). + +websocket_info({data, Data}, Req, State) -> + {reply, {text, Data}, Req, State}; +websocket_info(Info, _Req, _State) -> + exit({not_implemented, Info}). + +websocket_terminate(_Reason, _Req, _State) -> + epl_traffic:unsubscribe(), + ok. From dc860175a9c834a7b96e736f9d81866e727c51cc Mon Sep 17 00:00:00 2001 From: Michal Slaski Date: Mon, 20 Mar 2017 19:39:40 +0100 Subject: [PATCH 2/8] Start tarffic plugin --- apps/epl/src/epl_app.erl | 3 +++ 1 file changed, 3 insertions(+) diff --git a/apps/epl/src/epl_app.erl b/apps/epl/src/epl_app.erl index a27530f..5f5cb4c 100644 --- a/apps/epl/src/epl_app.erl +++ b/apps/epl/src/epl_app.erl @@ -55,6 +55,9 @@ start(_StartType, _StartArgs) -> %% Start EPL Dashboard {ok, _} = epl_sup:start_child(epl_dashboard, []), + %% Start EPL Dashboard + {ok, _} = epl_sup:start_child(epl_traffic, []), + %% load plugins PluginApps = plugins(Args), From 1c7e072da7a9ecd771d56af0226f68a9d6b980c9 Mon Sep 17 00:00:00 2001 From: Michal Slaski Date: Tue, 21 Mar 2017 07:49:29 +0100 Subject: [PATCH 3/8] Render all nodes connected to the observed node --- apps/epl/src/epl_traffic.erl | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/apps/epl/src/epl_traffic.erl b/apps/epl/src/epl_traffic.erl index 3b8fc07..4e30f4f 100644 --- a/apps/epl/src/epl_traffic.erl +++ b/apps/epl/src/epl_traffic.erl @@ -71,22 +71,21 @@ handle_info({data, _, _}, State = #state{subscribers = Subs, %% region named "INTERNET" is mandatory V1 = push_region(<<"INTERNET">>, new()), - Nodes = nodes(connected), %% add as many regions as there are nodes in the cluster - V2 = lists:foldl(fun(Node, V) -> + V2 = lists:foldl(fun({Node,_,_}, V) -> NodeBin = atom_to_binary(Node, latin1), push_region(NodeBin, V) - end, V1, Nodes), + end, V1, NewCounters), %% add links between "INTERNET" and all nodes %% and compute delta between old and new net_kernel counters - V3 = lists:foldl(fun(Node, V) -> - NodeBin = atom_to_binary(Node, latin1), - {OldIn, _OldOut} = get_in_out(Node, OldCounters), - {NewIn, _NewOut} = get_in_out(Node, NewCounters), - push_region_connection( - <<"INTERNET">>, NodeBin, {NewIn-OldIn, 0, 0}, #{}, V) - end, V2, Nodes), + V3 = lists:foldl( + fun({Node, NewIn, NewOut}, V) -> + NodeBin = atom_to_binary(Node, latin1), + {OldIn, OldOut} = get_in_out(Node, OldCounters), + push_region_connection( + <<"INTERNET">>, NodeBin, {NewIn-OldIn, 0, 0}, #{}, V) + end, V2, NewCounters), %% push an update to all subscribed WebSockets JSON = epl_json:encode(V3, <<"traffic-info">>), From 61da14a8a21da6d32944748e0cbbe042db70b9b5 Mon Sep 17 00:00:00 2001 From: Michal Slaski Date: Tue, 21 Mar 2017 09:40:52 +0100 Subject: [PATCH 4/8] Mark in yellow traffic coming to the node (ingress) --- apps/epl/src/epl_traffic.erl | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/apps/epl/src/epl_traffic.erl b/apps/epl/src/epl_traffic.erl index 4e30f4f..6ec47c7 100644 --- a/apps/epl/src/epl_traffic.erl +++ b/apps/epl/src/epl_traffic.erl @@ -83,15 +83,16 @@ handle_info({data, _, _}, State = #state{subscribers = Subs, fun({Node, NewIn, NewOut}, V) -> NodeBin = atom_to_binary(Node, latin1), {OldIn, OldOut} = get_in_out(Node, OldCounters), - push_region_connection( - <<"INTERNET">>, NodeBin, {NewIn-OldIn, 0, 0}, #{}, V) + push_region_connection(<<"INTERNET">>, NodeBin, + {NewOut-OldOut, NewIn-OldIn, 0}, + #{}, V) end, V2, NewCounters), %% push an update to all subscribed WebSockets JSON = epl_json:encode(V3, <<"traffic-info">>), [Pid ! {data, JSON} || Pid <- Subs], - {noreply, State}. + {noreply, State#state{counters = NewCounters}}. terminate(_Reason, _State) -> ok. @@ -146,10 +147,10 @@ binarify(Name) when is_list(Name) -> binarify(Name) -> Name. namify(Name) -> - Name1 = binary:replace(binarify(Name), <<"@">>, <<"0">>), + Name1 = binary:replace(binarify(Name), <<"@">>, <<"_at_">>), Name2 = binary:replace(binarify(Name1), <<"<">>, <<"">>), Name3 = binary:replace(binarify(Name2), <<">">>, <<"">>), - binary:replace(binarify(Name3), <<".">>, <<"-">>, [global]). + binary:replace(binarify(Name3), <<".">>, <<"_">>, [global]). %% ---------------------- Regions --------------------- From ec522fdcb0126c9a208ceba2a5e49f32ce9ab31a Mon Sep 17 00:00:00 2001 From: Michal Slaski Date: Tue, 21 Mar 2017 19:12:26 +0100 Subject: [PATCH 5/8] Add view visualising message passing --- apps/epl/src/epl_traffic.erl | 159 ++++++++++++++++++++++++++--------- 1 file changed, 119 insertions(+), 40 deletions(-) diff --git a/apps/epl/src/epl_traffic.erl b/apps/epl/src/epl_traffic.erl index 6ec47c7..56e4159 100644 --- a/apps/epl/src/epl_traffic.erl +++ b/apps/epl/src/epl_traffic.erl @@ -59,40 +59,28 @@ handle_cast({subscribe, Pid}, State = #state{subscribers = Subs}) -> {noreply, State#state{subscribers = [Pid|Subs]}}; handle_cast({unsubscribe, Pid}, State = #state{subscribers = Subs}) -> {noreply, State#state{subscribers = lists:delete(Pid, Subs)}}; -handle_cast(_Msg, State) -> - {noreply, State}. +handle_cast(Request, _State) -> + exit({not_implemented, Request}). -handle_info({data, _, _}, State = #state{subscribers = Subs, - counters = OldCounters}) -> - %% traverse all connected nodes and read their net_kernel counters - NewCounters = get_traffic_counters(), +handle_info({data, {_Node, _Timestamp}, Proplist}, + State = #state{subscribers = Subs, counters = OldCounters}) -> - %% start creating a map, which represents the Vizceral JSON document - %% region named "INTERNET" is mandatory - V1 = push_region(<<"INTERNET">>, new()), + Viz = new(), - %% add as many regions as there are nodes in the cluster - V2 = lists:foldl(fun({Node,_,_}, V) -> - NodeBin = atom_to_binary(Node, latin1), - push_region(NodeBin, V) - end, V1, NewCounters), + {Viz1, NewCounters} = update_traffic_graph(OldCounters, Viz), - %% add links between "INTERNET" and all nodes - %% and compute delta between old and new net_kernel counters - V3 = lists:foldl( - fun({Node, NewIn, NewOut}, V) -> - NodeBin = atom_to_binary(Node, latin1), - {OldIn, OldOut} = get_in_out(Node, OldCounters), - push_region_connection(<<"INTERNET">>, NodeBin, - {NewOut-OldOut, NewIn-OldIn, 0}, - #{}, V) - end, V2, NewCounters), + %% By convention we use <<"INTERNET">> as a name of the region, + %% which represents the observed node + Viz2 = get_message_passing_counters(<<"INTERNET">>, Proplist, Viz1), %% push an update to all subscribed WebSockets - JSON = epl_json:encode(V3, <<"traffic-info">>), + JSON = epl_json:encode(Viz2, <<"traffic-info">>), [Pid ! {data, JSON} || Pid <- Subs], - {noreply, State#state{counters = NewCounters}}. + {noreply, State#state{counters = NewCounters}}; +handle_info(Request, _State) -> + exit({not_implemented, Request}). + terminate(_Reason, _State) -> ok. @@ -103,6 +91,44 @@ code_change(_OldVsn, State, _Extra) -> %%%=================================================================== %%% Internal functions %%%=================================================================== +get_message_passing_counters(Node, Proplist, Vizceral) -> + lists:foldl( + fun({send, Send}, V) -> + %% Examples of send trace: + %% {{global_name_server,<13104.13.0>},0,1} + %% {#Port<13104.431>,<13104.28.0>},0,72} + %% {{<13104.12.0>,{alias,'erlangpl@127.0.0.1'}},2,0} + update_message_passing_graph(Node, Send, V); + (_, V) -> + V + end, + Vizceral, + Proplist). + +update_message_passing_graph(Node, Send, Vizceral) -> + %% Below is a workaround for how Vizceral requires the INTERNET + %% node to appear in its graph + {{RootVertex, _}, _, _} = hd(Send), + Viz1 = push_focused(<<"INTERNET">>, Node, Vizceral), + Viz2 = push_focused(RootVertex, Node, Viz1), + Viz3 = push_focused_connection( + <<"INTERNET">>, RootVertex, Node, {0,0,0}, #{}, Viz2), + + %% Update Vizceral graph with verticies representing processes + %% and edges representing message passing + lists:foldl( + fun({{P1, P2}, Count1, Count2}, V) + when not is_tuple(P1), not is_tuple(P2) -> + V1 = push_focused(P1, Node, V), + V2 = push_focused(P2, Node, V1), + V3 = push_focused_connection(P1, P2, Node, + {Count1,0,0}, #{}, V2), + push_focused_connection(P2, P1, Node, + {Count2,0,0}, #{}, V3); + (_, V) -> + V + end, Viz3, Send). + get_traffic_counters() -> {ok, NodesInfo} = command(fun net_kernel:nodes_info/0), @@ -112,6 +138,31 @@ get_traffic_counters() -> {NodeName, [{owner,_}, {state,up}, {address, _}, {type,normal}, {in,NodeIn}, {out,NodeOut}]} <- NodesInfo]. +update_traffic_graph(OldCounters, Vizceral) -> + %% traverse all connected nodes and read their net_kernel counters + NewCounters = get_traffic_counters(), + + %% start creating a map, which represents the Vizceral JSON document + %% region named <<"INTERNET">> represents the observed node + V1 = push_region(<<"INTERNET">>, Vizceral), + + %% add as many regions as there are nodes in the cluster + V2 = lists:foldl(fun({Node,_,_}, V) -> + push_region(binarify(Node), V) + end, V1, NewCounters), + + %% add links between "INTERNET" and all nodes + %% and compute delta between old and new net_kernel counters + V3 = lists:foldl( + fun({Node, NewIn, NewOut}, V) -> + {OldIn, OldOut} = get_in_out(Node, OldCounters), + push_region_connection(<<"INTERNET">>, binarify(Node), + {NewOut-OldOut, NewIn-OldIn, 0}, + #{}, V) + end, V2, NewCounters), + + {V3, NewCounters}. + get_in_out(Node, Counters) -> case lists:keyfind(Node, 1, Counters) of {Node, In, Out} -> @@ -144,29 +195,40 @@ entity(Renderer, Name, Nodes, Additional) -> binarify(Name) when is_list(Name) -> list_to_binary(Name); -binarify(Name) -> Name. +binarify(Name) when is_atom(Name) -> + atom_to_binary(Name, latin1); +binarify(Name) when is_pid(Name) -> + list_to_binary(pid_to_list(Name)); +binarify(Name) when is_port(Name) -> + list_to_binary(erlang:port_to_list(Name)); +binarify(Name) when is_binary(Name) -> + Name. -namify(Name) -> + +namify(Name) when is_binary(Name) -> Name1 = binary:replace(binarify(Name), <<"@">>, <<"_at_">>), Name2 = binary:replace(binarify(Name1), <<"<">>, <<"">>), Name3 = binary:replace(binarify(Name2), <<">">>, <<"">>), - binary:replace(binarify(Name3), <<".">>, <<"_">>, [global]). + binary:replace(binarify(Name3), <<".">>, <<"_">>, [global]); +namify(Name) -> + namify(binarify(Name)). + %% ---------------------- Regions --------------------- -push_region(Name, Vizcerl) -> - push_region(Name, #{}, Vizcerl). -push_region(Name, Additional, Vizcerl) -> +push_region(Name, Vizceral) -> + push_region(Name, #{}, Vizceral). +push_region(Name, Additional, Vizceral) -> A = maps:merge(#{ connections => [], maxVolume => 5000 }, Additional), - push_node(region, Name, A, Vizcerl). + push_node(region, Name, A, Vizceral). -pull_region(Name, Vizcerl) -> - {Region, Newlist} = pull_node(Name, Vizcerl), - {Region, maps:merge(Vizcerl, #{nodes => Newlist})}. +pull_region(Name, Vizceral) -> + {Region, Newlist} = pull_node(Name, Vizceral), + {Region, maps:merge(Vizceral, #{nodes => Newlist})}. %% ---------------------- Nodes ----------------------- push_node(Renderer, Name, Additional, Entity) -> @@ -195,8 +257,25 @@ push_connection(Source, Target, {N, W, D} , Additional, To) -> }, Additional), maps:merge(To, #{connections => [New | Connections]}). -push_region_connection(Source, Target, {N, W, D}, Additional, Vizcerl) -> +push_region_connection(Source, Target, {N, W, D}, Additional, Vizceral) -> %% Will crash on nonexisting - pull_region(Source, Vizcerl), - pull_region(Target, Vizcerl), - push_connection(Source, Target, {N, W, D}, Additional, Vizcerl). + pull_region(Source, Vizceral), + pull_region(Target, Vizceral), + push_connection(Source, Target, {N, W, D}, Additional, Vizceral). + +push_focused_connection(Source, Target, RegionName, {N, W, D}, Additional, Vizceral) -> + {Region, NewV} = pull_region(RegionName, Vizceral), + NewR = push_connection(Source, Target, {N,W,D}, Additional, Region), + push_region("", NewR, NewV). + +%% ---------------------- Focused --------------------- +push_focused(Name, Region, Vizceral) -> + push_focused(Name, Region, #{} ,Vizceral). +push_focused(Name, RegionName, Additional, Vizceral) -> + #{nodes := Nodes} = Vizceral, + {[Region], Rest} = lists:partition( + fun(A) -> + maps:get(name, A) == namify(RegionName) + end, Nodes), + NewRegion = push_node(focused, Name, Additional, Region), + maps:merge(Vizceral, #{nodes => [NewRegion | Rest]}). From dcb9895ec3b0885b4da1260cb888e6368ec8d135 Mon Sep 17 00:00:00 2001 From: Michal Slaski Date: Thu, 23 Mar 2017 06:40:02 -0700 Subject: [PATCH 6/8] Fixes and refactorings in the traffic plugin --- apps/epl/src/epl_traffic.erl | 64 +++++++++++++++++++----------------- 1 file changed, 34 insertions(+), 30 deletions(-) diff --git a/apps/epl/src/epl_traffic.erl b/apps/epl/src/epl_traffic.erl index 56e4159..8060c5d 100644 --- a/apps/epl/src/epl_traffic.erl +++ b/apps/epl/src/epl_traffic.erl @@ -25,7 +25,8 @@ code_change/3]). -record(state, {subscribers = [], - counters = []}). + traffic = [], + msg_pass = #{}}). %%%=================================================================== %%% API functions @@ -49,8 +50,8 @@ init([]) -> ok = epl:subscribe(), %% Initialise counters, so that later we can calculate deltas - Counters = get_traffic_counters(), - {ok, #state{counters = Counters}}. + TrafficCounters = get_traffic_counters(), + {ok, #state{traffic = TrafficCounters}}. handle_call(Request, _From, _State) -> exit({not_implemented, Request}). @@ -63,21 +64,22 @@ handle_cast(Request, _State) -> exit({not_implemented, Request}). handle_info({data, {_Node, _Timestamp}, Proplist}, - State = #state{subscribers = Subs, counters = OldCounters}) -> + State = #state{subscribers = Subs, + traffic = OldTraffic, + msg_pass = OldMsgPass}) -> - Viz = new(), - - {Viz1, NewCounters} = update_traffic_graph(OldCounters, Viz), + {Viz1, NewTraffic} = update_traffic_graph(OldTraffic, new()), %% By convention we use <<"INTERNET">> as a name of the region, %% which represents the observed node - Viz2 = get_message_passing_counters(<<"INTERNET">>, Proplist, Viz1), + Viz2 = get_message_passing_counters(<<"INTERNET">>, Proplist, + Viz1, OldMsgPass), %% push an update to all subscribed WebSockets JSON = epl_json:encode(Viz2, <<"traffic-info">>), [Pid ! {data, JSON} || Pid <- Subs], - {noreply, State#state{counters = NewCounters}}; + {noreply, State#state{traffic = NewTraffic}}; handle_info(Request, _State) -> exit({not_implemented, Request}). @@ -91,28 +93,28 @@ code_change(_OldVsn, State, _Extra) -> %%%=================================================================== %%% Internal functions %%%=================================================================== -get_message_passing_counters(Node, Proplist, Vizceral) -> +get_message_passing_counters(Node, Proplist, Vizceral, OldMsgPass) -> lists:foldl( fun({send, Send}, V) -> %% Examples of send trace: %% {{global_name_server,<13104.13.0>},0,1} %% {#Port<13104.431>,<13104.28.0>},0,72} %% {{<13104.12.0>,{alias,'erlangpl@127.0.0.1'}},2,0} - update_message_passing_graph(Node, Send, V); + update_message_passing_graph(Node, Send, V, OldMsgPass); (_, V) -> V end, Vizceral, Proplist). -update_message_passing_graph(Node, Send, Vizceral) -> +update_message_passing_graph(Node, Send, Vizceral, OldMsgPass) -> %% Below is a workaround for how Vizceral requires the INTERNET %% node to appear in its graph {{RootVertex, _}, _, _} = hd(Send), Viz1 = push_focused(<<"INTERNET">>, Node, Vizceral), Viz2 = push_focused(RootVertex, Node, Viz1), Viz3 = push_focused_connection( - <<"INTERNET">>, RootVertex, Node, {0,0,0}, #{}, Viz2), + <<"INTERNET">>, RootVertex, Node, {0,0,0}, Viz2), %% Update Vizceral graph with verticies representing processes %% and edges representing message passing @@ -121,10 +123,8 @@ update_message_passing_graph(Node, Send, Vizceral) -> when not is_tuple(P1), not is_tuple(P2) -> V1 = push_focused(P1, Node, V), V2 = push_focused(P2, Node, V1), - V3 = push_focused_connection(P1, P2, Node, - {Count1,0,0}, #{}, V2), - push_focused_connection(P2, P1, Node, - {Count2,0,0}, #{}, V3); + V3 = push_focused_connection(P1, P2, Node, {Count1,0,0}, V2), + push_focused_connection(P2, P1, Node, {Count2,0,0}, V3); (_, V) -> V end, Viz3, Send). @@ -218,6 +218,7 @@ namify(Name) -> %% ---------------------- Regions --------------------- push_region(Name, Vizceral) -> push_region(Name, #{}, Vizceral). + push_region(Name, Additional, Vizceral) -> A = maps:merge(#{ connections => [], @@ -235,6 +236,7 @@ push_node(Renderer, Name, Additional, Entity) -> #{nodes := Nodes} = Entity, Newnode = entity(Renderer, Name, [], Additional), maps:merge(Entity, #{nodes => [Newnode | Nodes]}). + pull_node(Name, Entity) -> #{nodes := Nodes} = Entity, {[Node], Rest} = lists:partition( @@ -245,16 +247,14 @@ pull_node(Name, Entity) -> %% ------------------- Connections -------------------- push_connection(Source, Target, {N, W, D} , Additional, To) -> - #{connections := Connections} = To, - New = maps:merge(#{ - source => namify(Source), - target => namify(Target), - metrics => #{ - normal => N, - danger => D, - warning => W - } - }, Additional), + #{connections := Connections} = To, + New = maps:merge(Additional, + #{source => namify(Source), + target => namify(Target), + metrics => #{normal => N, + danger => D, + warning => W} + }), maps:merge(To, #{connections => [New | Connections]}). push_region_connection(Source, Target, {N, W, D}, Additional, Vizceral) -> @@ -263,14 +263,18 @@ push_region_connection(Source, Target, {N, W, D}, Additional, Vizceral) -> pull_region(Target, Vizceral), push_connection(Source, Target, {N, W, D}, Additional, Vizceral). -push_focused_connection(Source, Target, RegionName, {N, W, D}, Additional, Vizceral) -> +push_focused_connection(S, T, RN, NWD, Vizceral) -> + push_focused_connection(S, T, RN, NWD, #{}, Vizceral). + +push_focused_connection(Source, Target, RegionName, {N, W, D}, A, Vizceral) -> {Region, NewV} = pull_region(RegionName, Vizceral), - NewR = push_connection(Source, Target, {N,W,D}, Additional, Region), - push_region("", NewR, NewV). + NewR = push_connection(Source, Target, {N,W,D}, A, Region), + push_region(RegionName, NewR, NewV). %% ---------------------- Focused --------------------- push_focused(Name, Region, Vizceral) -> push_focused(Name, Region, #{} ,Vizceral). + push_focused(Name, RegionName, Additional, Vizceral) -> #{nodes := Nodes} = Vizceral, {[Region], Rest} = lists:partition( From 3b7fed5c7d2db4fc6677a003795c220846a8e792 Mon Sep 17 00:00:00 2001 From: Michal Slaski Date: Thu, 23 Mar 2017 07:59:29 -0700 Subject: [PATCH 7/8] Make the INTERNET node represent the source of ingress traffic --- apps/epl/src/epl_traffic.erl | 60 ++++++++++++++++++++++++------------ 1 file changed, 41 insertions(+), 19 deletions(-) diff --git a/apps/epl/src/epl_traffic.erl b/apps/epl/src/epl_traffic.erl index 8060c5d..60a3d64 100644 --- a/apps/epl/src/epl_traffic.erl +++ b/apps/epl/src/epl_traffic.erl @@ -108,26 +108,48 @@ get_message_passing_counters(Node, Proplist, Vizceral, OldMsgPass) -> Proplist). update_message_passing_graph(Node, Send, Vizceral, OldMsgPass) -> - %% Below is a workaround for how Vizceral requires the INTERNET - %% node to appear in its graph - {{RootVertex, _}, _, _} = hd(Send), - Viz1 = push_focused(<<"INTERNET">>, Node, Vizceral), - Viz2 = push_focused(RootVertex, Node, Viz1), - Viz3 = push_focused_connection( - <<"INTERNET">>, RootVertex, Node, {0,0,0}, Viz2), - - %% Update Vizceral graph with verticies representing processes - %% and edges representing message passing + %% the INTERNET node represents the source of ingress traffic + Vizceral1 = push_focused(<<"INTERNET">>, Node, Vizceral), + lists:foldl( - fun({{P1, P2}, Count1, Count2}, V) - when not is_tuple(P1), not is_tuple(P2) -> - V1 = push_focused(P1, Node, V), - V2 = push_focused(P2, Node, V1), - V3 = push_focused_connection(P1, P2, Node, {Count1,0,0}, V2), - push_focused_connection(P2, P1, Node, {Count2,0,0}, V3); - (_, V) -> - V - end, Viz3, Send). + fun({{ID1, ID2}, Count1, Count2}, V) -> + P1 = get_PID_from_trace_event(ID1), + P2 = get_PID_from_trace_event(ID2), + update_messge_passing_graph(Node, P1, P2, Count1, Count2, V) + end, Vizceral1, Send). + +get_PID_from_trace_event({P,_}) -> P; +get_PID_from_trace_event(P) -> P. + +update_messge_passing_graph(Node, P1, P2, Count1, Count2, V) -> + %% Add verticies representing processes and ports + %% and edges representing message passing + V1 = update_msg_pass_processes(Node, P1, P2, Count1, Count2, V), + + %% Add edges between <<"INTERNET">> and ports + %% to represent ingress traffic + update_msg_pass_ports(Node, P1, P2, Count1, Count2, V1). + +update_msg_pass_processes(Node, P1, P2, C1, C2, V) -> + V1 = push_focused(P1, Node, V), + V2 = push_focused(P2, Node, V1), + V3 = push_focused_connection(P1, P2, Node, {C1,0,0}, V2), + push_focused_connection(P2, P1, Node, {C2,0,0}, V3). + +update_msg_pass_ports(Node, P1, P2, C1, C2, V) -> + V1 = if is_port(P1) -> + push_focused_connection(<<"INTERNET">>, P1, + Node, {C1,0,0}, V); + true -> + V + end, + + if is_port(P2) -> + push_focused_connection(<<"INTERNET">>, P2, + Node, {C2,0,0}, V1); + true -> + V1 + end. get_traffic_counters() -> {ok, NodesInfo} = command(fun net_kernel:nodes_info/0), From 399da553d9a9a99d02d6b3d8f5b240c3682f76a1 Mon Sep 17 00:00:00 2001 From: Tomasz Cichocinski Date: Thu, 23 Mar 2017 22:27:33 +0100 Subject: [PATCH 8/8] Bump erlangpl-ui to 0.2.0 --- erlangpl-ui | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/erlangpl-ui b/erlangpl-ui index 285dff6..634d693 160000 --- a/erlangpl-ui +++ b/erlangpl-ui @@ -1 +1 @@ -Subproject commit 285dff6ff69f81eac53b070900b05e460d4ebe39 +Subproject commit 634d69384860c3ac5219a9c91114e08fccd8f982