Skip to content

Commit

Permalink
Merge pull request #544 from livepeer/et/unbonding-fix
Browse files Browse the repository at this point in the history
Exit early when processing historic events
  • Loading branch information
ericxtang authored Aug 29, 2018
2 parents cf67256 + d0df352 commit 90df4ec
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 53 deletions.
32 changes: 6 additions & 26 deletions common/db_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package common

import (
"database/sql"
"fmt"
"math/big"
"testing"
Expand All @@ -12,27 +11,8 @@ import (
_ "github.com/mattn/go-sqlite3"
)

func dbPath(t *testing.T) string {
return fmt.Sprintf("file:%s?mode=memory&cache=shared&_foreign_keys=1", t.Name())
}

func tempDB(t *testing.T) (*DB, *sql.DB, error) {
dbpath := dbPath(t)
dbh, err := InitDB(dbpath)
if err != nil {
t.Error("Unable to initialize DB ", err)
return nil, nil, err
}
raw, err := sql.Open("sqlite3", dbpath)
if err != nil {
t.Error("Unable to open raw sqlite db ", err)
return nil, nil, err
}
return dbh, raw, nil
}

func TestDBLastSeenBlock(t *testing.T) {
dbh, dbraw, err := tempDB(t)
dbh, dbraw, err := TempDB(t)
if err != nil {
return
}
Expand Down Expand Up @@ -89,7 +69,7 @@ func TestDBLastSeenBlock(t *testing.T) {
}

func TestDBVersion(t *testing.T) {
dbh, dbraw, err := tempDB(t)
dbh, dbraw, err := TempDB(t)
if err != nil {
return
}
Expand Down Expand Up @@ -147,7 +127,7 @@ func profilesMatch(j1 []ffmpeg.VideoProfile, j2 []ffmpeg.VideoProfile) bool {
}

func TestDBJobs(t *testing.T) {
dbh, dbraw, err := tempDB(t)
dbh, dbraw, err := TempDB(t)
defer dbh.Close()
defer dbraw.Close()
j := NewStubJob()
Expand Down Expand Up @@ -180,7 +160,7 @@ func TestDBJobs(t *testing.T) {
}

func TestDBReceipts(t *testing.T) {
dbh, dbraw, err := tempDB(t)
dbh, dbraw, err := TempDB(t)
defer dbh.Close()
defer dbraw.Close()
jid := big.NewInt(0)
Expand Down Expand Up @@ -270,7 +250,7 @@ func TestDBReceipts(t *testing.T) {
}

func TestDBClaims(t *testing.T) {
dbh, dbraw, err := tempDB(t)
dbh, dbraw, err := TempDB(t)
defer dbh.Close()
defer dbraw.Close()

Expand Down Expand Up @@ -381,7 +361,7 @@ func TestDBClaims(t *testing.T) {
}

func TestDBUnbondingLocks(t *testing.T) {
dbh, dbraw, err := tempDB(t)
dbh, dbraw, err := TempDB(t)
defer dbh.Close()
defer dbraw.Close()
if err != nil {
Expand Down
26 changes: 26 additions & 0 deletions common/testutil.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package common

import (
"database/sql"
"fmt"
"testing"
)

func dbPath(t *testing.T) string {
return fmt.Sprintf("file:%s?mode=memory&cache=shared&_foreign_keys=1", t.Name())
}

func TempDB(t *testing.T) (*DB, *sql.DB, error) {
dbpath := dbPath(t)
dbh, err := InitDB(dbpath)
if err != nil {
t.Error("Unable to initialize DB ", err)
return nil, nil, err
}
raw, err := sql.Open("sqlite3", dbpath)
if err != nil {
t.Error("Unable to open raw sqlite db ", err)
return nil, nil, err
}
return dbh, raw, nil
}
5 changes: 5 additions & 0 deletions eth/eventservices/unbondingservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,11 @@ func (s *UnbondingService) processHistoricalEvents() error {
return err
}

//Exit early if LastSeenBlock is zero (starting with a new db)
if startBlock.Cmp(big.NewInt(0)) == 0 {
return nil
}

if err := s.client.ProcessHistoricalUnbond(startBlock, func(newUnbond *contracts.BondingManagerUnbond) error {
// Insert new unbonding lock into database
if err := s.db.InsertUnbondingLock(newUnbond.UnbondingLockId, newUnbond.Delegator, newUnbond.Amount, newUnbond.WithdrawRound); err != nil {
Expand Down
36 changes: 36 additions & 0 deletions eth/eventservices/unbondingservice_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package eventservices

import (
"errors"
"math/big"
"testing"

"github.com/livepeer/go-livepeer/common"
"github.com/livepeer/go-livepeer/eth"
)

func TestProcessHistoricalEvents(t *testing.T) {
dbh, dbraw, err := common.TempDB(t)
if err != nil {
return
}
defer dbh.Close()
defer dbraw.Close()

c := &eth.StubClient{}
//Set the eth client to be nil
s := NewUnbondingService(c, dbh)

//Test that we exit early because LastSeenBlock is 0 in this case
dbh.SetLastSeenBlock(big.NewInt(0))
if err := s.processHistoricalEvents(); err != nil {
t.Errorf("Error: %v", err)
}

//Set last seen block to be 10, this should result in an error because we don't exit early anymore and we set a stubbed error
dbh.SetLastSeenBlock(big.NewInt(10))
c.ProcessHistoricalUnbondError = errors.New("StubError")
if err := s.processHistoricalEvents(); err.Error() != "StubError" {
t.Errorf("Expecting stub error, but got none")
}
}
55 changes: 28 additions & 27 deletions eth/stubclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,32 +13,33 @@ import (
)

type StubClient struct {
StrmID string
TOpts string
MaxPrice *big.Int
Jid *big.Int
SegSeqNum *big.Int
VeriRate uint64
DStorageHash string
DHash [32]byte
TDHash [32]byte
BSig []byte
Proof []byte
VerifyCounter int
ClaimJid []*big.Int
ClaimStart []*big.Int
ClaimEnd []*big.Int
ClaimRoot map[[32]byte]bool
ClaimCounter int
SubLogsCh chan types.Log
JobsMap map[string]*lpTypes.Job
TranscoderAddress common.Address
BlockNum *big.Int
BlockHashToReturn common.Hash
Claims map[int]*lpTypes.Claim
LatestBlockError error
JobError error
WatchJobError error
StrmID string
TOpts string
MaxPrice *big.Int
Jid *big.Int
SegSeqNum *big.Int
VeriRate uint64
DStorageHash string
DHash [32]byte
TDHash [32]byte
BSig []byte
Proof []byte
VerifyCounter int
ClaimJid []*big.Int
ClaimStart []*big.Int
ClaimEnd []*big.Int
ClaimRoot map[[32]byte]bool
ClaimCounter int
SubLogsCh chan types.Log
JobsMap map[string]*lpTypes.Job
TranscoderAddress common.Address
BlockNum *big.Int
BlockHashToReturn common.Hash
Claims map[int]*lpTypes.Claim
LatestBlockError error
JobError error
WatchJobError error
ProcessHistoricalUnbondError error
}

func (e *StubClient) Setup(password string, gasLimit uint64, gasPrice *big.Int) error { return nil }
Expand Down Expand Up @@ -190,7 +191,7 @@ func (c *StubClient) WatchForNewJob(bool, chan *contracts.JobsManagerNewJob) (et
return nil, nil
}
func (c *StubClient) ProcessHistoricalUnbond(*big.Int, func(*contracts.BondingManagerUnbond) error) error {
return nil
return c.ProcessHistoricalUnbondError
}
func (c *StubClient) WatchForUnbond(chan *contracts.BondingManagerUnbond) (ethereum.Subscription, error) {
return nil, nil
Expand Down

0 comments on commit 90df4ec

Please sign in to comment.