Skip to content

Commit

Permalink
Merge pull request #50 from livepeer/abs
Browse files Browse the repository at this point in the history
Adaptive Bitrate Streaming
  • Loading branch information
ericxtang authored Aug 9, 2017
2 parents 0c33b55 + 9289f41 commit 5ff5789
Show file tree
Hide file tree
Showing 42 changed files with 2,392 additions and 1,367 deletions.
45 changes: 25 additions & 20 deletions cmd/livepeer/livepeer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"path"
"path/filepath"
"runtime"
"strings"
"time"

crypto "gx/ipfs/QmaPbCnUMBohSGo3KnxEa2bHqyJVVeEEcwtqJAYxerieBo/go-libp2p-crypto"
Expand All @@ -32,8 +33,8 @@ import (
bnet "github.com/livepeer/go-livepeer-basicnet"
"github.com/livepeer/go-livepeer/core"
"github.com/livepeer/go-livepeer/eth"
"github.com/livepeer/go-livepeer/mediaserver"
"github.com/livepeer/go-livepeer/net"
"github.com/livepeer/go-livepeer/server"
"github.com/livepeer/go-livepeer/types"
)

Expand Down Expand Up @@ -121,7 +122,7 @@ func main() {
return
}

n, err := core.NewLivepeerNode(nil, nw)
n, err := core.NewLivepeerNode(nil, nw, fmt.Sprintf("%v/.tmp", *datadir))
if err != nil {
glog.Errorf("Error creating livepeer node: %v", err)
}
Expand Down Expand Up @@ -189,10 +190,11 @@ func main() {

//Set up the media server
glog.Infof("\n\nSetting up Media Server")
s := mediaserver.NewLivepeerMediaServer(*rtmpPort, *httpPort, "", n)
s := server.NewLivepeerServer(*rtmpPort, *httpPort, "", n)
ec := make(chan error)
msCtx, cancel := context.WithCancel(context.Background())
go func() {
s.StartWebserver()
ec <- s.StartMediaServer(msCtx)
}()

Expand All @@ -206,12 +208,6 @@ func main() {
cancel()
return
}
// if err := s.StartMediaServer(context.Background()); err != nil {
// glog.Errorf("Failed to start LPMS: %v", err)
// return
// }

// select {}
}

