Skip to content

Commit

Permalink
Merge pull request #53 from livepeer/metrics
Browse files Browse the repository at this point in the history
Metrics
  • Loading branch information
ericxtang authored Aug 15, 2017
2 parents 5ff5789 + da4b8e5 commit 3820737
Show file tree
Hide file tree
Showing 23 changed files with 1,395 additions and 16 deletions.
23 changes: 20 additions & 3 deletions cmd/livepeer/livepeer.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
bnet "github.com/livepeer/go-livepeer-basicnet"
"github.com/livepeer/go-livepeer/core"
"github.com/livepeer/go-livepeer/eth"
lpmon "github.com/livepeer/go-livepeer/monitor"
"github.com/livepeer/go-livepeer/net"
"github.com/livepeer/go-livepeer/server"
"github.com/livepeer/go-livepeer/types"
Expand Down Expand Up @@ -85,6 +86,8 @@ func main() {
ethPassword := flag.String("ethPassword", "", "New Eth account password")
gethipc := flag.String("gethipc", "", "Geth ipc file location")
protocolAddr := flag.String("protocolAddr", "", "Protocol smart contract address")
monitor := flag.Bool("monitor", false, "Set to true to send performance metrics")
monhost := flag.String("monitorhost", "metrics.livepeer.org", "host name for the metrics data collector")

flag.Parse()

Expand All @@ -111,7 +114,11 @@ func main() {
return
}

node, err := bnet.NewNode(*port, priv, pub)
if *monitor {
lpmon.Endpoint = *monhost
}
notifiee := bnet.NewBasicNotifiee(lpmon.Instance())
node, err := bnet.NewNode(*port, priv, pub, notifiee)
if err != nil {
glog.Errorf("Error creating a new node: %v", err)
return
Expand All @@ -134,6 +141,7 @@ func main() {
glog.Errorf("Cannot set up protocol:%v", err)
return
}
lpmon.Instance().SetBootNode()
} else {
if err := n.Start(*bootID, *bootAddr); err != nil {
glog.Errorf("Cannot connect to bootstrap node: %v", err)
Expand Down Expand Up @@ -394,6 +402,7 @@ func txDataToVideoProfile(txData string) []types.VideoProfile {
}

func stream(port string, streamID string) {
start := time.Now()
if streamID == "" {
glog.Errorf("Need to specify streamID via -id")
return
Expand All @@ -404,13 +413,21 @@ func stream(port string, streamID string) {
glog.Infof("url: %v", url)
err := cmd.Start()
if err != nil {
glog.Infof("Couldn't start the stream")
glog.Infof("Couldn't start the stream. Make sure a local Livepeer node is running on port %v", port)
os.Exit(1)
}
glog.Infof("Now streaming")
err = cmd.Wait()
if err != nil {
glog.Infof("Couldn't start the stream. Make sure a local Livepeer node is running on port %v", port)
os.Exit(1)
}

glog.Infof("Finished the stream")
if time.Since(start) < time.Second {
glog.Infof("Error: Make sure local Livepeer node is running on port %v", port)
} else {
glog.Infof("Finished the stream")
}
return
}

Expand Down
4 changes: 2 additions & 2 deletions core/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ func monitorChan(intChan chan int) {
}
func TestCreateTranscodeJob(t *testing.T) {
priv, pub, _ := crypto.GenerateKeyPair(crypto.RSA, 2048)
node, err := bnet.NewNode(15000, priv, pub)
node, err := bnet.NewNode(15000, priv, pub, nil)
if err != nil {
glog.Errorf("Error creating a new node: %v", err)
return
Expand Down Expand Up @@ -296,7 +296,7 @@ func TestCreateTranscodeJob(t *testing.T) {

func TestNotifyBroadcaster(t *testing.T) {
priv, pub, _ := crypto.GenerateKeyPair(crypto.RSA, 2048)
node, err := bnet.NewNode(15000, priv, pub)
node, err := bnet.NewNode(15000, priv, pub, nil)
if err != nil {
glog.Errorf("Error creating a new node: %v", err)
return
Expand Down
5 changes: 5 additions & 0 deletions core/streamdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/ericxtang/m3u8"
"github.com/golang/glog"

lpmon "github.com/livepeer/go-livepeer/monitor"
"github.com/livepeer/lpms/stream"
)

Expand Down Expand Up @@ -60,6 +61,7 @@ func (s *StreamDB) GetRTMPStream(id StreamID) stream.RTMPVideoStream {
func (s *StreamDB) AddNewHLSStream(strmID StreamID) (strm stream.HLSVideoStream, err error) {
strm = stream.NewBasicHLSVideoStream(strmID.String(), stream.DefaultSegWaitTime)
s.streams[strmID] = strm
lpmon.Instance().LogStream(strmID.String(), 0, 0)

// glog.Infof("Adding new video stream with ID: %v", strmID)
return strm, nil
Expand All @@ -69,6 +71,7 @@ func (s *StreamDB) AddNewHLSStream(strmID StreamID) (strm stream.HLSVideoStream,
func (s *StreamDB) AddNewRTMPStream(strmID StreamID) (strm stream.RTMPVideoStream, err error) {
strm = stream.NewBasicRTMPVideoStream(strmID.String())
s.streams[strmID] = strm
lpmon.Instance().LogStream(strmID.String(), 0, 0)

// glog.Infof("Adding new video stream with ID: %v", strmID)
return strm, nil
Expand All @@ -92,12 +95,14 @@ func (s *StreamDB) AddHLSVariant(hlsStrmID StreamID, varStrmID StreamID, variant

//Add variant streamID to streams map so we can keep track
s.streams[varStrmID] = strm
lpmon.Instance().LogStream(varStrmID.String(), 0, 0)
return nil
}

//AddStream adds an existing video stream.
func (s *StreamDB) AddStream(strmID StreamID, strm stream.VideoStream_) (err error) {
s.streams[strmID] = strm
lpmon.Instance().LogStream(strmID.String(), 0, 0)
return nil
}

Expand Down
101 changes: 101 additions & 0 deletions monitor/monitor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package monitor

import (
"context"
"time"

"github.com/golang/glog"
"github.com/livepeer/streamingviz/data"
)

type Monitor struct {
Node *data.Node
}

var monitor *Monitor
var Endpoint string

const PostFreq = time.Second * 30

func Instance() *Monitor {
if monitor == nil {
monitor = newMonitor()
monitor.StartWorker(context.Background())
}

return monitor
}

func newMonitor() *Monitor {
return &Monitor{Node: data.NewNode("")}
}

func (m *Monitor) SetBootNode() {
m.Node.SetBootNode()
}

func (m *Monitor) LogNewConn(local, remote string) {
if m.Node.ID == "" {
m.Node.ID = local
}

m.Node.AddConn(local, remote)
}

func (m *Monitor) RemoveConn(local, remote string) {
m.Node.RemoveConn(local, remote)
}

func (m *Monitor) LogStream(strmID string, size uint, avgChunkSize uint) {
m.Node.SetStream(strmID, size, avgChunkSize)
}

func (m *Monitor) RemoveStream(strmID string) {
m.Node.RemoveStream(strmID)
}

func (m *Monitor) LogBroadcaster(strmID string) {
m.Node.SetBroadcast(strmID)
}

func (m *Monitor) RemoveBroadcast(strmID string) {
m.Node.RemoveBroadcast(strmID)
}

func (m *Monitor) LogRelay(strmID, remote string) {
m.Node.SetRelay(strmID, remote)
}

func (m *Monitor) RemoveRelay(strmID string) {
m.Node.RemoveRelay(strmID)
}

func (m *Monitor) LogSub(strmID string) {
m.Node.SetSub(strmID)
}

func (m *Monitor) RemoveSub(strmID string) {
m.Node.RemoveSub(strmID)
}

func (m *Monitor) LogBuffer(strmID string) {
m.Node.AddBufferEvent(strmID)
}

func (m *Monitor) StartWorker(ctx context.Context) {
ticker := time.NewTicker(PostFreq)
go func() {
for {
select {
case <-ticker.C:
// glog.Infof("Posting node status to monitor")
if m.Node.ID != "" {
m.Node.SubmitToCollector(Endpoint)
}
case <-ctx.Done():
glog.Errorf("Monitor Worker Done")
return
}
}
}()
}
38 changes: 38 additions & 0 deletions monitor/monitor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package monitor

import (
"encoding/json"
"io/ioutil"
"net/http"
"net/http/httptest"
"testing"

"github.com/golang/glog"
"github.com/livepeer/streamingviz/data"
)

func TestMonitor(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
rBody, err := ioutil.ReadAll(r.Body)
if err != nil {
t.Errorf("Error reading r: %v", err)
}

node := &data.Node{}
err = json.Unmarshal(rBody, node)
if err != nil {
t.Errorf("Error unmarshaling: %v", err)
}

glog.Infof("result: %v", node)
}))
defer ts.Close()

m := Instance()
m.LogNewConn("local1", "remote1")
m.LogNewConn("local2", "remote2")
m.RemoveConn("local1", "remote1")
m.LogStream("sid", 10, 10)

m.Node.SubmitToCollector(ts.URL)
}
3 changes: 3 additions & 0 deletions server/mediaserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/golang/glog"
"github.com/livepeer/go-livepeer/core"
"github.com/livepeer/go-livepeer/eth"
lpmon "github.com/livepeer/go-livepeer/monitor"
"github.com/livepeer/go-livepeer/types"
lpmscore "github.com/livepeer/lpms/core"
"github.com/livepeer/lpms/segmenter"
Expand Down Expand Up @@ -320,8 +321,10 @@ func getHLSMediaPlaylistHandler(s *LivepeerServer) func(url *url.URL) (*m3u8.Med
return nil, err
}

lpmon.Instance().LogBuffer(strmID.String())
// glog.Infof("Waiting for playlist... err: %v", err)
time.Sleep(2 * time.Second)

continue
} else {
// glog.Infof("Found playlist. Returning")
Expand Down
2 changes: 1 addition & 1 deletion server/mediaserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ var S *LivepeerServer
func setupServer() *LivepeerServer {
if S == nil {
priv, pub, _ := crypto.GenerateKeyPair(crypto.RSA, 2048)
node, err := bnet.NewNode(15000, priv, pub)
node, err := bnet.NewNode(15000, priv, pub, nil)
if err != nil {
glog.Errorf("Error creating a new node: %v", err)
return nil
Expand Down
7 changes: 6 additions & 1 deletion test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,12 @@ go test -logtostderr=true
t2=$?
cd ..

if (($t1!=0||$t2!=0))
cd monitor
go test -logtostderr=true
t3=$?
cd ..

if (($t1!=0||$t2!=0||$t3!=0))
then
printf "\n\nSome Tests Failed\n\n"
exit -1
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 3820737

Please sign in to comment.