diff --git a/cmd/livepeer/livepeer.go b/cmd/livepeer/livepeer.go index 76cc0ce264..f1de08b39c 100644 --- a/cmd/livepeer/livepeer.go +++ b/cmd/livepeer/livepeer.go @@ -33,6 +33,7 @@ import ( bnet "github.com/livepeer/go-livepeer-basicnet" "github.com/livepeer/go-livepeer/core" "github.com/livepeer/go-livepeer/eth" + lpmon "github.com/livepeer/go-livepeer/monitor" "github.com/livepeer/go-livepeer/net" "github.com/livepeer/go-livepeer/server" "github.com/livepeer/go-livepeer/types" @@ -85,6 +86,8 @@ func main() { ethPassword := flag.String("ethPassword", "", "New Eth account password") gethipc := flag.String("gethipc", "", "Geth ipc file location") protocolAddr := flag.String("protocolAddr", "", "Protocol smart contract address") + monitor := flag.Bool("monitor", false, "Set to true to send performance metrics") + monhost := flag.String("monitorhost", "metrics.livepeer.org", "host name for the metrics data collector") flag.Parse() @@ -111,7 +114,11 @@ func main() { return } - node, err := bnet.NewNode(*port, priv, pub) + if *monitor { + lpmon.Endpoint = *monhost + } + notifiee := bnet.NewBasicNotifiee(lpmon.Instance()) + node, err := bnet.NewNode(*port, priv, pub, notifiee) if err != nil { glog.Errorf("Error creating a new node: %v", err) return @@ -134,6 +141,7 @@ func main() { glog.Errorf("Cannot set up protocol:%v", err) return } + lpmon.Instance().SetBootNode() } else { if err := n.Start(*bootID, *bootAddr); err != nil { glog.Errorf("Cannot connect to bootstrap node: %v", err) @@ -394,6 +402,7 @@ func txDataToVideoProfile(txData string) []types.VideoProfile { } func stream(port string, streamID string) { + start := time.Now() if streamID == "" { glog.Errorf("Need to specify streamID via -id") return @@ -404,13 +413,21 @@ func stream(port string, streamID string) { glog.Infof("url: %v", url) err := cmd.Start() if err != nil { - glog.Infof("Couldn't start the stream") + glog.Infof("Couldn't start the stream. Make sure a local Livepeer node is running on port %v", port) os.Exit(1) } glog.Infof("Now streaming") err = cmd.Wait() + if err != nil { + glog.Infof("Couldn't start the stream. Make sure a local Livepeer node is running on port %v", port) + os.Exit(1) + } - glog.Infof("Finished the stream") + if time.Since(start) < time.Second { + glog.Infof("Error: Make sure local Livepeer node is running on port %v", port) + } else { + glog.Infof("Finished the stream") + } return } diff --git a/core/core_test.go b/core/core_test.go index 5334f4a9e2..07db983d12 100644 --- a/core/core_test.go +++ b/core/core_test.go @@ -256,7 +256,7 @@ func monitorChan(intChan chan int) { } func TestCreateTranscodeJob(t *testing.T) { priv, pub, _ := crypto.GenerateKeyPair(crypto.RSA, 2048) - node, err := bnet.NewNode(15000, priv, pub) + node, err := bnet.NewNode(15000, priv, pub, nil) if err != nil { glog.Errorf("Error creating a new node: %v", err) return @@ -296,7 +296,7 @@ func TestCreateTranscodeJob(t *testing.T) { func TestNotifyBroadcaster(t *testing.T) { priv, pub, _ := crypto.GenerateKeyPair(crypto.RSA, 2048) - node, err := bnet.NewNode(15000, priv, pub) + node, err := bnet.NewNode(15000, priv, pub, nil) if err != nil { glog.Errorf("Error creating a new node: %v", err) return diff --git a/core/streamdb.go b/core/streamdb.go index d5e53779da..d3cfeef0e8 100644 --- a/core/streamdb.go +++ b/core/streamdb.go @@ -9,6 +9,7 @@ import ( "github.com/ericxtang/m3u8" "github.com/golang/glog" + lpmon "github.com/livepeer/go-livepeer/monitor" "github.com/livepeer/lpms/stream" ) @@ -60,6 +61,7 @@ func (s *StreamDB) GetRTMPStream(id StreamID) stream.RTMPVideoStream { func (s *StreamDB) AddNewHLSStream(strmID StreamID) (strm stream.HLSVideoStream, err error) { strm = stream.NewBasicHLSVideoStream(strmID.String(), stream.DefaultSegWaitTime) s.streams[strmID] = strm + lpmon.Instance().LogStream(strmID.String(), 0, 0) // glog.Infof("Adding new video stream with ID: %v", strmID) return strm, nil @@ -69,6 +71,7 @@ func (s *StreamDB) AddNewHLSStream(strmID StreamID) (strm stream.HLSVideoStream, func (s *StreamDB) AddNewRTMPStream(strmID StreamID) (strm stream.RTMPVideoStream, err error) { strm = stream.NewBasicRTMPVideoStream(strmID.String()) s.streams[strmID] = strm + lpmon.Instance().LogStream(strmID.String(), 0, 0) // glog.Infof("Adding new video stream with ID: %v", strmID) return strm, nil @@ -92,12 +95,14 @@ func (s *StreamDB) AddHLSVariant(hlsStrmID StreamID, varStrmID StreamID, variant //Add variant streamID to streams map so we can keep track s.streams[varStrmID] = strm + lpmon.Instance().LogStream(varStrmID.String(), 0, 0) return nil } //AddStream adds an existing video stream. func (s *StreamDB) AddStream(strmID StreamID, strm stream.VideoStream_) (err error) { s.streams[strmID] = strm + lpmon.Instance().LogStream(strmID.String(), 0, 0) return nil } diff --git a/monitor/monitor.go b/monitor/monitor.go new file mode 100644 index 0000000000..81f2d9c114 --- /dev/null +++ b/monitor/monitor.go @@ -0,0 +1,101 @@ +package monitor + +import ( + "context" + "time" + + "github.com/golang/glog" + "github.com/livepeer/streamingviz/data" +) + +type Monitor struct { + Node *data.Node +} + +var monitor *Monitor +var Endpoint string + +const PostFreq = time.Second * 30 + +func Instance() *Monitor { + if monitor == nil { + monitor = newMonitor() + monitor.StartWorker(context.Background()) + } + + return monitor +} + +func newMonitor() *Monitor { + return &Monitor{Node: data.NewNode("")} +} + +func (m *Monitor) SetBootNode() { + m.Node.SetBootNode() +} + +func (m *Monitor) LogNewConn(local, remote string) { + if m.Node.ID == "" { + m.Node.ID = local + } + + m.Node.AddConn(local, remote) +} + +func (m *Monitor) RemoveConn(local, remote string) { + m.Node.RemoveConn(local, remote) +} + +func (m *Monitor) LogStream(strmID string, size uint, avgChunkSize uint) { + m.Node.SetStream(strmID, size, avgChunkSize) +} + +func (m *Monitor) RemoveStream(strmID string) { + m.Node.RemoveStream(strmID) +} + +func (m *Monitor) LogBroadcaster(strmID string) { + m.Node.SetBroadcast(strmID) +} + +func (m *Monitor) RemoveBroadcast(strmID string) { + m.Node.RemoveBroadcast(strmID) +} + +func (m *Monitor) LogRelay(strmID, remote string) { + m.Node.SetRelay(strmID, remote) +} + +func (m *Monitor) RemoveRelay(strmID string) { + m.Node.RemoveRelay(strmID) +} + +func (m *Monitor) LogSub(strmID string) { + m.Node.SetSub(strmID) +} + +func (m *Monitor) RemoveSub(strmID string) { + m.Node.RemoveSub(strmID) +} + +func (m *Monitor) LogBuffer(strmID string) { + m.Node.AddBufferEvent(strmID) +} + +func (m *Monitor) StartWorker(ctx context.Context) { + ticker := time.NewTicker(PostFreq) + go func() { + for { + select { + case <-ticker.C: + // glog.Infof("Posting node status to monitor") + if m.Node.ID != "" { + m.Node.SubmitToCollector(Endpoint) + } + case <-ctx.Done(): + glog.Errorf("Monitor Worker Done") + return + } + } + }() +} diff --git a/monitor/monitor_test.go b/monitor/monitor_test.go new file mode 100644 index 0000000000..1d78cf9e1d --- /dev/null +++ b/monitor/monitor_test.go @@ -0,0 +1,38 @@ +package monitor + +import ( + "encoding/json" + "io/ioutil" + "net/http" + "net/http/httptest" + "testing" + + "github.com/golang/glog" + "github.com/livepeer/streamingviz/data" +) + +func TestMonitor(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + rBody, err := ioutil.ReadAll(r.Body) + if err != nil { + t.Errorf("Error reading r: %v", err) + } + + node := &data.Node{} + err = json.Unmarshal(rBody, node) + if err != nil { + t.Errorf("Error unmarshaling: %v", err) + } + + glog.Infof("result: %v", node) + })) + defer ts.Close() + + m := Instance() + m.LogNewConn("local1", "remote1") + m.LogNewConn("local2", "remote2") + m.RemoveConn("local1", "remote1") + m.LogStream("sid", 10, 10) + + m.Node.SubmitToCollector(ts.URL) +} diff --git a/server/mediaserver.go b/server/mediaserver.go index 7310235ef8..21afdc9b98 100644 --- a/server/mediaserver.go +++ b/server/mediaserver.go @@ -20,6 +20,7 @@ import ( "github.com/golang/glog" "github.com/livepeer/go-livepeer/core" "github.com/livepeer/go-livepeer/eth" + lpmon "github.com/livepeer/go-livepeer/monitor" "github.com/livepeer/go-livepeer/types" lpmscore "github.com/livepeer/lpms/core" "github.com/livepeer/lpms/segmenter" @@ -320,8 +321,10 @@ func getHLSMediaPlaylistHandler(s *LivepeerServer) func(url *url.URL) (*m3u8.Med return nil, err } + lpmon.Instance().LogBuffer(strmID.String()) // glog.Infof("Waiting for playlist... err: %v", err) time.Sleep(2 * time.Second) + continue } else { // glog.Infof("Found playlist. Returning") diff --git a/server/mediaserver_test.go b/server/mediaserver_test.go index afb1abffff..59ab0224fa 100644 --- a/server/mediaserver_test.go +++ b/server/mediaserver_test.go @@ -26,7 +26,7 @@ var S *LivepeerServer func setupServer() *LivepeerServer { if S == nil { priv, pub, _ := crypto.GenerateKeyPair(crypto.RSA, 2048) - node, err := bnet.NewNode(15000, priv, pub) + node, err := bnet.NewNode(15000, priv, pub, nil) if err != nil { glog.Errorf("Error creating a new node: %v", err) return nil diff --git a/test.sh b/test.sh index 9ca3bf9bbf..11f041bb52 100644 --- a/test.sh +++ b/test.sh @@ -10,7 +10,12 @@ go test -logtostderr=true t2=$? cd .. -if (($t1!=0||$t2!=0)) +cd monitor +go test -logtostderr=true +t3=$? +cd .. + +if (($t1!=0||$t2!=0||$t3!=0)) then printf "\n\nSome Tests Failed\n\n" exit -1 diff --git a/vendor/github.com/livepeer/go-livepeer-basicnet/basic_broadcaster.go b/vendor/github.com/livepeer/go-livepeer-basicnet/basic_broadcaster.go index 5017c7997b..b48575441d 100644 --- a/vendor/github.com/livepeer/go-livepeer-basicnet/basic_broadcaster.go +++ b/vendor/github.com/livepeer/go-livepeer-basicnet/basic_broadcaster.go @@ -12,6 +12,7 @@ import ( //BasicBroadcaster is unique for a specific video stream. It keeps track of a list of listeners and a queue of video chunks. It won't start keeping track of things until there is at least 1 listener. type BasicBroadcaster struct { Network *BasicVideoNetwork + lastMsg *StreamDataMsg q chan *StreamDataMsg listeners map[string]*BasicStream StrmID string @@ -31,7 +32,8 @@ func (b *BasicBroadcaster) Broadcast(seqNo uint64, data []byte) error { b.working = true } - b.q <- &StreamDataMsg{SeqNo: seqNo, Data: data} + b.lastMsg = &StreamDataMsg{SeqNo: seqNo, Data: data} + b.q <- b.lastMsg return nil } @@ -62,17 +64,20 @@ func (b *BasicBroadcaster) broadcastToListeners(ctx context.Context) { // glog.Infof("broadcasting msg:%v to network. listeners: %v", msg, b.listeners) for id, l := range b.listeners { // glog.Infof("Broadcasting segment %v to listener %v", msg.SeqNo, id) - if err := l.SendMessage(StreamDataID, StreamDataMsg{SeqNo: msg.SeqNo, StrmID: b.StrmID, Data: msg.Data}); err != nil { - glog.Errorf("Error broadcasting segment %v to listener %v: %v", msg.SeqNo, id, err) - delete(b.listeners, id) - } + b.sendDataMsg(id, l, msg) } case <-ctx.Done(): glog.Infof("broadcast worker done") return } } +} +func (b *BasicBroadcaster) sendDataMsg(lid string, l *BasicStream, msg *StreamDataMsg) { + if err := l.SendMessage(StreamDataID, StreamDataMsg{SeqNo: msg.SeqNo, StrmID: b.StrmID, Data: msg.Data}); err != nil { + glog.Errorf("Error broadcasting segment %v to listener %v: %v", msg.SeqNo, lid, err) + delete(b.listeners, lid) + } } func (b BasicBroadcaster) String() string { diff --git a/vendor/github.com/livepeer/go-livepeer-basicnet/basic_network.go b/vendor/github.com/livepeer/go-livepeer-basicnet/basic_network.go index 25c2340010..c975fc5654 100644 --- a/vendor/github.com/livepeer/go-livepeer-basicnet/basic_network.go +++ b/vendor/github.com/livepeer/go-livepeer-basicnet/basic_network.go @@ -20,6 +20,7 @@ import ( "github.com/ericxtang/m3u8" "github.com/golang/glog" + lpmon "github.com/livepeer/go-livepeer/monitor" lpnet "github.com/livepeer/go-livepeer/net" ) @@ -73,6 +74,7 @@ func (n *BasicVideoNetwork) GetBroadcaster(strmID string) (lpnet.Broadcaster, er if !ok { b = &BasicBroadcaster{Network: n, StrmID: strmID, q: make(chan *StreamDataMsg), listeners: make(map[string]*BasicStream)} n.broadcasters[strmID] = b + lpmon.Instance().LogBroadcaster(strmID) } return b, nil } @@ -83,6 +85,7 @@ func (n *BasicVideoNetwork) GetSubscriber(strmID string) (lpnet.Subscriber, erro if !ok { s = &BasicSubscriber{Network: n, StrmID: strmID, host: n.NetworkNode.PeerHost, msgChan: make(chan StreamDataMsg)} n.subscribers[strmID] = s + lpmon.Instance().LogSub(strmID) } return s, nil } @@ -274,6 +277,7 @@ func handleSubReq(nw *BasicVideoNetwork, subReq SubReqMsg, ws *BasicStream) erro //TODO: Add verification code for the SubNodeID (Make sure the message is not spoofed) remotePid := peer.IDHexEncode(ws.Stream.Conn().RemotePeer()) b.listeners[remotePid] = ws + b.sendDataMsg(remotePid, ws, b.lastMsg) return nil } else if r := nw.relayers[subReq.StrmID]; r != nil { //Already a relayer in place. Subscribe as a listener. @@ -326,6 +330,7 @@ func handleSubReq(nw *BasicVideoNetwork, subReq SubReqMsg, ws *BasicStream) erro //Create a relayer, register the listener r := nw.NewRelayer(subReq.StrmID) r.UpstreamPeer = p + lpmon.Instance().LogRelay(subReq.StrmID, peer.IDHexEncode(p)) remotePid := peer.IDHexEncode(ws.Stream.Conn().RemotePeer()) r.listeners[remotePid] = ws return nil @@ -349,6 +354,7 @@ func handleCancelSubReq(nw *BasicVideoNetwork, cr CancelSubMsg, rpeer peer.ID) e } else if r := nw.relayers[cr.StrmID]; r != nil { //Remove from relayer listener delete(r.listeners, peer.IDHexEncode(rpeer)) + lpmon.Instance().RemoveRelay(cr.StrmID) //Pass on the cancel req and remove relayer if relayer has no more listeners if len(r.listeners) == 0 { ns := nw.NetworkNode.GetStream(r.UpstreamPeer) @@ -412,6 +418,7 @@ func handleFinishStream(nw *BasicVideoNetwork, fs FinishStreamMsg) error { glog.Errorf("Error relaying finish stream: %v", err) } delete(nw.relayers, fs.StrmID) + lpmon.Instance().RemoveRelay(fs.StrmID) } if s == nil && r == nil { diff --git a/vendor/github.com/livepeer/go-livepeer-basicnet/basic_notifiee.go b/vendor/github.com/livepeer/go-livepeer-basicnet/basic_notifiee.go index 9e03110295..ac5fac23c4 100644 --- a/vendor/github.com/livepeer/go-livepeer-basicnet/basic_notifiee.go +++ b/vendor/github.com/livepeer/go-livepeer-basicnet/basic_notifiee.go @@ -6,10 +6,17 @@ import ( net "gx/ipfs/QmahYsGWry85Y7WUe2SX5G4JkH2zifEQAUtJVLZ24aC9DF/go-libp2p-net" "github.com/golang/glog" + lpmon "github.com/livepeer/go-livepeer/monitor" ) //BasicNotifiee gets called during important libp2p events -type BasicNotifiee struct{} +type BasicNotifiee struct { + monitor *lpmon.Monitor +} + +func NewBasicNotifiee(mon *lpmon.Monitor) *BasicNotifiee { + return &BasicNotifiee{monitor: mon} +} // called when network starts listening on an addr func (bn *BasicNotifiee) Listen(n net.Network, addr ma.Multiaddr) { @@ -24,11 +31,13 @@ func (bn *BasicNotifiee) ListenClose(n net.Network, addr ma.Multiaddr) { // called when a connection opened func (bn *BasicNotifiee) Connected(n net.Network, conn net.Conn) { glog.Infof("Notifiee - Connected. Local: %v - Remote: %v", peer.IDHexEncode(conn.LocalPeer()), peer.IDHexEncode(conn.RemotePeer())) + bn.monitor.LogNewConn(peer.IDHexEncode(conn.LocalPeer()), peer.IDHexEncode(conn.RemotePeer())) } // called when a connection closed func (bn *BasicNotifiee) Disconnected(n net.Network, conn net.Conn) { - glog.Infof("Notifiee - Disconnected") + glog.Infof("Notifiee - Disconnected. Local: %v - Remote: %v", peer.IDHexEncode(conn.LocalPeer()), peer.IDHexEncode(conn.RemotePeer())) + bn.monitor.RemoveConn(peer.IDHexEncode(conn.LocalPeer()), peer.IDHexEncode(conn.RemotePeer())) } // called when a stream opened diff --git a/vendor/github.com/livepeer/go-livepeer-basicnet/network_node.go b/vendor/github.com/livepeer/go-livepeer-basicnet/network_node.go index 3331881cd6..369a9f06d8 100644 --- a/vendor/github.com/livepeer/go-livepeer-basicnet/network_node.go +++ b/vendor/github.com/livepeer/go-livepeer-basicnet/network_node.go @@ -14,6 +14,7 @@ import ( peer "gx/ipfs/QmXYjuNuxVzXKJCfWasQk1RqkhVLDM9jtUKhqc2WPQmFSB/go-libp2p-peer" host "gx/ipfs/QmZy7c24mmkEHpNJndwgsEE3wcVxHd8yB969yTnAJFVw7f/go-libp2p-host" crypto "gx/ipfs/QmaPbCnUMBohSGo3KnxEa2bHqyJVVeEEcwtqJAYxerieBo/go-libp2p-crypto" + inet "gx/ipfs/QmahYsGWry85Y7WUe2SX5G4JkH2zifEQAUtJVLZ24aC9DF/go-libp2p-net" swarm "gx/ipfs/QmaijwHnbD4SabGA8C2fN9gchptLvRe2RxqTU5XkjAGBw5/go-libp2p-swarm" bhost "gx/ipfs/QmapADMpK4e5kFGBxC2aHreaDqKP9vmMng5f91MA14Ces9/go-libp2p/p2p/host/basic" rhost "gx/ipfs/QmapADMpK4e5kFGBxC2aHreaDqKP9vmMng5f91MA14Ces9/go-libp2p/p2p/host/routed" @@ -27,7 +28,7 @@ type NetworkNode struct { } //NewNode creates a new Livepeerd node. -func NewNode(listenPort int, priv crypto.PrivKey, pub crypto.PubKey) (*NetworkNode, error) { +func NewNode(listenPort int, priv crypto.PrivKey, pub crypto.PubKey, f inet.Notifiee) (*NetworkNode, error) { pid, err := peer.IDFromPublicKey(pub) if err != nil { return nil, err @@ -61,7 +62,7 @@ func NewNode(listenPort int, priv crypto.PrivKey, pub crypto.PubKey) (*NetworkNo store, &BasicReporter{}) - netwrk.Notify(&BasicNotifiee{}) + netwrk.Notify(f) basicHost := bhost.New(netwrk, bhost.NATPortMap) dht, err := constructDHTRouting(context.Background(), basicHost, ds.NewMapDatastore()) diff --git a/vendor/github.com/livepeer/lpms/stream/basic_hls_videostream.go b/vendor/github.com/livepeer/lpms/stream/basic_hls_videostream.go index 0fbbed7cbd..b67c766831 100644 --- a/vendor/github.com/livepeer/lpms/stream/basic_hls_videostream.go +++ b/vendor/github.com/livepeer/lpms/stream/basic_hls_videostream.go @@ -8,6 +8,7 @@ import ( "github.com/ericxtang/m3u8" "github.com/golang/glog" + lpmon "github.com/livepeer/go-livepeer/monitor" ) const DefaultMediaPlLen = uint(500) @@ -88,6 +89,7 @@ func (s *BasicHLSVideoStream) GetHLSSegment(strmID string, segName string) (*HLS seg, ok := s.sqMap[sqMapKey(strmID, segName)] if !ok { + lpmon.Instance().LogBuffer(strmID) time.Sleep(SegWaitInterval) continue } diff --git a/vendor/github.com/livepeer/streamingviz/README.md b/vendor/github.com/livepeer/streamingviz/README.md new file mode 100644 index 0000000000..0b13629a7d --- /dev/null +++ b/vendor/github.com/livepeer/streamingviz/README.md @@ -0,0 +1,27 @@ +# Livepeer Network Visualization + +Right now the Livepeer nodes report their status to a central visualization server by default. We'll remove this after the initial build, but for now, during debugging it is useful to see which nodes connect to one another and what roles they are playing for a given stream. + +## Start the Visualizaiton Server + +From the `streamingviz` root directory, run + + go run ./server/server.go + +Start the livepeer nodes with the `--viz` flag to allow them to send their info the viz server. The `--vizhost ` flag will specify where the server is running (defaults to `http://localhost:8585`). + + livepeer --viz --vizhost http://gateway1-toynet.livepeer.org:8585 + +## Access the visualization + +If the visualization server is running, you can access the visualizaiton at [http://localhost:8585?streamid=\] for any given stream id. Accessing it without the argument will show the entire network, but not any stream data about who is broadcasting or consuming. + +Nodes will report their peer list to the server every 20 seconds. Visualization will refresh to update itself every 30 seconds. + +## TODO + +1. Make sure that the http requests aren't blocking +2. Add `LogDone()` events when nodes are done broadcasting, consuming, or relaying. +3. Add a dropdown of known streams to the visualization so we can inspect them all without copying and pasting the streamID. +4. Add a timeout to remove nodes that haven't checked in in awhile. + diff --git a/vendor/github.com/livepeer/streamingviz/client/client.go b/vendor/github.com/livepeer/streamingviz/client/client.go new file mode 100644 index 0000000000..f302ed2ce2 --- /dev/null +++ b/vendor/github.com/livepeer/streamingviz/client/client.go @@ -0,0 +1,117 @@ +package client + +import ( + "bytes" + "encoding/json" + "fmt" + "net/http" +) + +type Client struct { + NodeID string + Enabled bool + Endpoint string + PeersChan chan []string + BroadcastChan chan string + ConsumeChan chan string + RelayChan chan string + DoneChan chan string +} + +func NewClient(nodeID string, enabled bool, host string) *Client { + return &Client{ + NodeID: nodeID, + Enabled: enabled, + Endpoint: fmt.Sprintf("%s/event", host), // Default. Override if you'd like to change, + PeersChan: make(chan []string), + BroadcastChan: make(chan string), + ConsumeChan: make(chan string), + RelayChan: make(chan string), + DoneChan: make(chan string), + } +} + +func (self *Client) LogPeers(peers []string) { + self.PeersChan <- peers +} + +func (self *Client) LogBroadcast(streamID string) { + self.BroadcastChan <- streamID +} + +func (self *Client) LogConsume(streamID string) { + self.ConsumeChan <- streamID +} + +func (self *Client) LogRelay(streamID string) { + self.RelayChan <- streamID +} + +func (self *Client) LogDone(streamID string) { + self.DoneChan <- streamID +} + +func (self *Client) InitData(eventName string) (data map[string]interface{}) { + data = make(map[string]interface{}) + data["name"] = eventName + data["node"] = self.NodeID + return +} + +func (self *Client) PostEvent(data map[string]interface{}) { + // For now just don't actually post the data to a server if viz is not enabled + if self.Enabled { + enc, _ := json.Marshal(data) + + req, err := http.NewRequest("POST", self.Endpoint, bytes.NewBuffer(enc)) + req.Header.Set("Content-Type", "application/json") + + client := &http.Client{} + resp, err := client.Do(req) + if err != nil { + fmt.Println("Couldn't connect to the event server", err) + return + } + defer resp.Body.Close() + } +} + +// Starts consuming events. NodeID must be set or else this will error. +// Pass doneC <- true from the calling thread when you wish to stop the event loop. +func (self *Client) ConsumeEvents(doneC chan bool) error { + if self.NodeID == "" { + return fmt.Errorf("NodeID must be set before consuming events") + } + + go self.consumeLoop(doneC) + return nil +} + +func (self *Client) consumeLoop(doneC chan bool) { + for { + select { + case peers := <-self.PeersChan: + data := self.InitData("peers") + data["peers"] = peers + self.PostEvent(data) + case streamID := <-self.BroadcastChan: + data := self.InitData("broadcast") + data["streamId"] = streamID + self.PostEvent(data) + case streamID := <-self.ConsumeChan: + data := self.InitData("consume") + data["streamId"] = streamID + self.PostEvent(data) + case streamID := <-self.RelayChan: + data := self.InitData("relay") + data["streamId"] = streamID + self.PostEvent(data) + case streamID := <-self.DoneChan: + data := self.InitData("done") + data["streamId"] = streamID + self.PostEvent(data) + case <-doneC: + return + } + } +} diff --git a/vendor/github.com/livepeer/streamingviz/client/client_test.go b/vendor/github.com/livepeer/streamingviz/client/client_test.go new file mode 100644 index 0000000000..50a85cee4a --- /dev/null +++ b/vendor/github.com/livepeer/streamingviz/client/client_test.go @@ -0,0 +1,67 @@ +package client + +import ( + "testing" +) + +// Need to be running a server on port 8585 to run the below. +// For now treat it as an example usage +func testVizClient(t *testing.T) { + host := "http://localhost:8585" + doneC1 := make(chan bool) + doneC2 := make(chan bool) + doneC3 := make(chan bool) + + client := NewClient("A", true, host) + client.ConsumeEvents(doneC1) + + client.LogPeers([]string{"B", "C"}) + client.LogBroadcast("stream1") + + client2 := NewClient("B", true, host) + client2.ConsumeEvents(doneC2) + + client2.LogPeers([]string{"A", "D"}) + + client3 := NewClient("D", true, host) + client3.ConsumeEvents(doneC3) + + client3.LogPeers([]string{"B"}) + client3.LogConsume("stream1") + + client2.LogRelay("stream1") + doneC1 <- true + doneC2 <- true + doneC3 <- true +} + +func TestBiggerNetwork(t *testing.T) { + host := "http://localhost:8585" + streamID := "stream2" + nodes := []string{"A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "K", "H", + "L", "M", "N", "O", "P", "Q", "R", "S", "T", "U", "V", "W", "X", "Y", "Z"} + doneCs := make([]chan bool, len(nodes)) + clients := make([]*Client, len(nodes)) + + for i := 0; i < len(nodes); i++ { + doneCs[i] = make(chan bool) + clients[i] = NewClient(nodes[i], true, host) + clients[i].ConsumeEvents(doneCs[i]) + + if i < len(nodes)-6 { + clients[i].LogPeers(nodes[i+1 : i+6]) + } + } + + clients[0].LogBroadcast(streamID) + clients[2].LogConsume(streamID) + clients[9].LogConsume(streamID) + clients[13].LogConsume(streamID) + clients[20].LogConsume(streamID) + clients[7].LogRelay(streamID) + clients[17].LogRelay(streamID) + + for i := 0; i < len(nodes); i++ { + doneCs[i] <- true + } +} diff --git a/vendor/github.com/livepeer/streamingviz/data/data.go b/vendor/github.com/livepeer/streamingviz/data/data.go new file mode 100644 index 0000000000..bc82350678 --- /dev/null +++ b/vendor/github.com/livepeer/streamingviz/data/data.go @@ -0,0 +1,272 @@ +package data + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "net/http" + "strings" + "time" + + "github.com/golang/glog" +) + +const NetworkTimerFreq = time.Second * 60 + +type Network struct { + Nodes map[string]*Node + Timer map[string]time.Time +} + +func NewNetwork() *Network { + n := &Network{ + Nodes: make(map[string]*Node), + Timer: make(map[string]time.Time)} + n.startTimer(context.Background()) + return n +} + +func (n *Network) startTimer(ctx context.Context) { + ticker := time.NewTicker(NetworkTimerFreq) + go func() { + for { + select { + case <-ticker.C: + for id, insertTime := range n.Timer { + if time.Since(insertTime) > NetworkTimerFreq { + delete(n.Timer, id) + delete(n.Nodes, id) + } + } + case <-ctx.Done(): + glog.Errorf("Monitor Network Timer Done") + return + } + } + }() +} + +func (n *Network) SetNode(node *Node) { + n.Nodes[node.ID] = node + n.Timer[node.ID] = time.Now() +} + +func (n *Network) ToD3Json() interface{} { + nodes := make([]map[string]interface{}, 0) + links := make([]map[string]string, 0) + streams := make(map[string]map[string]interface{}) + + for _, n := range n.Nodes { + // glog.Infof("Node: %v", n.ID) + //Add nodes + nmap := map[string]interface{}{"id": n.ID, "bootnode": n.IsBootNode} + nodes = append(nodes, nmap) + + //Add links + for _, conn := range n.Conns { + links = append(links, map[string]string{"source": conn.N1, "target": conn.N2}) + } + + //Add streams, broadcasters, relayers and subscribers + checkStrm := func(id string, streams map[string]map[string]interface{}) { + strm, ok := streams[id] + if !ok { + // glog.Infof("New Stream: %v", id) + strm = map[string]interface{}{"broadcaster": "", "relayers": make([]string, 0), "subscribers": make([]map[string]string, 0)} + streams[id] = strm + } + } + + for _, b := range n.Broadcasts { + checkStrm(b.StrmID, streams) + streams[b.StrmID]["broadcaster"] = n.ID + } + for _, sub := range n.Subs { + // glog.Infof("n: %v, strmID: %v", n.ID, sub.StrmID) + checkStrm(sub.StrmID, streams) + streams[sub.StrmID]["subscribers"] = append(streams[sub.StrmID]["subscribers"].([]map[string]string), map[string]string{"id": n.ID, "buffer": string(sub.BufferCount)}) + } + for _, r := range n.Relays { + // glog.Infof("n: %v, strmID: %v", n.ID, r.StrmID) + checkStrm(r.StrmID, streams) + streams[r.StrmID]["relayers"] = append(streams[r.StrmID]["relayers"].([]string), n.ID) + } + + } + + result := map[string]interface{}{ + "nodes": nodes, + "links": links, + "streams": streams, + } + + b, err := json.Marshal(result) + if err != nil { + glog.Errorf("Error marshaling json: %v", err) + } + + var res interface{} + json.Unmarshal(b, &res) + return res +} + +type Node struct { + ID string `json:"id"` + IsBootNode bool `json:"isBootNode"` + Conns []Conn `json:"conns"` + Strms map[string]Stream `json:"strms"` + Broadcasts map[string]Broadcast `json:"broadcasts"` + Relays map[string]Relay `json:"relays"` + Subs map[string]*Subscription `json:"subs"` + ConnsInLastHr uint `json:"connsInLastHr"` + StrmsInLastHr uint `json:"strmsInLastHr"` + TotalVidLenInLastHr uint `json:"totalVidLenInLastHr"` + TotalRelayInLastHr uint `json:"totalRelayInLastHr"` + AvgCPU float32 `json:"avgCPU"` + AvgMem uint32 `json:"avgMem"` + AvgBandwidth uint32 `json:"avgBandwidth"` + ConnWorker map[Conn]time.Time `json:"-"` +} + +func NewNode(id string) *Node { + n := &Node{ + ID: id, + Conns: make([]Conn, 0), + Strms: make(map[string]Stream), + Broadcasts: make(map[string]Broadcast), + Relays: make(map[string]Relay), + Subs: make(map[string]*Subscription), + ConnWorker: make(map[Conn]time.Time), + } + + return n +} + +func (n *Node) AddConn(local, remote string) { + newConn := NewConn(local, remote) + n.Conns = append(n.Conns, newConn) + n.ConnsInLastHr++ + n.ConnWorker[newConn] = time.Now() +} + +func (n *Node) RemoveConn(local, remote string) { + rmc := NewConn(local, remote) + for i, c := range n.Conns { + if c == rmc { + n.Conns = append(n.Conns[:i], n.Conns[i+1:]...) + } + } +} + +func (n *Node) SetBootNode() { + n.IsBootNode = true +} + +func (n *Node) SetStream(id string, size, avgChunkSize uint) { + + // n.Strms = append(n.Strms, Stream{ID: id, Chunks: size, AvgChunkSize: avgChunkSize}) + n.Strms[id] = Stream{ID: id, Chunks: size, AvgChunkSize: avgChunkSize} +} + +func (n *Node) RemoveStream(id string) { + delete(n.Strms, id) +} + +func (n *Node) SetBroadcast(strmID string) { + // n.Broadcasts = append(n.Broadcasts, Broadcast{StrmID: id}) + n.Broadcasts[strmID] = Broadcast{StrmID: strmID} +} + +func (n *Node) RemoveBroadcast(strmID string) { + delete(n.Broadcasts, strmID) +} + +func (n *Node) SetSub(strmID string) { + // n.Subs = append(n.Subs, Subscription{StrmID: strmID}) + n.Subs[strmID] = &Subscription{StrmID: strmID} +} + +func (n *Node) RemoveSub(strmID string) { + delete(n.Subs, strmID) +} + +func (n *Node) AddBufferEvent(strmID string) { + // for i, sub := range n.Subs { + // if sub.StrmID == strmID { + // n.Subs[i].BufferCount++ + // } + // } + + glog.Info("Logging buffer event") + n.Subs[strmID].BufferCount = n.Subs[strmID].BufferCount + 1 +} + +func (n *Node) SetRelay(strmID string, remote string) { + // n.Relays = append(n.Relays, Relay{StrmID: strmID, RemoteN: remote}) + n.Relays[strmID] = Relay{StrmID: strmID, RemoteN: remote} +} + +func (n *Node) RemoveRelay(strmID string) { + delete(n.Relays, strmID) +} + +func (n *Node) SubmitToCollector(endpoint string) { + if endpoint == "" { + return + } + + enc, err := json.Marshal(n) + if err != nil { + glog.Errorf("Error marshaling node status: %v", err) + return + } + + req, err := http.NewRequest("POST", endpoint, bytes.NewBuffer(enc)) + if err != nil { + glog.Errorf("Error creating new request: %v", err) + return + } + req.Header.Set("Content-Type", "application/json") + + client := &http.Client{} + resp, err := client.Do(req) + if err != nil { + fmt.Println("Couldn't connect to the event server", err) + return + } + defer resp.Body.Close() +} + +type Conn struct { + N1 string `json:"n1"` + N2 string `json:"n2"` +} + +func NewConn(local, remote string) Conn { + if strings.Compare(local, remote) > 0 { + return Conn{N1: local, N2: remote} + } else { + return Conn{N1: remote, N2: local} + } +} + +type Stream struct { + ID string `json:"id"` + Chunks uint `json:"chunks"` + AvgChunkSize uint `json:"avgChunkSize"` +} + +type Broadcast struct { + StrmID string +} + +type Relay struct { + StrmID string + RemoteN string +} + +type Subscription struct { + StrmID string + BufferCount uint +} diff --git a/vendor/github.com/livepeer/streamingviz/data/data_test.go b/vendor/github.com/livepeer/streamingviz/data/data_test.go new file mode 100644 index 0000000000..8cfda1e903 --- /dev/null +++ b/vendor/github.com/livepeer/streamingviz/data/data_test.go @@ -0,0 +1,40 @@ +package data + +import ( + "testing" +) + +func TestToD3Json(t *testing.T) { + n1 := NewNode("A") + n1.AddConn("A", "B") + n1.SetBroadcast("strm1") + n1.SetStream("strm1", 15, 100) + + n2 := NewNode("B") + n2.AddConn("B", "A") + n2.AddConn("B", "C") + n2.SetRelay("strm1", "A") + + n3 := NewNode("C") + n3.AddConn("C", "B") + n3.SetSub("strm1") + n3.SetStream("strm1", 10, 100) + + n := NewNetwork() + n.SetNode(n1) + n.SetNode(n2) + n.SetNode(n3) + + json := n.ToD3Json().(map[string]interface{}) + if _, nok := json["nodes"]; !nok { + t.Errorf("Wrong json: %v", json) + } + + if _, lok := json["links"]; !lok { + t.Errorf("Wrong json: %v", json) + } + + if _, sok := json["streams"]; !sok { + t.Errorf("Wrong json: %v", json) + } +} diff --git a/vendor/github.com/livepeer/streamingviz/server/server.go b/vendor/github.com/livepeer/streamingviz/server/server.go new file mode 100644 index 0000000000..164a01a482 --- /dev/null +++ b/vendor/github.com/livepeer/streamingviz/server/server.go @@ -0,0 +1,193 @@ +package main + +import ( + "encoding/json" + "fmt" + "html/template" + "io/ioutil" + "net/http" + "path/filepath" + + "github.com/golang/glog" + "github.com/livepeer/streamingviz" + "github.com/livepeer/streamingviz/data" +) + +var networkDB *streamingviz.Network +var network *data.Network + +func init() { + networkDB = streamingviz.NewNetwork() + network = data.NewNetwork() +} + +func handler(w http.ResponseWriter, r *http.Request) { + abs, _ := filepath.Abs("./server/static/index.html") + view, err := template.ParseFiles(abs) + + // data := getData() + + // network := getNetwork() + // data := networkToData(network, "teststream") + + // streamID := r.URL.Query().Get("streamid") + // data := networkToData(networkDB, streamID) + data := network.ToD3Json() + + if err != nil { + fmt.Fprintf(w, "error: %v", err) + } else { + view.Execute(w, data) + } +} + +func handleEvent(w http.ResponseWriter, r *http.Request) { + if r.Method != "POST" { + fmt.Fprintf(w, "Error. You must POST events") + return + } + + body, _ := ioutil.ReadAll(r.Body) + var event map[string]interface{} + if err := json.Unmarshal(body, &event); err != nil { + fmt.Fprintf(w, "Error unmarshalling request: %v", err) + return + } + + eventName := event["name"].(string) + node := event["node"].(string) + + switch eventName { + case "peers": + peers := event["peers"].([]interface{}) + peerList := make([]string, 0) + for _, v := range peers { + peerList = append(peerList, v.(string)) + } + networkDB.ReceivePeersForNode(node, peerList) + case "broadcast": + streamID := event["streamId"].(string) + fmt.Println("Got a BROADCAST event for", node, streamID) + networkDB.StartBroadcasting(node, streamID) + case "consume": + streamID := event["streamId"].(string) + fmt.Println("Got a CONSUME event for", node, streamID) + networkDB.StartConsuming(node, streamID) + case "relay": + streamID := event["streamId"].(string) + fmt.Println("Got a RELAY event for", node, streamID) + networkDB.StartRelaying(node, streamID) + case "done": + streamID := event["streamId"].(string) + fmt.Println("Got a DONE event for", node, streamID) + networkDB.DoneWithStream(node, streamID) + case "default": + fmt.Fprintf(w, "Error, eventName %v is unknown", eventName) + return + } +} + +func handleMetrics(w http.ResponseWriter, r *http.Request) { + rBody, err := ioutil.ReadAll(r.Body) + if err != nil { + glog.Errorf("Error reading r: %v", err) + } + + node := &data.Node{} + err = json.Unmarshal(rBody, node) + if err != nil { + glog.Errorf("Error unmarshaling: %v", err) + } + glog.Infof("Got Node: %v", string(rBody)) + + network.SetNode(node) +} + +func main() { + // http.HandleFunc("/data.json", handleJson) + http.HandleFunc("/event", handleEvent) + http.HandleFunc("/metrics", handleMetrics) + http.HandleFunc("/", handler) + glog.Infof("Listening on 8081") + http.ListenAndServe(":8081", nil) +} + +func getNetwork() *streamingviz.Network { + sID := "teststream" + network := streamingviz.NewNetwork() + + // Set up peers + network.ReceivePeersForNode("A", []string{"B", "D", "F"}) + network.ReceivePeersForNode("B", []string{"D", "E"}) + network.ReceivePeersForNode("C", []string{"I", "F", "G"}) + network.ReceivePeersForNode("E", []string{"I", "H"}) + network.ReceivePeersForNode("F", []string{"G"}) + network.ReceivePeersForNode("G", []string{"H"}) + network.ReceivePeersForNode("H", []string{"I"}) + + network.StartBroadcasting("A", sID) + network.StartConsuming("I", sID) + network.StartConsuming("G", sID) + network.StartRelaying("F", sID) + network.StartRelaying("C", sID) + network.DoneWithStream("B", sID) + + return network +} + +func networkToData(network *streamingviz.Network, streamID string) interface{} { + /*type Node struct { + ID string + Group int + } + + type Link struct { + Source string + Target string + Value int + }*/ + + res := make(map[string]interface{}) + nodes := make([]map[string]interface{}, 0) + + for _, v := range network.Nodes { + nodes = append(nodes, map[string]interface{}{ + "id": v.ID, + "group": v.GroupForStream(streamID), + }) + } + + links := make([]map[string]interface{}, 0) + + for _, v := range network.Links { + links = append(links, map[string]interface{}{ + "source": v.Source.ID, + "target": v.Target.ID, + "value": 2, //v.Value[streamID], + }) + } + + res["nodes"] = nodes + res["links"] = links + + b, _ := json.Marshal(res) + fmt.Println(fmt.Sprintf("The output network is: %s", b)) + + var genResult interface{} + + json.Unmarshal(b, &genResult) + return genResult +} + +func getData() map[string]*json.RawMessage { + var objmap map[string]*json.RawMessage + abs, _ := filepath.Abs("./server/static/data.json") + f, err := ioutil.ReadFile(abs) + if err != nil { + glog.Errorf("Error reading file: %v", err) + } + if err := json.Unmarshal(f, &objmap); err != nil { + glog.Errorf("Error unmarshaling data: %v", err) + } + return objmap +} diff --git a/vendor/github.com/livepeer/streamingviz/server/static/data.json b/vendor/github.com/livepeer/streamingviz/server/static/data.json new file mode 100755 index 0000000000..9d92643b30 --- /dev/null +++ b/vendor/github.com/livepeer/streamingviz/server/static/data.json @@ -0,0 +1,32 @@ +{ + "nodes": [ + {"id": "A"}, + {"id": "B"}, + {"id": "C"}, + {"id": "D"}, + {"id": "E"}, + {"id": "F"}, + {"id": "G"}, + {"id": "H"}, + {"id": "I"} + ], + "links": [ + {"source": "A", "target": "B", "value": 1}, + {"source": "A", "target": "D", "value": 1}, + {"source": "A", "target": "F", "value": 1}, + {"source": "B", "target": "D", "value": 1}, + {"source": "B", "target": "E", "value": 1}, + {"source": "C", "target": "I", "value": 1}, + {"source": "C", "target": "F", "value": 1}, + {"source": "C", "target": "G", "value": 1}, + {"source": "E", "target": "I", "value": 1}, + {"source": "E", "target": "H", "value": 1}, + {"source": "F", "target": "G", "value": 1}, + {"source": "G", "target": "H", "value": 1}, + {"source": "H", "target": "I", "value": 1} + ], + "streams": { + "id1": {"broadcaster": "A", "relayers": ["B", "C", "D"], "subscribers": ["C", "E"]}, + "id2": {"broadcaster": "B", "relayers": ["C", "F"], "subscribers": ["E", "H"]} + } +} diff --git a/vendor/github.com/livepeer/streamingviz/server/static/index.html b/vendor/github.com/livepeer/streamingviz/server/static/index.html new file mode 100755 index 0000000000..01f5d62930 --- /dev/null +++ b/vendor/github.com/livepeer/streamingviz/server/static/index.html @@ -0,0 +1,216 @@ + + + + Livepeer Network Visualization + + + + + + + + +

Livepeer Network Visualization

+

Key

+

+ Green = Publishing nodes
+ Blue = Consuming nodes
+ Orange = Relaying nodes
+ Grey = Inactive nodes on this stream +

+ +
+ + +
+ + + + + + + + + + + diff --git a/vendor/github.com/livepeer/streamingviz/viz.go b/vendor/github.com/livepeer/streamingviz/viz.go new file mode 100644 index 0000000000..b297a86f44 --- /dev/null +++ b/vendor/github.com/livepeer/streamingviz/viz.go @@ -0,0 +1,198 @@ +package streamingviz + +import ( + "encoding/json" + "fmt" + "sync" +) + +const ( + NodeIdle = iota // 0 + NodeBroadcasting // 1 + NodeConsuming // 2 + NodeRelaying // 3 +) + +const ( + LinkSending = iota + LinkReceiveing +) + +const ( + initialNetworkSize = 50 + initialStreams = 10 +) + +type Node struct { + ID string + Group map[string]int // For a string streamID, what group is it in +} + +func (self *Node) GroupForStream(streamID string) int { + val, ok := self.Group[streamID] + if ok == false { + self.Group[streamID] = NodeIdle + return NodeIdle + } + return val +} + +type Link struct { + Source *Node + Target *Node + Value map[string]int // For a string streamID, what Value does it have +} + +type Network struct { + Nodes map[string]*Node + Links []*Link + StreamIDs []string // All known streams + lock sync.Mutex +} + +func NewNetwork() *Network { + return &Network{ + Nodes: make(map[string]*Node), + Links: make([]*Link, 0), + StreamIDs: make([]string, initialStreams), + } +} + +// Does the node exist in the network already +func (self *Network) hasNode(id string) bool { + _, ok := self.Nodes[id] + return ok +} + +// Does the link exist in the network already +func (self *Network) hasLink(src string, target string) bool { + for _, link := range self.Links { + if (link.Source.ID == src && link.Target.ID == target) || + (link.Target.ID == src && link.Source.ID == target) { + return true + } + } + return false +} + +func (self *Network) addNode(id string) { + self.Nodes[id] = &Node{ + ID: id, + Group: make(map[string]int), + } +} + +func (self *Network) addLink(src string, target string) { + sNode, ok1 := self.findNode(src) + tNode, ok2 := self.findNode(target) + + if ok1 && ok2 { + self.Links = append(self.Links, &Link{ + Source: sNode, + Target: tNode, + Value: make(map[string]int), + }) + } +} + +func (self *Network) findNode(id string) (*Node, bool) { + node, ok := self.Nodes[id] + return node, ok +} + +// Reconstructs the links array removing links for this node +func (self *Network) removeLinksForNode(nodeID string) { + node, ok := self.findNode(nodeID) + if ok { + links := make([]*Link, 0) + for _, link := range self.Links { + if link.Source != node && link.Target != node { + links = append(links, link) + } + } + self.Links = links + } +} + +// Messages that may be received +// 1. Node sends its peers - can construct the peer graph +// 2. Node says it's publishing a stream +// 3. Node says it's requesting a stream +// 4. Node says it's relaying a stream +// 5. Finish publishing, requesting, relaying. TBD whether this is explicit or on a timeout + +// Add this node to the network and add its peers as links. Remove existing links first to keep state consistent +func (self *Network) ReceivePeersForNode(nodeID string, peerIDs []string) { + self.lock.Lock() + defer self.lock.Unlock() + + if !self.hasNode(nodeID) { + self.addNode(nodeID) + fmt.Println("Adding node:", nodeID) + } + + self.removeLinksForNode(nodeID) + + for _, p := range peerIDs { + if !self.hasNode(p) { + self.addNode(p) + fmt.Println("Adding node from peer:", p) + } + + if !self.hasLink(nodeID, p) { + self.addLink(nodeID, p) + fmt.Println("Adding link", nodeID, p) + } + } +} + +func (self *Network) StartBroadcasting(nodeID string, streamID string) { + self.lock.Lock() + defer self.lock.Unlock() + + node, ok := self.findNode(nodeID) + if ok { + node.Group[streamID] = NodeBroadcasting + } +} + +func (self *Network) StartConsuming(nodeID string, streamID string) { + self.lock.Lock() + defer self.lock.Unlock() + + node, ok := self.findNode(nodeID) + if ok { + node.Group[streamID] = NodeConsuming + } +} + +func (self *Network) StartRelaying(nodeID string, streamID string) { + self.lock.Lock() + defer self.lock.Unlock() + + node, ok := self.findNode(nodeID) + if ok { + node.Group[streamID] = NodeRelaying + } +} + +func (self *Network) DoneWithStream(nodeID string, streamID string) { + self.lock.Lock() + defer self.lock.Unlock() + + node, ok := self.findNode(nodeID) + if ok { + node.Group[streamID] = NodeIdle + } +} + +func (self *Network) String() string { + self.lock.Lock() + defer self.lock.Unlock() + + b, err := json.Marshal(self) + if err != nil { + return fmt.Sprintf("Error creating json from this network %v", err) + } + return fmt.Sprintf("%s", b) +} diff --git a/vendor/github.com/livepeer/streamingviz/viz_test.go b/vendor/github.com/livepeer/streamingviz/viz_test.go new file mode 100644 index 0000000000..65c51fcd36 --- /dev/null +++ b/vendor/github.com/livepeer/streamingviz/viz_test.go @@ -0,0 +1,24 @@ +package streamingviz + +import ( + "fmt" + "testing" +) + +func TestNetwork(t *testing.T) { + sID := "teststream" + network := NewNetwork() + + // Set up peers + network.ReceivePeersForNode("A", []string{"B", "D"}) + network.ReceivePeersForNode("B", []string{"A", "D"}) + network.ReceivePeersForNode("C", []string{"D"}) + + network.StartBroadcasting("A", sID) + network.StartConsuming("C", sID) + network.StartRelaying("D", sID) + network.StartConsuming("B", sID) + network.DoneWithStream("B", sID) + + fmt.Println(network.String()) +}