-
Notifications
You must be signed in to change notification settings - Fork 40
Add the feature of viewing traffic inside every node in the cluster #90
base: master
Are you sure you want to change the base?
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,7 +26,8 @@ | |
|
||
-record(state, {subscribers = [], | ||
traffic = [], | ||
msg_pass = #{}}). | ||
msg_pass = #{}, | ||
viz_map = undefined}). | ||
|
||
%%%=================================================================== | ||
%%% API functions | ||
|
@@ -47,7 +48,8 @@ unsubscribe() -> | |
|
||
init([]) -> | ||
%% Subscribe to all events from the observed node | ||
ok = epl:subscribe(default_node), | ||
Reply = epl:subscribe(), | ||
true = verify_subscribe_reply(Reply), | ||
|
||
%% Initialise counters, so that later we can calculate deltas | ||
TrafficCounters = get_traffic_counters(), | ||
|
@@ -66,25 +68,19 @@ handle_cast(Request, _State) -> | |
handle_info({data, {Node, _Timestamp}, Proplist}, | ||
State = #state{subscribers = Subs, | ||
traffic = OldTraffic, | ||
msg_pass = OldMsgPass}) -> | ||
|
||
{Viz1, NewTraffic} = update_traffic_graph(Node, OldTraffic, | ||
epl_viz_map:new(Node)), | ||
|
||
|
||
%% We're starting from observed node which is our graph entry point | ||
Viz2 = get_message_passing_counters(Node, Proplist, | ||
Viz1, OldMsgPass), | ||
|
||
%% push an update to all subscribed WebSockets | ||
JSON = epl_json:encode(Viz2, <<"traffic-info">>), | ||
|
||
[Pid ! {data, JSON} || Pid <- Subs], | ||
{noreply, State#state{traffic = NewTraffic}}; | ||
msg_pass = OldMsgPass, | ||
viz_map = OldViz}) -> | ||
VerifiedNode = verify_if_node_is_default(Node), | ||
{NewTraffic, NewViz} = handle_incoming_node_info(VerifiedNode, | ||
Proplist, | ||
OldTraffic, | ||
OldMsgPass, | ||
OldViz, | ||
Subs), | ||
{noreply, State#state{traffic = NewTraffic, viz_map = NewViz}}; | ||
handle_info(Request, _State) -> | ||
exit({not_implemented, Request}). | ||
|
||
|
||
terminate(_Reason, _State) -> | ||
ok. | ||
|
||
|
@@ -94,7 +90,71 @@ code_change(_OldVsn, State, _Extra) -> | |
%%%=================================================================== | ||
%%% Internal functions | ||
%%%=================================================================== | ||
verify_subscribe_reply(Reply) -> | ||
lists:all(fun(X) -> X =:= ok end, Reply). | ||
|
||
verify_if_node_is_default(Node) -> | ||
verify_if_node_is_default(Node, epl:get_default_node()). | ||
|
||
verify_if_node_is_default(Node, DefaultNode) | ||
when Node =:= DefaultNode -> | ||
{true, Node}; | ||
verify_if_node_is_default(Node, _DefaultNode) -> | ||
{false, Node}. | ||
|
||
handle_incoming_node_info({false, _Node}, _Proplist, OldTraffic, _OldMsgPass, | ||
OldViz = undefined, _Subs) -> | ||
{OldTraffic, OldViz}; | ||
handle_incoming_node_info({true, Node}, Proplist, OldTraffic, | ||
OldMsgPass, OldViz, Subs) -> | ||
{Viz1, NewTraffic} = update_traffic_graph(Node, OldTraffic, | ||
epl_viz_map:new(Node)), | ||
Viz2 = merge_existing_focused_nodes_and_conns(Viz1, OldViz), | ||
%% We're starting from observed node which is our graph entry point | ||
Viz3 = get_message_passing_counters(Node, Proplist, Viz2, OldMsgPass), | ||
%% push an update to all subscribed WebSockets | ||
JSON = epl_json:encode(Viz3, <<"traffic-info">>), | ||
|
||
[Pid ! {data, JSON} || Pid <- Subs], | ||
{NewTraffic, Viz3}; | ||
handle_incoming_node_info({false, Node}, Proplist, OldTraffic, | ||
OldMsgPass, OldViz, _Subs) -> | ||
%% We're starting from observed node which is our graph entry point | ||
Viz1 = get_message_passing_counters(Node, Proplist, OldViz, OldMsgPass), | ||
{OldTraffic, Viz1}. | ||
|
||
merge_existing_focused_nodes_and_conns(Viz, undefined) -> | ||
Viz; | ||
merge_existing_focused_nodes_and_conns(Viz = #{nodes := Nodes}, OldViz) -> | ||
lists:foldl(fun(Node, NewViz) -> | ||
maybe_merge_focused_nodes_and_conns(Node, NewViz, OldViz) | ||
end, Viz, Nodes). | ||
maybe_merge_focused_nodes_and_conns(#{name := Node}, Viz, OldViz) -> | ||
{Region, _} = epl_viz_map:pull_region(Node, OldViz), | ||
merge_focused_nodes_and_conns(Node, Viz, Region). | ||
|
||
merge_focused_nodes_and_conns(_Node, Viz, []) -> | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe better name for this function would be |
||
Viz; | ||
merge_focused_nodes_and_conns(Node, Viz, #{nodes := OldFocusedNodes, | ||
connections := OldFocusedConns}) -> | ||
{Region, Viz2 = #{nodes := Regions}} = epl_viz_map:pull_region(Node, Viz), | ||
RegionNodes = finally_merge_focused_nodes(Region, OldFocusedNodes), | ||
RegionNodesAndConns = finally_merge_focused_conns(RegionNodes, | ||
OldFocusedConns), | ||
finally_merge_focused_nodes_and_conns(Viz2, RegionNodesAndConns, Regions). | ||
|
||
finally_merge_focused_nodes(Region, OldFocusedNodes) -> | ||
maps:merge(Region, #{nodes => OldFocusedNodes}). | ||
|
||
finally_merge_focused_conns(Region, OldFocusedConns) -> | ||
maps:merge(Region, #{connections => OldFocusedConns}). | ||
|
||
finally_merge_focused_nodes_and_conns(Viz, UpdatedRegion, Regions) -> | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What do you think about |
||
maps:merge(Viz, #{nodes => [UpdatedRegion | Regions]}). | ||
|
||
get_message_passing_counters(Node, Proplist, Vizceral, OldMsgPass) -> | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think that |
||
VizceralCleared = | ||
epl_viz_map:clear_focused_nodes_inside_region(Node, Vizceral), | ||
lists:foldl( | ||
fun({send, Send}, V) -> | ||
%% Examples of send trace: | ||
|
@@ -105,13 +165,12 @@ get_message_passing_counters(Node, Proplist, Vizceral, OldMsgPass) -> | |
(_, V) -> | ||
V | ||
end, | ||
Vizceral, | ||
VizceralCleared, | ||
Proplist). | ||
|
||
update_message_passing_graph(Node, Send, Vizceral, _OldMsgPass) -> | ||
%% the INTERNET node represents the source of ingress traffic | ||
Vizceral1 = epl_viz_map:push_focused(<<"INTERNET">>, Node, Vizceral), | ||
|
||
lists:foldl( | ||
fun({{ID1, ID2}, Count1, Count2}, V) -> | ||
P1 = get_PID_from_trace_event(ID1), | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,6 +19,7 @@ | |
push_focused/4, | ||
push_focused_connection/5, | ||
push_focused_connection/6, | ||
clear_focused_nodes_inside_region/2, | ||
binarify/1, | ||
namify/1]). | ||
|
||
|
@@ -78,11 +79,12 @@ push_node(Renderer, Name, Additional, Entity) -> | |
-spec pull_node(Name :: name(), Entity :: map()) -> {map(), list()}. | ||
pull_node(Name, Entity) -> | ||
#{nodes := Nodes} = Entity, | ||
{[Node], Rest} = lists:partition( | ||
{Node, Rest} = lists:partition( | ||
fun(A) -> | ||
maps:get(name, A) == namify(Name) | ||
end, Nodes), | ||
{Node, Rest}. | ||
VerifiedNode = verify_if_pulled_node_exists(Node), | ||
{VerifiedNode, Rest}. | ||
|
||
%% @doc Pushes additional `Info' into cluster nodes section in `Vizceral' map. | ||
-spec push_additional_node_info(Info :: map(), Name :: atom(), | ||
|
@@ -144,6 +146,16 @@ push_focused_connection(Source, Target, RegionName, {N, W, D}, A, Vizceral) -> | |
NewR = push_connection(Source, Target, {N,W,D}, A, Region), | ||
push_region(RegionName, NewR, NewV). | ||
|
||
%% @doc Clears all focused nodes from `RegionName` node. | ||
-spec clear_focused_nodes_inside_region(RegionName :: name(), Viz :: map()) -> | ||
map(). | ||
clear_focused_nodes_inside_region(RegionName, Viz) -> | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What do you think about using |
||
{VizNode, NewViz = #{nodes := VizNodes}} = | ||
epl_viz_map:pull_region(RegionName, Viz), | ||
VizNodeCleared = maps:merge(VizNode, #{nodes => []}), | ||
VizNodeCleared2 = maps:merge(VizNodeCleared, #{connections => []}), | ||
maps:merge(NewViz, #{nodes => [VizNodeCleared2 | VizNodes]}). | ||
|
||
%%----------------------- Names ----------------------- | ||
%% @doc Transforms `Name' to binary. | ||
-spec binarify(Name :: name()) -> binary(). | ||
|
@@ -186,3 +198,6 @@ entity(Renderer, Name, Nodes, Additional) -> | |
nodes => Nodes | ||
}, | ||
maps:merge(Map, Additional). | ||
|
||
verify_if_pulled_node_exists([]) -> []; | ||
verify_if_pulled_node_exists([Node]) -> Node. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe could you use
R
instead ofX
inside the fun?R
is closer toReply
thanX
:)