type LPKeyFile struct {
Expand Down Expand Up @@ -355,28 +351,24 @@ func setupTranscoder(n *core.LivepeerNode, acct accounts.Account) (ethereum.Subs
}

//Create Transcode Config
//TODO: profile should contain multiple video profiles. Waiting for a protocol change.
profile, ok := types.VideoProfileLookup[tData]
if !ok {
glog.Errorf("Cannot find video profile for job: %v", tData)
return core.ErrTranscode
}

tProfiles := []types.VideoProfile{profile}
tProfiles := txDataToVideoProfile(tData)
config := net.TranscodeConfig{StrmID: strmId, Profiles: tProfiles, JobID: jid, PerformOnchainClaim: true}
glog.Infof("Transcoder got job %v - strmID: %v, tData: %v, config: %v", tx.Hash(), strmId, tData, config)

//Do The Transcoding
cm := core.NewClaimManager(strmId, jid, tProfiles, n.Eth)
strmIDs, err := n.Transcode(config, cm)
strmIDs, err := n.TranscodeAndBroadcast(config, cm)
if err != nil {
glog.Errorf("Transcode Error: %v", err)
}

//Notify Broadcaster
sid := core.StreamID(strmId)
err = n.NotifyBroadcaster(sid.GetNodeID(), sid, map[core.StreamID]types.VideoProfile{strmIDs[0]: types.VideoProfileLookup[tData]})
if err != nil {
vids := make(map[core.StreamID]types.VideoProfile)
for i, vp := range tProfiles {
vids[strmIDs[i]] = vp
}
if err = n.NotifyBroadcaster(sid.GetNodeID(), sid, vids); err != nil {
glog.Errorf("Notify Broadcaster Error: %v", err)
}

Expand All @@ -388,6 +380,19 @@ func setupTranscoder(n *core.LivepeerNode, acct accounts.Account) (ethereum.Subs
return sub, nil
}

func txDataToVideoProfile(txData string) []types.VideoProfile {
profiles := make([]types.VideoProfile, 0)
for _, txp := range strings.Split(txData, "|") {
p, ok := types.VideoProfileLookup[txp]
if !ok {
glog.Errorf("Cannot find video profile for job: %v", txp)
// return core.ErrTranscode
}
profiles = append(profiles, p)
}
return profiles
}

func stream(port string, streamID string) {
if streamID == "" {
glog.Errorf("Need to specify streamID via -id")
Expand Down
8 changes: 4 additions & 4 deletions core/claimmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func TestClaimAndVerify(t *testing.T) {
jid := big.NewInt(0)
vidLen := 10
strmID := "strmID"
profiles := []types.VideoProfile{types.P_240P_30FPS_16_9}
profiles := []types.VideoProfile{types.P240p30fps16x9}
s1 := &eth.StubClient{}
cm1 := NewClaimManager(strmID, jid, profiles, s1)

Expand All @@ -26,11 +26,11 @@ func TestClaimAndVerify(t *testing.T) {
dh := common.StringToHash(fmt.Sprintf("dh%v", i))
th := common.StringToHash(fmt.Sprintf("th%v", i))
sig := (&ethTypes.Segment{strmID, big.NewInt(int64(i)), dh}).Hash().Bytes()
cm1.AddClaim(int64(i), dh, th, sig, types.P_240P_30FPS_16_9)
cm1.AddClaim(int64(i), dh, th, sig, types.P240p30fps16x9)
}

VerifyRate = 10 //Set the verify rate so it gets called once
cm1.Claim(types.P_240P_30FPS_16_9)
cm1.Claim(types.P240p30fps16x9)

if s1.ClaimCounter != 1 {
t.Errorf("Claim should be called once, but got %v", s1.ClaimCounter)
Expand Down Expand Up @@ -58,7 +58,7 @@ func TestClaimAndVerify(t *testing.T) {
BroadcasterSig: sig,
}
tcHashes = append(tcHashes, claim.Hash())
cm2.AddClaim(seqNo, dh, th, sig, types.P_240P_30FPS_16_9)
cm2.AddClaim(seqNo, dh, th, sig, types.P240p30fps16x9)
}
cm2.Claim(profiles[0])
if s2.ClaimCounter != 2 {
Expand Down
81 changes: 42 additions & 39 deletions core/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

crypto "gx/ipfs/QmaPbCnUMBohSGo3KnxEa2bHqyJVVeEEcwtqJAYxerieBo/go-libp2p-crypto"

"github.com/ericxtang/m3u8"
"github.com/ethereum/go-ethereum/common"
ethCrypto "github.com/ethereum/go-ethereum/crypto"
"github.com/golang/glog"
Expand All @@ -24,14 +25,6 @@ import (
"github.com/livepeer/lpms/stream"
)

// Should Create Node
func TestNewLivepeerNode(t *testing.T) {
// n := NewLivepeerNode()
// if n == nil {
// t.Errorf("Cannot set up new livepeer node")
// }
}

type StubVideoNetwork struct {
T *testing.T
broadcasters map[string]*StubBroadcaster
Expand All @@ -40,6 +33,7 @@ type StubVideoNetwork struct {
nodeID string
}

func (n *StubVideoNetwork) String() string { return "" }
func (n *StubVideoNetwork) GetNodeID() string {
return "122011e494a06b20bf7a80f40e80d538675cc0b168c21912d33e0179617d5d4fe4e0"
}
Expand All @@ -60,12 +54,31 @@ func (n *StubVideoNetwork) GetSubscriber(strmID string) (net.Subscriber, error)
}
func (n *StubVideoNetwork) Connect(nodeID, nodeAddr string) error { return nil }
func (n *StubVideoNetwork) SetupProtocol() error { return nil }
func (n *StubVideoNetwork) SendTranscodeResult(nid string, sid string, tr map[string]string) error {
func (n *StubVideoNetwork) SendTranscodeResponse(nid string, sid string, tr map[string]string) error {
n.nodeID = nid
n.strmID = sid
n.tResult = tr
return nil
}
func (n *StubVideoNetwork) ReceivedTranscodeResponse(strmID string, gotResult func(transcodeResult map[string]string)) {
}
func (n *StubVideoNetwork) GetMasterPlaylist(nodeID string, strmID string) (chan *m3u8.MasterPlaylist, error) {
mplc := make(chan *m3u8.MasterPlaylist)
mpl := m3u8.NewMasterPlaylist()
pl, _ := m3u8.NewMediaPlaylist(100, 100)
mpl.Append("stub.m3u8", pl, m3u8.VariantParams{Bandwidth: 100})
// glog.Infof("StubNetwork GetMasterPlaylist. mpl: %v", mpl)

go func() {
mplc <- mpl
close(mplc)
}()

return mplc, nil
}
func (n *StubVideoNetwork) UpdateMasterPlaylist(strmID string, mpl *m3u8.MasterPlaylist) error {
return nil
}

type StubBroadcaster struct {
T *testing.T
Expand All @@ -74,6 +87,8 @@ type StubBroadcaster struct {
Data []byte
}

func (n *StubBroadcaster) IsWorking() bool { return true }
func (n *StubBroadcaster) String() string { return "" }
func (n *StubBroadcaster) Broadcast(seqNo uint64, data []byte) error {
ss, err := BytesToSignedSegment(data)
if err != nil {
Expand All @@ -99,6 +114,8 @@ type StubSubscriber struct {
T *testing.T
}

func (s *StubSubscriber) IsWorking() bool { return true }
func (s *StubSubscriber) String() string { return "" }
func (s *StubSubscriber) Subscribe(ctx context.Context, gotData func(seqNo uint64, data []byte, eof bool)) error {
d, _ := ioutil.ReadFile("./test.ts")
newSeg := SignedSegment{Seg: stream.HLSSegment{SeqNo: 100, Name: "test name", Data: d, Duration: 1}, Sig: []byte("test sig")}
Expand All @@ -107,39 +124,27 @@ func (s *StubSubscriber) Subscribe(ctx context.Context, gotData func(seqNo uint6
s.T.Errorf("Error Converting SignedSegment to Bytes: %v", err)
}

// glog.Infof("Returning seg 100: %v", len(b))
gotData(100, b, false)
return nil
}
func (s *StubSubscriber) Unsubscribe() error { return nil }

func TestTranscode(t *testing.T) {
//Set up the node
// priv, pub, _ := crypto.GenerateKeyPair(crypto.RSA, 2048)
// node, err := net.NewNode(15000, priv, pub)
// if err != nil {
// glog.Errorf("Error creating a new node: %v", err)
// return
// }
// nw, err := net.NewBasicVideoNetwork(node)
// if err != nil {
// glog.Errorf("Cannot create network node: %v", err)
// return
// }

n, _ := NewLivepeerNode(nil, &StubVideoNetwork{T: t})
n, _ := NewLivepeerNode(nil, &StubVideoNetwork{T: t}, ".tmp")

//Call transcode
ids, err := n.Transcode(net.TranscodeConfig{StrmID: "strmID", Profiles: []types.VideoProfile{types.P_144P_30FPS_16_9, types.P_240P_30FPS_16_9}}, nil)

ids, err := n.TranscodeAndBroadcast(net.TranscodeConfig{StrmID: "strmID", Profiles: []types.VideoProfile{types.P144p30fps16x9, types.P240p30fps16x9}}, nil)
if err != nil {
t.Errorf("Error transcoding: %v", err)
}

if !strings.HasSuffix(ids[0].String(), "P144P30FPS169") {
if !strings.HasSuffix(ids[0].String(), "P144p30fps16x9") {
t.Errorf("Bad id0: %v", ids[0])
}

if !strings.HasSuffix(ids[1].String(), "P240P30FPS169") {
if !strings.HasSuffix(ids[1].String(), "P240p30fps16x9") {
t.Errorf("Bad id1: %v", ids[1])
}

Expand Down Expand Up @@ -195,8 +200,6 @@ func TestInterface(t *testing.T) {

func TestSync(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
// intChan := make(chan int)
// go forroutine(ctx, intChan)
boolChan := make(chan bool)
intChan := chanRoutine(ctx, boolChan)
go insertBool(boolChan)
Expand Down Expand Up @@ -265,15 +268,15 @@ func TestCreateTranscodeJob(t *testing.T) {
}

seth := &eth.StubClient{}
n, _ := NewLivepeerNode(seth, nw)
n, _ := NewLivepeerNode(seth, nw, "./tmp")
strmID, _ := MakeStreamID(n.Identity, RandomVideoID(), "")
err = n.CreateTranscodeJob(strmID, []types.VideoProfile{types.P_720P_60FPS_16_9}, 999999999999)
err = n.CreateTranscodeJob(strmID, []types.VideoProfile{types.P720p60fps16x9}, 999999999999)
if err == nil {
t.Errorf("Expecting error since no broadcast stream in streamDB")
}

n.StreamDB.AddNewHLSBuffer(strmID)
err = n.CreateTranscodeJob(strmID, []types.VideoProfile{types.P_720P_60FPS_16_9}, 999999999999)
n.StreamDB.AddNewHLSStream(strmID)
err = n.CreateTranscodeJob(strmID, []types.VideoProfile{types.P720p60fps16x9}, 999999999999)
if err != nil {
t.Errorf("Error creating transcoding job")
}
Expand All @@ -282,8 +285,8 @@ func TestCreateTranscodeJob(t *testing.T) {
t.Errorf("Expecting strmID to be: %v", strmID)
}

if strings.Trim(string(seth.TOpts[:]), "\x00") != types.P_720P_60FPS_16_9.Name {
t.Errorf("Expecting transcode options to be %v, but got %v", types.P_720P_60FPS_16_9.Name, string(seth.TOpts[:]))
if strings.Trim(string(seth.TOpts[:]), "\x00") != types.P720p60fps16x9.Name {
t.Errorf("Expecting transcode options to be %v, but got %v", types.P720p60fps16x9.Name, string(seth.TOpts[:]))
}

if big.NewInt(999999999999).Cmp(seth.MaxPrice) != 0 {
Expand All @@ -304,11 +307,11 @@ func TestNotifyBroadcaster(t *testing.T) {
return
}
seth := &eth.StubClient{}
n, _ := NewLivepeerNode(seth, nw)
n, _ := NewLivepeerNode(seth, nw, "./tmp")
sn := &StubVideoNetwork{}
n.VideoNetwork = sn

err = n.NotifyBroadcaster(n.Identity, "strmid", map[StreamID]types.VideoProfile{"strmid1": types.P_240P_30FPS_16_9})
err = n.NotifyBroadcaster(n.Identity, "strmid", map[StreamID]types.VideoProfile{"strmid1": types.P240p30fps16x9})
if err != nil {
t.Errorf("Error notifying broadcaster: %v", err)
}
Expand All @@ -321,14 +324,14 @@ func TestNotifyBroadcaster(t *testing.T) {
t.Errorf("Expecting strmid, got %v", sn.strmID)
}

if sn.tResult["strmid1"] != types.P_240P_30FPS_16_9.Name {
t.Errorf("Expecting %v, got %v", types.P_240P_30FPS_16_9.Name, sn.tResult["strmid1"])
if sn.tResult["strmid1"] != types.P240p30fps16x9.Name {
t.Errorf("Expecting %v, got %v", types.P240p30fps16x9.Name, sn.tResult["strmid1"])
}
}

func TestCrypto(t *testing.T) {
b := shouldVerifySegment(10, 0, 20, 10, common.BytesToHash(ethCrypto.Keccak256([]byte("abc"))), 1)
fmt.Printf("%x\n\n", b)
fmt.Printf("%v\n\n", b)

blkNumB := make([]byte, 8)
binary.BigEndian.PutUint64(blkNumB, uint64(9994353847340985734))
Expand Down
Loading

0 comments on commit 5ff5789

Please sign in to comment.