From 1d713fa0143b4609fed6a79f861345f0b2288e32 Mon Sep 17 00:00:00 2001 From: Leo Lara Date: Mon, 27 Jun 2022 21:14:29 +0200 Subject: [PATCH 01/10] Finalizer based on LMD votes, not connected to actual balances yet --- .../blocks/standard/lmdfinalizer/finalizer.go | 286 ++++++++++++++ .../standard/lmdfinalizer/finalizer_test.go | 372 ++++++++++++++++++ .../blocks/standard/lmdfinalizer/mock/node.go | 72 ++++ .../blocks/standard/lmdfinalizer/tree/node.go | 71 ++++ .../standard/lmdfinalizer/tree/node_test.go | 84 ++++ .../blocks/standard/lmdfinalizer/tree/tree.go | 138 +++++++ .../lmdfinalizer/tree/tree_internal_test.go | 113 ++++++ .../standard/lmdfinalizer/tree/tree_test.go | 177 +++++++++ services/blocks/standard/lmdfinalizer/vote.go | 53 +++ .../lmdfinalizer/vote_internal_test.go | 87 ++++ 10 files changed, 1453 insertions(+) create mode 100644 services/blocks/standard/lmdfinalizer/finalizer.go create mode 100644 services/blocks/standard/lmdfinalizer/finalizer_test.go create mode 100644 services/blocks/standard/lmdfinalizer/mock/node.go create mode 100644 services/blocks/standard/lmdfinalizer/tree/node.go create mode 100644 services/blocks/standard/lmdfinalizer/tree/node_test.go create mode 100644 services/blocks/standard/lmdfinalizer/tree/tree.go create mode 100644 services/blocks/standard/lmdfinalizer/tree/tree_internal_test.go create mode 100644 services/blocks/standard/lmdfinalizer/tree/tree_test.go create mode 100644 services/blocks/standard/lmdfinalizer/vote.go create mode 100644 services/blocks/standard/lmdfinalizer/vote_internal_test.go diff --git a/services/blocks/standard/lmdfinalizer/finalizer.go b/services/blocks/standard/lmdfinalizer/finalizer.go new file mode 100644 index 0000000..7e6f5b5 --- /dev/null +++ b/services/blocks/standard/lmdfinalizer/finalizer.go @@ -0,0 +1,286 @@ +// Package lmdfinalizer establishes which blocks are finalized from its LDM votes and the LDM votes of its children blocks +// Abbreviation: LFB means Latest Finalized Block +package lmdfinalizer + +import ( + "encoding/hex" + "github.com/attestantio/go-eth2-client/spec/phase0" + "github.com/pkg/errors" + "github.com/rs/zerolog" + "github.com/wealdtech/chaind/services/blocks/standard/lmdfinalizer/tree" + "github.com/wealdtech/chaind/services/chaindb" +) + +// PreBalanceThreshold is a mock threshold used until we connect the finalizer with the validator balances +const PreBalanceThreshold = 10000 + +// LMDFinalizer is a beacon chain finalizer based on LMD votes +type LMDFinalizer interface { + // Start finalizer with latestFinalized block as the latest finalized until now + Start(latestFinalized *chaindb.Block) + // Stop finalizer + Stop() + // AddBlock to finalizer to be candidate for finalization and use its included attestations as votes for + // other blocks + AddBlock(dbblock *chaindb.Block, attestations []*chaindb.Attestation) + // HandleNewLatestFinalizedBlock receives the event handler that will be trigger everytime a block is finalized + HandleNewLatestFinalizedBlock(newLFBHandler) +} + +// newLFBHandler event handler to be triggered when a new LFB is finalized +type newLFBHandler func(phase0.Root, phase0.Slot) + +// finalizer is the implementation of LMDFinalizer +type finalizer struct { + tree tree.Tree + votes lmdVotes + + started bool + preStartBuffer []*tree.Node + log zerolog.Logger + + onAddNode chan *tree.Node + onStart chan *tree.Node + shutdown chan struct{} + + newLFBHandler newLFBHandler +} + +// New LMDFinalizer with logger `log` +func New(log zerolog.Logger) LMDFinalizer { + f := &finalizer{ + tree: tree.Tree{}, + votes: newLMDVotes(), + + started: false, + preStartBuffer: []*tree.Node{}, + log: log.With().Str("subservice", "LMD finalizer").Logger(), + + onAddNode: make(chan *tree.Node, 1000), // TODO: size of channel + onStart: make(chan *tree.Node), + shutdown: make(chan struct{}), + } + + go f.mainLoop() + + return f +} + +// Start finalizer with latestFinalized block as the latest finalized until now +func (f *finalizer) Start(latestFinalized *chaindb.Block) { + LFB := tree.NewNode(latestFinalized, nil) + + f.onStart <- LFB +} + +// Stop finalizer +func (f *finalizer) Stop() { + close(f.shutdown) +} + +// AddBlock to finalizer to be candidate for finalization and use its included attestations as votes for +// other blocks +func (f *finalizer) AddBlock(dbblock *chaindb.Block, attestations []*chaindb.Attestation) { + node := tree.NewNode(dbblock, attestations) + + f.onAddNode <- node +} + +// HandleNewLatestFinalizedBlock receives the event handler that will be trigger everytime a block is finalized +func (f *finalizer) HandleNewLatestFinalizedBlock(handler newLFBHandler) { + // TODO probably not thread safe 100% but perhaps ok as only called once actually + f.newLFBHandler = handler +} + +// mainLoop receives via channels commands and executed them, it is run in its own goroutine so public functions +func (f *finalizer) mainLoop() { + for { + select { + case node := <-f.onAddNode: + err := f.addNode(node) + if err != nil { + f.log.Error().Err(err).Msg("error adding block") + } + case LFB := <-f.onStart: + f.startInternal(LFB) + case <-f.shutdown: + f.log.Info().Msg("stopping LMD finalizer") + return + } + } +} + +// startInternal starts the finalizer +func (f *finalizer) startInternal(LFB *tree.Node) { + if f.started { + f.log.Warn().Msg("tried to start LMD finalizer twice") + return + } + + f.log.Info().Msg("Starting LMD finalizer") + + f.tree = tree.New(LFB) + + f.started = true + + f.preStartUse() + + f.log.Info().Msg("LMD finalizer started") +} + +// addNode +func (f *finalizer) addNode(node *tree.Node) error { + if !f.started { + root := node.Root() + f.log.Trace().Str("root", hex.EncodeToString(root[:])).Uint64("slot", uint64(node.Slot())).Msg("adding block to pre start buffer") + f.preStartInsert(node) + return nil + } + + if f.tree.IsOldBlock(node) { + return errors.New("adding block that is not in the future of latest finalized block") + } + + f.tree.Insert(node) + + // finalizer works even if blocks come out of order + children := f.tree.FindOrphans(node) + f.adopt(node, children) + + f.attestationsToVotes(node) + + newLFB := f.countVotes() + if newLFB != nil { + // we have a new LFB + f.onNewLatestFinalizedBlock(newLFB) + } + + return nil +} + +// attestationsToVotes takes all the attestations included in a node block and convert them to votes and includes them +// to be counted +func (f *finalizer) attestationsToVotes(node *tree.Node) { + for _, attestation := range node.Attestations() { + if f.tree.IsOldSlot(attestation.Slot) { + // we should ignore this attestation, as it refers to a block that is not later than the + // latest LDM finalized block + continue + } + + votes := f.attestationToVotes(attestation) + f.votes.insert(votes) + } + + // memory optimization + node.RemoveAttestations() +} + +// attestationToVotes returns an array of votes from an attestation +func (finalizer) attestationToVotes(attestation *chaindb.Attestation) []lmdVote { + return []lmdVote{ + { + slot: attestation.Slot, + root: attestation.BeaconBlockRoot, + weight: voteWeight(len(attestation.AggregationIndices)), + }, + } +} + +// adopt children nodes by parent node +func (f *finalizer) adopt(parent *tree.Node, children []*tree.Node) { + f.tree.Adopt(parent, children) + votes := []lmdVote{} + for _, child := range children { + votes = append(votes, lmdVote{ + root: parent.Root(), + slot: parent.Slot(), + weight: child.CurrenVote(), + }) + } + f.votes.insert(votes) +} + +// countVotes check votes that have not been counted yet, and count them if their referred block exists in the tree +func (f *finalizer) countVotes() *tree.Node { + var newLFB *tree.Node + + f.votes.tryUncounted(func(vote lmdVote) bool { + if newLFB != nil { + // skip counting this vote as we found a new LFB + return false + } + + block := f.tree.GetByRoot(vote.root) + if block == nil { + // cannot count this vote as we do not have the block it counts into + return false + } + + newLFB = f.countVote(block, vote.weight) + + return true + }) + + return newLFB +} + +// countVote for block and its ancestors +// returns a block if its weight reached threshold, otherwise nil +func (f *finalizer) countVote(block *tree.Node, vote voteWeight) *tree.Node { + var newLFB *tree.Node + + f.tree.Climb(block, func(block *tree.Node) bool { + block.CountVote(vote) + + if f.threshold(block) { + newLFB = block + return false // do not climb anymore, as this is the new latest finalized block + } + return true + }) + + return newLFB +} + +// threshold returns true if a block votes have reach the threshold to become finalized +func (finalizer) threshold(block *tree.Node) bool { + return block.CurrenVote() > PreBalanceThreshold +} + +// onNewLatestFinalizedBlock is called when the finalizer find a new LFB. It handles the transition to a new LFB, +// and calls the event handler +func (f *finalizer) onNewLatestFinalizedBlock(newLFB *tree.Node) { + f.tree.OnNewLatestFinalizedBlock(newLFB) + f.votes.newLatestFinalizedBlockSlot(newLFB.Slot()) + + root := newLFB.Root() + f.log.Info().Str("root", hex.EncodeToString(root[:])).Uint64("slot", uint64(newLFB.Slot())).Msg("new finalized block") + + if f.newLFBHandler != nil { + f.newLFBHandler(newLFB.Root(), newLFB.Slot()) + } +} + +// preStartInsert inserts a node into a buffer until the finalizer is started +func (f *finalizer) preStartInsert(block *tree.Node) { + f.preStartBuffer = append(f.preStartBuffer, block) +} + +// preStartUse uses the buffer of nodes added before the finalizer is started +func (f *finalizer) preStartUse() { + if !f.started { + f.log.Warn().Msg("tried to user pre start buffer on non-started LMD finalizer") + return + } + + preStartBuffer := f.preStartBuffer + f.preStartBuffer = nil + + for _, node := range preStartBuffer { + err := f.addNode(node) + if err != nil { + f.log.Error().Err(err).Msg("error adding from pre start buffer") + } + } +} diff --git a/services/blocks/standard/lmdfinalizer/finalizer_test.go b/services/blocks/standard/lmdfinalizer/finalizer_test.go new file mode 100644 index 0000000..639e8fb --- /dev/null +++ b/services/blocks/standard/lmdfinalizer/finalizer_test.go @@ -0,0 +1,372 @@ +package lmdfinalizer_test + +import ( + "github.com/attestantio/go-eth2-client/spec/phase0" + "github.com/rs/zerolog" + zerologger "github.com/rs/zerolog/log" + "github.com/stretchr/testify/assert" + "github.com/wealdtech/chaind/services/blocks/standard/lmdfinalizer" + "github.com/wealdtech/chaind/services/blocks/standard/lmdfinalizer/mock" + "runtime" + "sync" + "testing" + "time" +) + +var genesis, _ = mock.MockBlock(0, 0, 0, nil) + +func TestFinalizer_SimpleRun(t *testing.T) { + log := zerologger.With().Logger().Level(zerolog.ErrorLevel) + f := lmdfinalizer.New(log) + + f.Start(genesis) + + count := 0 + var wg sync.WaitGroup + wg.Add(2) + f.HandleNewLatestFinalizedBlock(func(root phase0.Root, slot phase0.Slot) { + wg.Done() + count++ + + switch count { + case 1: + assert.Equal(t, phase0.Slot(2), slot) + assert.EqualValues(t, mock.MockRoot(1), root) + case 2: + assert.Equal(t, phase0.Slot(100), slot) + assert.EqualValues(t, mock.MockRoot(3), root) + default: + assert.Fail(t, "should there be only 2") + } + }) + + f.AddBlock(mock.MockBlock(2, 1, 0, nil)) // 1: child of genesis + f.AddBlock(mock.MockBlock(2, 2, 0, nil)) // 2: child of genesis + f.AddBlock(mock.MockBlock(100, 3, 1, nil)) // 3: child of 1 + f.AddBlock(mock.MockBlock(10, 4, 1000, nil)) // 4: child of none + f.AddBlock(mock.MockBlock(101, 5, 3, []mock.MockAttestation{ + { + Slot: 2, + Root: 1, + NumIndices: lmdfinalizer.PreBalanceThreshold / 4, + }, + { + Slot: 100, + Root: 3, + NumIndices: lmdfinalizer.PreBalanceThreshold / 4, + }, + })) // 5: child of 3 + f.AddBlock(mock.MockBlock(104, 6, 1000, nil)) // 6: child of none + f.AddBlock(mock.MockBlock(110, 7, 1000, []mock.MockAttestation{ + { + Slot: 2, + Root: 1, + NumIndices: lmdfinalizer.PreBalanceThreshold / 4, + }, + { + Slot: 100, + Root: 3, + NumIndices: lmdfinalizer.PreBalanceThreshold / 4, + }, + { + Slot: 101, + Root: 5, + NumIndices: lmdfinalizer.PreBalanceThreshold / 4, + }, + })) // 7: child of 5 + f.AddBlock(mock.MockBlock(112, 8, 7, []mock.MockAttestation{ + { + Slot: 2, + Root: 1, + NumIndices: lmdfinalizer.PreBalanceThreshold / 4, + }, + { + Slot: 100, + Root: 3, + NumIndices: lmdfinalizer.PreBalanceThreshold / 4, + }, + { + Slot: 101, + Root: 5, + NumIndices: lmdfinalizer.PreBalanceThreshold / 4, + }, + { + Slot: 110, + Root: 7, + NumIndices: lmdfinalizer.PreBalanceThreshold / 4, + }, + })) // 8: child of 7 + f.AddBlock(mock.MockBlock(113, 9, 8, nil)) // 8: child of 8 + + wg.Wait() + + assert.Equal(t, 2, count) +} + +func TestFinalizer_SimpleRunOutOfOrder(t *testing.T) { + log := zerologger.With().Logger().Level(zerolog.ErrorLevel) + f := lmdfinalizer.New(log) + + f.Start(genesis) + + count := 0 + var wg sync.WaitGroup + wg.Add(2) + f.HandleNewLatestFinalizedBlock(func(root phase0.Root, slot phase0.Slot) { + wg.Done() + count++ + + switch count { + case 1: + assert.Equal(t, phase0.Slot(2), slot) + assert.EqualValues(t, mock.MockRoot(1), root) + case 2: + assert.Equal(t, phase0.Slot(100), slot) + assert.EqualValues(t, mock.MockRoot(3), root) + default: + assert.Fail(t, "should there be only 2") + } + }) + + f.AddBlock(mock.MockBlock(100, 3, 1, nil)) // 3: child of 1 + f.AddBlock(mock.MockBlock(10, 4, 1000, nil)) // 4: child of none + // 5: child of 3 + f.AddBlock(mock.MockBlock(104, 6, 1000, nil)) // 6: child of none + f.AddBlock(mock.MockBlock(110, 7, 1000, []mock.MockAttestation{ + { + Slot: 2, + Root: 1, + NumIndices: lmdfinalizer.PreBalanceThreshold / 4, + }, + { + Slot: 100, + Root: 3, + NumIndices: lmdfinalizer.PreBalanceThreshold / 4, + }, + { + Slot: 101, + Root: 5, + NumIndices: lmdfinalizer.PreBalanceThreshold / 4, + }, + })) // 7: child of 5 + f.AddBlock(mock.MockBlock(101, 5, 3, []mock.MockAttestation{ + { + Slot: 2, + Root: 1, + NumIndices: lmdfinalizer.PreBalanceThreshold / 4, + }, + { + Slot: 100, + Root: 3, + NumIndices: lmdfinalizer.PreBalanceThreshold / 4, + }, + })) + f.AddBlock(mock.MockBlock(2, 1, 0, nil)) // 1: child of genesis + f.AddBlock(mock.MockBlock(2, 2, 0, nil)) // 2: child of genesis + f.AddBlock(mock.MockBlock(112, 8, 7, []mock.MockAttestation{ + { + Slot: 2, + Root: 1, + NumIndices: lmdfinalizer.PreBalanceThreshold / 4, + }, + { + Slot: 100, + Root: 3, + NumIndices: lmdfinalizer.PreBalanceThreshold / 4, + }, + { + Slot: 101, + Root: 5, + NumIndices: lmdfinalizer.PreBalanceThreshold / 4, + }, + { + Slot: 110, + Root: 7, + NumIndices: lmdfinalizer.PreBalanceThreshold / 4, + }, + })) // 8: child of 7 + f.AddBlock(mock.MockBlock(113, 9, 8, nil)) // 8: child of 8 + + wg.Wait() + + assert.Equal(t, 2, count) +} + +func TestFinalizer_SimpleRunStartAfter(t *testing.T) { + log := zerologger.With().Logger().Level(zerolog.ErrorLevel) + f := lmdfinalizer.New(log) + + count := 0 + var wg sync.WaitGroup + wg.Add(2) + f.HandleNewLatestFinalizedBlock(func(root phase0.Root, slot phase0.Slot) { + wg.Done() + count++ + + switch count { + case 1: + assert.Equal(t, phase0.Slot(2), slot) + assert.EqualValues(t, mock.MockRoot(1), root) + case 2: + assert.Equal(t, phase0.Slot(100), slot) + assert.EqualValues(t, mock.MockRoot(3), root) + default: + assert.Fail(t, "should there be only 2") + } + }) + + f.AddBlock(mock.MockBlock(2, 1, 0, nil)) // 1: child of genesis + f.AddBlock(mock.MockBlock(2, 2, 0, nil)) // 2: child of genesis + f.AddBlock(mock.MockBlock(100, 3, 1, nil)) // 3: child of 1 + f.AddBlock(mock.MockBlock(10, 4, 1000, nil)) // 4: child of none + f.AddBlock(mock.MockBlock(101, 5, 3, []mock.MockAttestation{ + { + Slot: 2, + Root: 1, + NumIndices: lmdfinalizer.PreBalanceThreshold / 4, + }, + { + Slot: 100, + Root: 3, + NumIndices: lmdfinalizer.PreBalanceThreshold / 4, + }, + })) // 5: child of 3 + f.AddBlock(mock.MockBlock(104, 6, 1000, nil)) // 6: child of none + f.AddBlock(mock.MockBlock(110, 7, 1000, []mock.MockAttestation{ + { + Slot: 2, + Root: 1, + NumIndices: lmdfinalizer.PreBalanceThreshold / 4, + }, + { + Slot: 100, + Root: 3, + NumIndices: lmdfinalizer.PreBalanceThreshold / 4, + }, + { + Slot: 101, + Root: 5, + NumIndices: lmdfinalizer.PreBalanceThreshold / 4, + }, + })) // 7: child of 5 + f.AddBlock(mock.MockBlock(112, 8, 7, []mock.MockAttestation{ + { + Slot: 2, + Root: 1, + NumIndices: lmdfinalizer.PreBalanceThreshold / 4, + }, + { + Slot: 100, + Root: 3, + NumIndices: lmdfinalizer.PreBalanceThreshold / 4, + }, + { + Slot: 101, + Root: 5, + NumIndices: lmdfinalizer.PreBalanceThreshold / 4, + }, + { + Slot: 110, + Root: 7, + NumIndices: lmdfinalizer.PreBalanceThreshold / 4, + }, + })) // 8: child of 7 + f.AddBlock(mock.MockBlock(113, 9, 8, nil)) // 8: child of 8 + + f.Start(genesis) + + wg.Wait() + + assert.Equal(t, 2, count) +} + +func TestFinalizer_SimpleRunStop(t *testing.T) { + log := zerologger.With().Logger().Level(zerolog.ErrorLevel) + f := lmdfinalizer.New(log) + + f.Start(genesis) + + count := 0 + var wg sync.WaitGroup + wg.Add(1) + f.HandleNewLatestFinalizedBlock(func(root phase0.Root, slot phase0.Slot) { + wg.Done() + count++ + + switch count { + case 1: + assert.Equal(t, phase0.Slot(2), slot) + assert.EqualValues(t, mock.MockRoot(1), root) + default: + assert.Fail(t, "should there be only 1") + } + }) + + f.AddBlock(mock.MockBlock(2, 1, 0, nil)) // 1: child of genesis + f.AddBlock(mock.MockBlock(2, 2, 0, nil)) // 2: child of genesis + f.AddBlock(mock.MockBlock(100, 3, 1, nil)) // 3: child of 1 + f.AddBlock(mock.MockBlock(10, 4, 1000, nil)) // 4: child of none + f.AddBlock(mock.MockBlock(101, 5, 3, []mock.MockAttestation{ + { + Slot: 2, + Root: 1, + NumIndices: lmdfinalizer.PreBalanceThreshold / 4, + }, + { + Slot: 100, + Root: 3, + NumIndices: lmdfinalizer.PreBalanceThreshold / 4, + }, + })) // 5: child of 3 + f.AddBlock(mock.MockBlock(104, 6, 1000, nil)) // 6: child of none + f.AddBlock(mock.MockBlock(110, 7, 1000, []mock.MockAttestation{ + { + Slot: 2, + Root: 1, + NumIndices: lmdfinalizer.PreBalanceThreshold / 4, + }, + { + Slot: 100, + Root: 3, + NumIndices: lmdfinalizer.PreBalanceThreshold / 4, + }, + { + Slot: 101, + Root: 5, + NumIndices: lmdfinalizer.PreBalanceThreshold / 4, + }, + })) // 7: child of 5 + + runtime.Gosched() + <-time.After(time.Second) + + f.Stop() + + f.AddBlock(mock.MockBlock(112, 8, 7, []mock.MockAttestation{ + { + Slot: 2, + Root: 1, + NumIndices: lmdfinalizer.PreBalanceThreshold / 4, + }, + { + Slot: 100, + Root: 3, + NumIndices: lmdfinalizer.PreBalanceThreshold / 4, + }, + { + Slot: 101, + Root: 5, + NumIndices: lmdfinalizer.PreBalanceThreshold / 4, + }, + { + Slot: 110, + Root: 7, + NumIndices: lmdfinalizer.PreBalanceThreshold / 4, + }, + })) // 8: child of 7 + f.AddBlock(mock.MockBlock(113, 9, 8, nil)) // 8: child of 8 + + wg.Wait() + + assert.Equal(t, 1, count) +} diff --git a/services/blocks/standard/lmdfinalizer/mock/node.go b/services/blocks/standard/lmdfinalizer/mock/node.go new file mode 100644 index 0000000..7075597 --- /dev/null +++ b/services/blocks/standard/lmdfinalizer/mock/node.go @@ -0,0 +1,72 @@ +package mock + +import ( + "encoding/binary" + "github.com/attestantio/go-eth2-client/spec/phase0" + "github.com/wealdtech/chaind/services/chaindb" +) + +func MockRoot(value uint16) phase0.Root { + var root phase0.Root + binary.LittleEndian.PutUint16(root[0:2], value) + + return root +} + +type MockAttestation struct { + Slot phase0.Slot + Root uint16 + NumIndices uint +} + +func MockBlock( + slot phase0.Slot, + root uint16, + parentRoot uint16, + mockAtts []MockAttestation, +) (*chaindb.Block, []*chaindb.Attestation) { + block := &chaindb.Block{ + Slot: slot, + Root: MockRoot(root), + ParentRoot: MockRoot(parentRoot), + + // These fields are not use by finalizer + ProposerIndex: 0, + Graffiti: nil, + RANDAOReveal: phase0.BLSSignature{}, + BodyRoot: phase0.Root{}, + StateRoot: phase0.Root{}, + Canonical: nil, + ETH1BlockHash: nil, + ETH1DepositCount: 0, + ETH1DepositRoot: phase0.Root{}, + ExecutionPayload: nil, + } + + attestations := []*chaindb.Attestation{} + for _, mockAtt := range mockAtts { + attestation := &chaindb.Attestation{ + Slot: mockAtt.Slot, + BeaconBlockRoot: MockRoot(mockAtt.Root), + AggregationIndices: make([]phase0.ValidatorIndex, mockAtt.NumIndices), + + // These fields are not use by finalizer + InclusionSlot: 0, + InclusionBlockRoot: phase0.Root{}, + InclusionIndex: 0, + CommitteeIndex: 0, + AggregationBits: nil, + SourceEpoch: 0, + SourceRoot: phase0.Root{}, + TargetEpoch: 0, + TargetRoot: phase0.Root{}, + Canonical: nil, + TargetCorrect: nil, + HeadCorrect: nil, + } + + attestations = append(attestations, attestation) + } + + return block, attestations +} diff --git a/services/blocks/standard/lmdfinalizer/tree/node.go b/services/blocks/standard/lmdfinalizer/tree/node.go new file mode 100644 index 0000000..bb1cc1f --- /dev/null +++ b/services/blocks/standard/lmdfinalizer/tree/node.go @@ -0,0 +1,71 @@ +package tree + +import ( + "github.com/attestantio/go-eth2-client/spec/phase0" + "github.com/wealdtech/chaind/services/chaindb" +) + +// VoteWeight contains weighted votes towards a block +type VoteWeight uint64 + +// Node in LMD finalizer Tree +type Node struct { + parent *Node + + root phase0.Root + parentRoot phase0.Root + slot phase0.Slot + + attestations []*chaindb.Attestation + + curVote VoteWeight +} + +// NewNode creates a node from a chaindb block and attestations array +func NewNode(block *chaindb.Block, attestations []*chaindb.Attestation) *Node { + n := &Node{ + slot: block.Slot, + attestations: attestations, // TODO perhaps copy the attestations as they are a pointer + } + + copy(n.root[:], block.Root[:]) + copy(n.parentRoot[:], block.ParentRoot[:]) + + return n +} + +// CountVote into the node +func (n *Node) CountVote(vote VoteWeight) { + n.curVote += vote +} + +// RemoveAttestations clears the memory from attestations, useful if they are not needed anymore +func (n *Node) RemoveAttestations() { + n.attestations = nil +} + +// Root of node block +func (n Node) Root() phase0.Root { + return n.root +} + +// Slot of node block +func (n Node) Slot() phase0.Slot { + return n.slot +} + +// Attestations included in node block +func (n Node) Attestations() []*chaindb.Attestation { + // TODO perhaps copy to do not break encapsulation + return n.attestations +} + +// CurrenVote towards this node block +func (n Node) CurrenVote() VoteWeight { + return n.curVote +} + +// adopt makes node parent of other node +func (n *Node) adopt(orphan *Node) { + orphan.parent = n +} diff --git a/services/blocks/standard/lmdfinalizer/tree/node_test.go b/services/blocks/standard/lmdfinalizer/tree/node_test.go new file mode 100644 index 0000000..f156999 --- /dev/null +++ b/services/blocks/standard/lmdfinalizer/tree/node_test.go @@ -0,0 +1,84 @@ +package tree_test + +import ( + "github.com/attestantio/go-eth2-client/spec/phase0" + "github.com/stretchr/testify/assert" + "github.com/wealdtech/chaind/services/blocks/standard/lmdfinalizer/mock" + "github.com/wealdtech/chaind/services/blocks/standard/lmdfinalizer/tree" + "testing" +) + +func TestNewNode(t *testing.T) { + mBlock, mAtts := mock.MockBlock( + 10, + 10, + 9, + []mock.MockAttestation{ + { + Slot: 0, + Root: 0, + NumIndices: 10, + }, + }, + ) + + node := tree.NewNode(mBlock, mAtts) + + assert.Equal(t, node.CurrenVote(), tree.VoteWeight(0)) + assert.Equal(t, node.Slot(), phase0.Slot(10)) + assert.Equal(t, node.Root()[0], byte(10)) + assert.Len(t, node.Attestations(), 1) + assert.Equal(t, node.Attestations()[0].Slot, phase0.Slot(0)) + assert.Equal(t, node.Attestations()[0].BeaconBlockRoot[0], byte(0)) + assert.Len(t, node.Attestations()[0].AggregationIndices, 10) +} + +func TestNode_RemoveAttestations(t *testing.T) { + mBlock, mAtts := mock.MockBlock( + 10, + 10, + 9, + []mock.MockAttestation{ + { + Slot: 0, + Root: 0, + NumIndices: 10, + }, + }, + ) + + node := tree.NewNode(mBlock, mAtts) + + assert.Len(t, node.Attestations(), 1) + + node.RemoveAttestations() + + assert.Len(t, node.Attestations(), 0) +} + +func TestNode_CountVote(t *testing.T) { + mBlock, mAtts := mock.MockBlock( + 10, + 10, + 9, + []mock.MockAttestation{ + { + Slot: 0, + Root: 0, + NumIndices: 10, + }, + }, + ) + + node := tree.NewNode(mBlock, mAtts) + + assert.Equal(t, node.CurrenVote(), tree.VoteWeight(0)) + + node.CountVote(tree.VoteWeight(111)) + + assert.Equal(t, node.CurrenVote(), tree.VoteWeight(111)) + + node.CountVote(tree.VoteWeight(222)) + + assert.Equal(t, node.CurrenVote(), tree.VoteWeight(333)) +} diff --git a/services/blocks/standard/lmdfinalizer/tree/tree.go b/services/blocks/standard/lmdfinalizer/tree/tree.go new file mode 100644 index 0000000..61a1e2f --- /dev/null +++ b/services/blocks/standard/lmdfinalizer/tree/tree.go @@ -0,0 +1,138 @@ +package tree + +import ( + "github.com/attestantio/go-eth2-client/spec/phase0" +) + +// Tree for LMD finalizer +type Tree struct { + latestFinalizedBlock *Node // root of the tree + rootIndex map[phase0.Root]*Node + orphans map[phase0.Root]*Node +} + +// New creates a tree +func New(latestFinalizedBlock *Node) Tree { + t := Tree{ + latestFinalizedBlock: latestFinalizedBlock, + rootIndex: make(map[phase0.Root]*Node), + orphans: map[phase0.Root]*Node{}, + } + t.rootIndex[latestFinalizedBlock.root] = latestFinalizedBlock + + return t +} + +// Insert new node into the tree +func (t *Tree) Insert(node *Node) { + if t.IsOldBlock(node) { + // do not insert it, as it is not in the future of LFB + // TODO logging? + return + } + + curBlock, already := t.rootIndex[node.root] + if already && curBlock != nil { + // TODO logging + return // already included, be idempotent + } + + t.rootIndex[node.root] = node + + parent := t.rootIndex[node.parentRoot] + if parent != nil { + parent.adopt(node) + } else { + t.insertOrphan(node) + } +} + +// Adopt makes parent the parent of children, and remove the children from the orphan list +func (t *Tree) Adopt(parent *Node, children []*Node) { + for _, child := range children { + if child.parentRoot == parent.root { + parent.adopt(child) + t.removeOrphan(child) + } // TODO else case logging? + } +} + +// GetByRoot returns a node by its block root +func (t *Tree) GetByRoot(root phase0.Root) *Node { + return t.rootIndex[root] +} + +// IsOldSlot returns true if a slot is less or equal than the LFB slot +func (t *Tree) IsOldSlot(slot phase0.Slot) bool { + return slot <= t.latestFinalizedBlock.slot +} + +// IsOldBlock returns true if the node slot is less or equal than the LFB slot +func (t *Tree) IsOldBlock(node *Node) bool { + return t.IsOldSlot(node.slot) +} + +// Climb from a node towards the LFB by the tree, it passes each node to the `callback` +// It does not include the LFB, stopping in the direct child of it +// If the callback returns `false` the tree climbing is stopped at the current node +func (t *Tree) Climb(node *Node, callback func(*Node) bool) { + for { + if node == nil || node.root == t.latestFinalizedBlock.root { + // stop when no parent (orphan) or arrived to the top of the tree + return + } + + if !callback(node) { + // stop if iterator wants to stop + return + } + + node = node.parent + } +} + +// OnNewLatestFinalizedBlock reroots the tree on a new LFB. +// Old nodes are removed as they are not relevant anymore +func (t *Tree) OnNewLatestFinalizedBlock(newLFB *Node) { + t.latestFinalizedBlock = newLFB + + t.removeOldRootIndex() + t.removeOldOrphans() +} + +// FindOrphans which father is a given node +func (t *Tree) FindOrphans(node *Node) []*Node { + children := []*Node{} + + for _, orphan := range t.orphans { + if orphan.parentRoot == node.root { + children = append(children, orphan) + } + } + + return children +} + +func (t *Tree) removeOldRootIndex() { + for root, block := range t.rootIndex { + if t.IsOldBlock(block) && root != t.latestFinalizedBlock.root { + delete(t.rootIndex, root) + } + } +} + +func (t *Tree) removeOldOrphans() { + for _, orphan := range t.orphans { + if t.IsOldBlock(orphan) { + t.removeOrphan(orphan) + } + } +} + +func (t *Tree) insertOrphan(block *Node) { + t.orphans[block.root] = block +} + +func (t *Tree) removeOrphan(orphan *Node) { + delete(t.orphans, orphan.root) +} diff --git a/services/blocks/standard/lmdfinalizer/tree/tree_internal_test.go b/services/blocks/standard/lmdfinalizer/tree/tree_internal_test.go new file mode 100644 index 0000000..7d6e4de --- /dev/null +++ b/services/blocks/standard/lmdfinalizer/tree/tree_internal_test.go @@ -0,0 +1,113 @@ +package tree + +import ( + "github.com/stretchr/testify/assert" + "github.com/wealdtech/chaind/services/blocks/standard/lmdfinalizer/mock" + "testing" +) + +var genesis = NewNode(mock.MockBlock(0, 0, 0, nil)) + +func TestTree_InsertOutOfOrderAndAdopt(t *testing.T) { + tr := New(genesis) + + node1 := NewNode(mock.MockBlock(2, 1, 0, nil)) + node2 := NewNode(mock.MockBlock(2, 2, 0, nil)) + node3 := NewNode(mock.MockBlock(5, 3, 1, nil)) + node4 := NewNode(mock.MockBlock(10, 4, 7, nil)) + + tr.Insert(node3) + tr.Insert(node4) + + tr.Insert(node1) + tr.Insert(node2) + + assert.Len(t, tr.orphans, 2) + assert.Equal(t, tr.orphans[node3.Root()], tr.GetByRoot(node3.Root())) + assert.Equal(t, tr.orphans[node4.Root()], tr.GetByRoot(node4.Root())) + + assert.Equal(t, tr.GetByRoot(genesis.Root()), tr.GetByRoot(node1.Root()).parent) + assert.Equal(t, tr.GetByRoot(genesis.Root()), tr.GetByRoot(node2.Root()).parent) + assert.Equal(t, (*Node)(nil), tr.GetByRoot(node3.Root()).parent) + assert.Equal(t, (*Node)(nil), tr.GetByRoot(node4.Root()).parent) + + tr.Adopt(tr.GetByRoot(node1.Root()), []*Node{tr.GetByRoot(node3.Root()), tr.GetByRoot(node4.Root())}) + + assert.Equal(t, tr.GetByRoot(node1.Root()), tr.GetByRoot(node3.Root()).parent) + assert.Equal(t, (*Node)(nil), tr.GetByRoot(node4.Root()).parent) + + assert.Len(t, tr.orphans, 1) + assert.Equal(t, tr.orphans[node4.Root()], tr.GetByRoot(node4.Root())) +} + +func TestTree_OnNewLatestFinalizedBlock(t *testing.T) { + tr := New(genesis) + + node1 := NewNode(mock.MockBlock(2, 1, 0, nil)) // child of genesis + node2 := NewNode(mock.MockBlock(2, 2, 0, nil)) // child of genesis + node3 := NewNode(mock.MockBlock(100, 3, 1, nil)) // child of 1 + node4 := NewNode(mock.MockBlock(10, 4, 7, nil)) // child of none + node5 := NewNode(mock.MockBlock(101, 5, 3, nil)) // child of 3 + node6 := NewNode(mock.MockBlock(104, 6, 8, nil)) // child of none + + tr.Insert(node1) + tr.Insert(node2) + tr.Insert(node3) + tr.Insert(node4) + tr.Insert(node5) + tr.Insert(node6) + + assert.Equal(t, tr.latestFinalizedBlock, tr.GetByRoot(genesis.Root())) + + assert.Len(t, tr.rootIndex, 7) + assert.Equal(t, tr.rootIndex[genesis.Root()], tr.GetByRoot(genesis.Root())) + assert.Equal(t, tr.rootIndex[node1.Root()], tr.GetByRoot(node1.Root())) + assert.Equal(t, tr.rootIndex[node2.Root()], tr.GetByRoot(node2.Root())) + assert.Equal(t, tr.rootIndex[node3.Root()], tr.GetByRoot(node3.Root())) + assert.Equal(t, tr.rootIndex[node4.Root()], tr.GetByRoot(node4.Root())) + assert.Equal(t, tr.rootIndex[node5.Root()], tr.GetByRoot(node5.Root())) + assert.Equal(t, tr.rootIndex[node6.Root()], tr.GetByRoot(node6.Root())) + + assert.Len(t, tr.orphans, 2) + assert.Equal(t, tr.orphans[node4.Root()], tr.GetByRoot(node4.Root())) + assert.Equal(t, tr.orphans[node6.Root()], tr.GetByRoot(node6.Root())) + + tr.OnNewLatestFinalizedBlock(tr.GetByRoot(node3.Root())) + + assert.Equal(t, tr.latestFinalizedBlock, tr.GetByRoot(node3.Root())) + + assert.Len(t, tr.rootIndex, 3) + assert.Equal(t, tr.rootIndex[node3.Root()], tr.GetByRoot(node3.Root())) + assert.Equal(t, tr.rootIndex[node5.Root()], tr.GetByRoot(node5.Root())) + assert.Equal(t, tr.rootIndex[node6.Root()], tr.GetByRoot(node6.Root())) + + assert.Len(t, tr.orphans, 1) + assert.Equal(t, tr.orphans[node6.Root()], tr.GetByRoot(node6.Root())) +} + +func TestTree_InsertSameRoot(t *testing.T) { + tr := New(genesis) + + node1 := NewNode(mock.MockBlock(2, 1, 0, nil)) + node2 := NewNode(mock.MockBlock(2, 1, 0, nil)) // has same root as 1 + node3 := NewNode(mock.MockBlock(100, 3, 1, nil)) + node4 := NewNode(mock.MockBlock(10, 4, 7, nil)) + node5 := NewNode(mock.MockBlock(101, 5, 3, nil)) + node6 := NewNode(mock.MockBlock(104, 6, 8, nil)) + + tr.Insert(node1) + tr.Insert(node2) + tr.Insert(node3) + tr.Insert(node4) + tr.Insert(node5) + tr.Insert(node6) + + assert.Len(t, tr.rootIndex, 6) + assert.Equal(t, tr.rootIndex[genesis.Root()], tr.GetByRoot(genesis.Root())) + assert.Equal(t, tr.rootIndex[node1.Root()], tr.GetByRoot(node1.Root())) + // node2 not present + assert.Equal(t, tr.rootIndex[node3.Root()], tr.GetByRoot(node3.Root())) + assert.Equal(t, tr.rootIndex[node4.Root()], tr.GetByRoot(node4.Root())) + assert.Equal(t, tr.rootIndex[node5.Root()], tr.GetByRoot(node5.Root())) + assert.Equal(t, tr.rootIndex[node6.Root()], tr.GetByRoot(node6.Root())) +} diff --git a/services/blocks/standard/lmdfinalizer/tree/tree_test.go b/services/blocks/standard/lmdfinalizer/tree/tree_test.go new file mode 100644 index 0000000..ec1ca35 --- /dev/null +++ b/services/blocks/standard/lmdfinalizer/tree/tree_test.go @@ -0,0 +1,177 @@ +package tree_test + +import ( + "github.com/stretchr/testify/assert" + "github.com/wealdtech/chaind/services/blocks/standard/lmdfinalizer/mock" + "github.com/wealdtech/chaind/services/blocks/standard/lmdfinalizer/tree" + "testing" +) + +var genesis = tree.NewNode(mock.MockBlock(0, 0, 0, nil)) + +func TestTree_InsertAndGetByRoot(t *testing.T) { + tr := tree.New(genesis) + + node1 := tree.NewNode(mock.MockBlock(2, 1, 0, nil)) + node2 := tree.NewNode(mock.MockBlock(2, 2, 0, nil)) + + tr.Insert(node1) + tr.Insert(node2) + + expectGenesis := tr.GetByRoot(genesis.Root()) + expectNode1 := tr.GetByRoot(node1.Root()) + expectNode2 := tr.GetByRoot(node2.Root()) + + assert.Equal(t, expectGenesis, genesis) + assert.Equal(t, expectNode1, node1) + assert.Equal(t, expectNode2, node2) +} + +func TestTree_InsertSameRoot(t *testing.T) { + tr := tree.New(genesis) + + node1 := tree.NewNode(mock.MockBlock(2, 1, 0, nil)) + node2 := tree.NewNode(mock.MockBlock(2, 2, 0, nil)) + + tr.Insert(node1) + tr.Insert(node2) + + expectGenesis := tr.GetByRoot(genesis.Root()) + expectNode1 := tr.GetByRoot(node1.Root()) + expectNode2 := tr.GetByRoot(node2.Root()) + + assert.Equal(t, expectGenesis, genesis) + assert.Equal(t, expectNode1, node1) + assert.Equal(t, expectNode2, node2) +} + +func TestTree_IsOldSlot(t *testing.T) { + node1 := tree.NewNode(mock.MockBlock(100, 2, 0, nil)) + + tr := tree.New(node1) + + assert.True(t, tr.IsOldSlot(2)) + assert.False(t, tr.IsOldSlot(200)) +} + +func TestTree_IsOldBlock(t *testing.T) { + node1 := tree.NewNode(mock.MockBlock(100, 1, 0, nil)) + + tr := tree.New(node1) + + node2 := tree.NewNode(mock.MockBlock(10, 2, 0, nil)) + node3 := tree.NewNode(mock.MockBlock(200, 3, 0, nil)) + + assert.True(t, tr.IsOldBlock(node2)) + assert.False(t, tr.IsOldBlock(node3)) +} + +func TestTree_InsertOld(t *testing.T) { + node1 := tree.NewNode(mock.MockBlock(100, 2, 0, nil)) + + tr := tree.New(node1) + + node2 := tree.NewNode(mock.MockBlock(10, 1, 0, nil)) + + tr.Insert(node2) + + assert.Equal(t, (*tree.Node)(nil), tr.GetByRoot(node2.Root())) +} + +func TestTree_FindOrphans(t *testing.T) { + tr := tree.New(genesis) + + node1 := tree.NewNode(mock.MockBlock(2, 1, 0, nil)) + node2 := tree.NewNode(mock.MockBlock(2, 2, 0, nil)) + node3 := tree.NewNode(mock.MockBlock(5, 3, 1, nil)) + node4 := tree.NewNode(mock.MockBlock(10, 4, 7, nil)) + + tr.Insert(node3) + tr.Insert(node4) + + tr.Insert(node1) + tr.Insert(node2) + + orphans := tr.FindOrphans(tr.GetByRoot(node1.Root())) + assert.Len(t, orphans, 1) + assert.Equal(t, orphans[0], tr.GetByRoot(node3.Root())) + + tr.Adopt(tr.GetByRoot(node1.Root()), orphans) + + orphans = tr.FindOrphans(tr.GetByRoot(node1.Root())) + assert.Len(t, orphans, 0) +} + +func TestTree_Climb(t *testing.T) { + tr := tree.New(genesis) + + node1 := tree.NewNode(mock.MockBlock(2, 1, 0, nil)) + node2 := tree.NewNode(mock.MockBlock(2, 2, 0, nil)) + node3 := tree.NewNode(mock.MockBlock(5, 3, 1, nil)) + node4 := tree.NewNode(mock.MockBlock(10, 4, 7, nil)) + node5 := tree.NewNode(mock.MockBlock(11, 5, 3, nil)) + + tr.Insert(node3) + tr.Insert(node4) + tr.Insert(node5) + + tr.Insert(node1) + tr.Insert(node2) + + count := 0 + tr.Climb(tr.GetByRoot(node5.Root()), func(node *tree.Node) bool { + count++ + + switch count { + case 1: + assert.Equal(t, node, tr.GetByRoot(node5.Root())) + case 2: + assert.Equal(t, node, tr.GetByRoot(node3.Root())) + default: + assert.Fail(t, "should only be 2") + } + + return true + }) + assert.Equal(t, count, 2) + + orphans := tr.FindOrphans(tr.GetByRoot(node1.Root())) + tr.Adopt(tr.GetByRoot(node1.Root()), orphans) + + count = 0 + tr.Climb(tr.GetByRoot(node5.Root()), func(node *tree.Node) bool { + count++ + + switch count { + case 1: + assert.Equal(t, node, tr.GetByRoot(node5.Root())) + case 2: + assert.Equal(t, node, tr.GetByRoot(node3.Root())) + case 3: + assert.Equal(t, node, tr.GetByRoot(node1.Root())) + default: + assert.Fail(t, "should only be 3") + } + + return true + }) + assert.Equal(t, count, 3) + + count = 0 + tr.Climb(tr.GetByRoot(node5.Root()), func(node *tree.Node) bool { + count++ + + switch count { + case 1: + assert.Equal(t, node, tr.GetByRoot(node5.Root())) + case 2: + assert.Equal(t, node, tr.GetByRoot(node3.Root())) + return false // do not keep climbing + default: + assert.Fail(t, "should only be 2") + } + + return true + }) + assert.Equal(t, count, 2) +} diff --git a/services/blocks/standard/lmdfinalizer/vote.go b/services/blocks/standard/lmdfinalizer/vote.go new file mode 100644 index 0000000..34a57ac --- /dev/null +++ b/services/blocks/standard/lmdfinalizer/vote.go @@ -0,0 +1,53 @@ +package lmdfinalizer + +import ( + "github.com/attestantio/go-eth2-client/spec/phase0" + "github.com/wealdtech/chaind/services/blocks/standard/lmdfinalizer/tree" +) + +type voteWeight = tree.VoteWeight + +type lmdVote struct { + root phase0.Root + slot phase0.Slot + weight voteWeight +} + +type lmdVotes struct { + uncounted []lmdVote +} + +func newLMDVotes() lmdVotes { + return lmdVotes{ + uncounted: []lmdVote{}, + } +} + +func (v *lmdVotes) insert(votes []lmdVote) { + v.uncounted = append(v.uncounted, votes...) +} + +func (v *lmdVotes) tryUncounted(fn func(vote lmdVote) bool) { + newUncounted := []lmdVote{} + + for _, vote := range v.uncounted { + if !fn(vote) { + newUncounted = append(newUncounted, vote) + } + } + + v.uncounted = newUncounted +} + +func (v *lmdVotes) newLatestFinalizedBlockSlot(slot phase0.Slot) { + // TODO perhaps uncounted could be an map of slot to array of votes and this would be more efficient + newUncounted := []lmdVote{} + + for _, vote := range v.uncounted { + if slot < vote.slot { + newUncounted = append(newUncounted, vote) + } + } + + v.uncounted = newUncounted +} diff --git a/services/blocks/standard/lmdfinalizer/vote_internal_test.go b/services/blocks/standard/lmdfinalizer/vote_internal_test.go new file mode 100644 index 0000000..bf35c9c --- /dev/null +++ b/services/blocks/standard/lmdfinalizer/vote_internal_test.go @@ -0,0 +1,87 @@ +package lmdfinalizer + +import ( + "github.com/stretchr/testify/assert" + "github.com/wealdtech/chaind/services/blocks/standard/lmdfinalizer/mock" + "testing" +) + +func TestLMDVotes_Insert(t *testing.T) { + votes := newLMDVotes() + + vote1 := lmdVote{ + root: mock.MockRoot(100), + slot: 100, + weight: PreBalanceThreshold, + } + vote2 := lmdVote{ + root: mock.MockRoot(200), + slot: 200, + weight: PreBalanceThreshold * 2, + } + + votes.insert([]lmdVote{vote1, vote2}) + + assert.Len(t, votes.uncounted, 2) +} + +func TestLMDVotes_TryUncounted(t *testing.T) { + votes := newLMDVotes() + + vote1 := lmdVote{ + root: mock.MockRoot(100), + slot: 100, + weight: PreBalanceThreshold, + } + vote2 := lmdVote{ + root: mock.MockRoot(200), + slot: 200, + weight: PreBalanceThreshold * 2, + } + + votes.insert([]lmdVote{vote1, vote2}) + + count := 0 + votes.tryUncounted(func(vote lmdVote) bool { + count++ + return false + }) + assert.Equal(t, 2, count) + + count = 0 + votes.tryUncounted(func(vote lmdVote) bool { + count++ + return true + }) + assert.Equal(t, 2, count) + + count = 0 + votes.tryUncounted(func(vote lmdVote) bool { + count++ + return true + }) + assert.Equal(t, 0, count) +} + +func TestLMDVotes_onNewLatestFinalizedBlock(t *testing.T) { + votes := newLMDVotes() + + vote1 := lmdVote{ + root: mock.MockRoot(100), + slot: 100, + weight: PreBalanceThreshold, + } + vote2 := lmdVote{ + root: mock.MockRoot(200), + slot: 200, + weight: PreBalanceThreshold * 2, + } + + votes.insert([]lmdVote{vote1, vote2}) + + assert.Len(t, votes.uncounted, 2) + + votes.newLatestFinalizedBlockSlot(150) + + assert.Len(t, votes.uncounted, 1) +} From accfec2433908242a86c251fb0c384fadd3fa83c Mon Sep 17 00:00:00 2001 From: Leo Lara Date: Mon, 4 Jul 2022 11:38:54 +0200 Subject: [PATCH 02/10] Remove LMDFinalizer.Stop --- .../blocks/standard/lmdfinalizer/finalizer.go | 7 -- .../standard/lmdfinalizer/finalizer_test.go | 93 ------------------- 2 files changed, 100 deletions(-) diff --git a/services/blocks/standard/lmdfinalizer/finalizer.go b/services/blocks/standard/lmdfinalizer/finalizer.go index 7e6f5b5..6d9c915 100644 --- a/services/blocks/standard/lmdfinalizer/finalizer.go +++ b/services/blocks/standard/lmdfinalizer/finalizer.go @@ -18,8 +18,6 @@ const PreBalanceThreshold = 10000 type LMDFinalizer interface { // Start finalizer with latestFinalized block as the latest finalized until now Start(latestFinalized *chaindb.Block) - // Stop finalizer - Stop() // AddBlock to finalizer to be candidate for finalization and use its included attestations as votes for // other blocks AddBlock(dbblock *chaindb.Block, attestations []*chaindb.Attestation) @@ -73,11 +71,6 @@ func (f *finalizer) Start(latestFinalized *chaindb.Block) { f.onStart <- LFB } -// Stop finalizer -func (f *finalizer) Stop() { - close(f.shutdown) -} - // AddBlock to finalizer to be candidate for finalization and use its included attestations as votes for // other blocks func (f *finalizer) AddBlock(dbblock *chaindb.Block, attestations []*chaindb.Attestation) { diff --git a/services/blocks/standard/lmdfinalizer/finalizer_test.go b/services/blocks/standard/lmdfinalizer/finalizer_test.go index 639e8fb..d02656f 100644 --- a/services/blocks/standard/lmdfinalizer/finalizer_test.go +++ b/services/blocks/standard/lmdfinalizer/finalizer_test.go @@ -7,10 +7,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/wealdtech/chaind/services/blocks/standard/lmdfinalizer" "github.com/wealdtech/chaind/services/blocks/standard/lmdfinalizer/mock" - "runtime" "sync" "testing" - "time" ) var genesis, _ = mock.MockBlock(0, 0, 0, nil) @@ -279,94 +277,3 @@ func TestFinalizer_SimpleRunStartAfter(t *testing.T) { assert.Equal(t, 2, count) } - -func TestFinalizer_SimpleRunStop(t *testing.T) { - log := zerologger.With().Logger().Level(zerolog.ErrorLevel) - f := lmdfinalizer.New(log) - - f.Start(genesis) - - count := 0 - var wg sync.WaitGroup - wg.Add(1) - f.HandleNewLatestFinalizedBlock(func(root phase0.Root, slot phase0.Slot) { - wg.Done() - count++ - - switch count { - case 1: - assert.Equal(t, phase0.Slot(2), slot) - assert.EqualValues(t, mock.MockRoot(1), root) - default: - assert.Fail(t, "should there be only 1") - } - }) - - f.AddBlock(mock.MockBlock(2, 1, 0, nil)) // 1: child of genesis - f.AddBlock(mock.MockBlock(2, 2, 0, nil)) // 2: child of genesis - f.AddBlock(mock.MockBlock(100, 3, 1, nil)) // 3: child of 1 - f.AddBlock(mock.MockBlock(10, 4, 1000, nil)) // 4: child of none - f.AddBlock(mock.MockBlock(101, 5, 3, []mock.MockAttestation{ - { - Slot: 2, - Root: 1, - NumIndices: lmdfinalizer.PreBalanceThreshold / 4, - }, - { - Slot: 100, - Root: 3, - NumIndices: lmdfinalizer.PreBalanceThreshold / 4, - }, - })) // 5: child of 3 - f.AddBlock(mock.MockBlock(104, 6, 1000, nil)) // 6: child of none - f.AddBlock(mock.MockBlock(110, 7, 1000, []mock.MockAttestation{ - { - Slot: 2, - Root: 1, - NumIndices: lmdfinalizer.PreBalanceThreshold / 4, - }, - { - Slot: 100, - Root: 3, - NumIndices: lmdfinalizer.PreBalanceThreshold / 4, - }, - { - Slot: 101, - Root: 5, - NumIndices: lmdfinalizer.PreBalanceThreshold / 4, - }, - })) // 7: child of 5 - - runtime.Gosched() - <-time.After(time.Second) - - f.Stop() - - f.AddBlock(mock.MockBlock(112, 8, 7, []mock.MockAttestation{ - { - Slot: 2, - Root: 1, - NumIndices: lmdfinalizer.PreBalanceThreshold / 4, - }, - { - Slot: 100, - Root: 3, - NumIndices: lmdfinalizer.PreBalanceThreshold / 4, - }, - { - Slot: 101, - Root: 5, - NumIndices: lmdfinalizer.PreBalanceThreshold / 4, - }, - { - Slot: 110, - Root: 7, - NumIndices: lmdfinalizer.PreBalanceThreshold / 4, - }, - })) // 8: child of 7 - f.AddBlock(mock.MockBlock(113, 9, 8, nil)) // 8: child of 8 - - wg.Wait() - - assert.Equal(t, 1, count) -} From 944951e810cc776ccb9bcfa14e57fa11b1cf90d3 Mon Sep 17 00:00:00 2001 From: Leo Lara Date: Mon, 4 Jul 2022 14:00:18 +0200 Subject: [PATCH 03/10] Remove HandleNewLatestFinalizedBlock from LMD finalizer interface --- .../blocks/standard/lmdfinalizer/finalizer.go | 17 +++----------- .../standard/lmdfinalizer/finalizer_test.go | 23 ++++++++----------- 2 files changed, 13 insertions(+), 27 deletions(-) diff --git a/services/blocks/standard/lmdfinalizer/finalizer.go b/services/blocks/standard/lmdfinalizer/finalizer.go index 6d9c915..e431eaf 100644 --- a/services/blocks/standard/lmdfinalizer/finalizer.go +++ b/services/blocks/standard/lmdfinalizer/finalizer.go @@ -21,8 +21,6 @@ type LMDFinalizer interface { // AddBlock to finalizer to be candidate for finalization and use its included attestations as votes for // other blocks AddBlock(dbblock *chaindb.Block, attestations []*chaindb.Attestation) - // HandleNewLatestFinalizedBlock receives the event handler that will be trigger everytime a block is finalized - HandleNewLatestFinalizedBlock(newLFBHandler) } // newLFBHandler event handler to be triggered when a new LFB is finalized @@ -39,13 +37,12 @@ type finalizer struct { onAddNode chan *tree.Node onStart chan *tree.Node - shutdown chan struct{} newLFBHandler newLFBHandler } // New LMDFinalizer with logger `log` -func New(log zerolog.Logger) LMDFinalizer { +func New(log zerolog.Logger, handler newLFBHandler) LMDFinalizer { f := &finalizer{ tree: tree.Tree{}, votes: newLMDVotes(), @@ -56,7 +53,8 @@ func New(log zerolog.Logger) LMDFinalizer { onAddNode: make(chan *tree.Node, 1000), // TODO: size of channel onStart: make(chan *tree.Node), - shutdown: make(chan struct{}), + + newLFBHandler: handler, } go f.mainLoop() @@ -79,12 +77,6 @@ func (f *finalizer) AddBlock(dbblock *chaindb.Block, attestations []*chaindb.Att f.onAddNode <- node } -// HandleNewLatestFinalizedBlock receives the event handler that will be trigger everytime a block is finalized -func (f *finalizer) HandleNewLatestFinalizedBlock(handler newLFBHandler) { - // TODO probably not thread safe 100% but perhaps ok as only called once actually - f.newLFBHandler = handler -} - // mainLoop receives via channels commands and executed them, it is run in its own goroutine so public functions func (f *finalizer) mainLoop() { for { @@ -96,9 +88,6 @@ func (f *finalizer) mainLoop() { } case LFB := <-f.onStart: f.startInternal(LFB) - case <-f.shutdown: - f.log.Info().Msg("stopping LMD finalizer") - return } } } diff --git a/services/blocks/standard/lmdfinalizer/finalizer_test.go b/services/blocks/standard/lmdfinalizer/finalizer_test.go index d02656f..cb91dd8 100644 --- a/services/blocks/standard/lmdfinalizer/finalizer_test.go +++ b/services/blocks/standard/lmdfinalizer/finalizer_test.go @@ -15,14 +15,11 @@ var genesis, _ = mock.MockBlock(0, 0, 0, nil) func TestFinalizer_SimpleRun(t *testing.T) { log := zerologger.With().Logger().Level(zerolog.ErrorLevel) - f := lmdfinalizer.New(log) - - f.Start(genesis) - count := 0 var wg sync.WaitGroup wg.Add(2) - f.HandleNewLatestFinalizedBlock(func(root phase0.Root, slot phase0.Slot) { + + f := lmdfinalizer.New(log, func(root phase0.Root, slot phase0.Slot) { wg.Done() count++ @@ -38,6 +35,8 @@ func TestFinalizer_SimpleRun(t *testing.T) { } }) + f.Start(genesis) + f.AddBlock(mock.MockBlock(2, 1, 0, nil)) // 1: child of genesis f.AddBlock(mock.MockBlock(2, 2, 0, nil)) // 2: child of genesis f.AddBlock(mock.MockBlock(100, 3, 1, nil)) // 3: child of 1 @@ -103,14 +102,11 @@ func TestFinalizer_SimpleRun(t *testing.T) { func TestFinalizer_SimpleRunOutOfOrder(t *testing.T) { log := zerologger.With().Logger().Level(zerolog.ErrorLevel) - f := lmdfinalizer.New(log) - - f.Start(genesis) - count := 0 var wg sync.WaitGroup wg.Add(2) - f.HandleNewLatestFinalizedBlock(func(root phase0.Root, slot phase0.Slot) { + + f := lmdfinalizer.New(log, func(root phase0.Root, slot phase0.Slot) { wg.Done() count++ @@ -126,6 +122,8 @@ func TestFinalizer_SimpleRunOutOfOrder(t *testing.T) { } }) + f.Start(genesis) + f.AddBlock(mock.MockBlock(100, 3, 1, nil)) // 3: child of 1 f.AddBlock(mock.MockBlock(10, 4, 1000, nil)) // 4: child of none // 5: child of 3 @@ -192,12 +190,11 @@ func TestFinalizer_SimpleRunOutOfOrder(t *testing.T) { func TestFinalizer_SimpleRunStartAfter(t *testing.T) { log := zerologger.With().Logger().Level(zerolog.ErrorLevel) - f := lmdfinalizer.New(log) - count := 0 var wg sync.WaitGroup wg.Add(2) - f.HandleNewLatestFinalizedBlock(func(root phase0.Root, slot phase0.Slot) { + + f := lmdfinalizer.New(log, func(root phase0.Root, slot phase0.Slot) { wg.Done() count++ From ad9088cd9f5421e9d3e5a947e37c414f1e7f4ce9 Mon Sep 17 00:00:00 2001 From: Leo Lara Date: Mon, 4 Jul 2022 14:19:04 +0200 Subject: [PATCH 04/10] Fix periods in comments --- .../blocks/standard/lmdfinalizer/finalizer.go | 135 +++++------------- .../standard/lmdfinalizer/finalizer_test.go | 95 +----------- .../blocks/standard/lmdfinalizer/tree/node.go | 22 +-- .../blocks/standard/lmdfinalizer/tree/tree.go | 32 ++--- services/blocks/standard/lmdfinalizer/vote.go | 2 +- 5 files changed, 65 insertions(+), 221 deletions(-) diff --git a/services/blocks/standard/lmdfinalizer/finalizer.go b/services/blocks/standard/lmdfinalizer/finalizer.go index e431eaf..4024f4e 100644 --- a/services/blocks/standard/lmdfinalizer/finalizer.go +++ b/services/blocks/standard/lmdfinalizer/finalizer.go @@ -1,5 +1,5 @@ -// Package lmdfinalizer establishes which blocks are finalized from its LDM votes and the LDM votes of its children blocks -// Abbreviation: LFB means Latest Finalized Block +// Package lmdfinalizer establishes which blocks are finalized from its LDM votes and the LDM votes of its children blocks. +// Abbreviation: LFB means Latest Finalized Block. package lmdfinalizer import ( @@ -11,48 +11,42 @@ import ( "github.com/wealdtech/chaind/services/chaindb" ) -// PreBalanceThreshold is a mock threshold used until we connect the finalizer with the validator balances +// PreBalanceThreshold is a mock threshold used until we connect the finalizer with the validator balances. const PreBalanceThreshold = 10000 -// LMDFinalizer is a beacon chain finalizer based on LMD votes +// LMDFinalizer is a beacon chain finalizer based on LMD votes. type LMDFinalizer interface { - // Start finalizer with latestFinalized block as the latest finalized until now - Start(latestFinalized *chaindb.Block) // AddBlock to finalizer to be candidate for finalization and use its included attestations as votes for - // other blocks + // other blocks. AddBlock(dbblock *chaindb.Block, attestations []*chaindb.Attestation) } -// newLFBHandler event handler to be triggered when a new LFB is finalized +// newLFBHandler event handler to be triggered when a new LFB is finalized. type newLFBHandler func(phase0.Root, phase0.Slot) -// finalizer is the implementation of LMDFinalizer +// finalizer is the implementation of LMDFinalizer. type finalizer struct { tree tree.Tree votes lmdVotes - started bool - preStartBuffer []*tree.Node - log zerolog.Logger + log zerolog.Logger onAddNode chan *tree.Node - onStart chan *tree.Node newLFBHandler newLFBHandler } -// New LMDFinalizer with logger `log` -func New(log zerolog.Logger, handler newLFBHandler) LMDFinalizer { +// New LMDFinalizer with logger `log`. +func New(latestFinalized *chaindb.Block, log zerolog.Logger, handler newLFBHandler) LMDFinalizer { + LFB := tree.NewNode(latestFinalized, nil) + f := &finalizer{ - tree: tree.Tree{}, + tree: tree.New(LFB), votes: newLMDVotes(), - started: false, - preStartBuffer: []*tree.Node{}, - log: log.With().Str("subservice", "LMD finalizer").Logger(), + log: log.With().Str("subservice", "LMD finalizer").Logger(), onAddNode: make(chan *tree.Node, 1000), // TODO: size of channel - onStart: make(chan *tree.Node), newLFBHandler: handler, } @@ -62,70 +56,34 @@ func New(log zerolog.Logger, handler newLFBHandler) LMDFinalizer { return f } -// Start finalizer with latestFinalized block as the latest finalized until now -func (f *finalizer) Start(latestFinalized *chaindb.Block) { - LFB := tree.NewNode(latestFinalized, nil) - - f.onStart <- LFB -} - // AddBlock to finalizer to be candidate for finalization and use its included attestations as votes for -// other blocks +// other blocks. func (f *finalizer) AddBlock(dbblock *chaindb.Block, attestations []*chaindb.Attestation) { node := tree.NewNode(dbblock, attestations) f.onAddNode <- node } -// mainLoop receives via channels commands and executed them, it is run in its own goroutine so public functions +// mainLoop receives via channels commands and executed them, it is run in its own goroutine so public functions. func (f *finalizer) mainLoop() { for { - select { - case node := <-f.onAddNode: - err := f.addNode(node) - if err != nil { - f.log.Error().Err(err).Msg("error adding block") - } - case LFB := <-f.onStart: - f.startInternal(LFB) + node := <-f.onAddNode + err := f.addNode(node) + if err != nil { + f.log.Error().Err(err).Msg("error adding block") } } } -// startInternal starts the finalizer -func (f *finalizer) startInternal(LFB *tree.Node) { - if f.started { - f.log.Warn().Msg("tried to start LMD finalizer twice") - return - } - - f.log.Info().Msg("Starting LMD finalizer") - - f.tree = tree.New(LFB) - - f.started = true - - f.preStartUse() - - f.log.Info().Msg("LMD finalizer started") -} - -// addNode +// addNode to the finalizer. func (f *finalizer) addNode(node *tree.Node) error { - if !f.started { - root := node.Root() - f.log.Trace().Str("root", hex.EncodeToString(root[:])).Uint64("slot", uint64(node.Slot())).Msg("adding block to pre start buffer") - f.preStartInsert(node) - return nil - } - if f.tree.IsOldBlock(node) { return errors.New("adding block that is not in the future of latest finalized block") } f.tree.Insert(node) - // finalizer works even if blocks come out of order + // Finalizer works even if blocks come out of order. children := f.tree.FindOrphans(node) f.adopt(node, children) @@ -141,12 +99,12 @@ func (f *finalizer) addNode(node *tree.Node) error { } // attestationsToVotes takes all the attestations included in a node block and convert them to votes and includes them -// to be counted +// to be counted. func (f *finalizer) attestationsToVotes(node *tree.Node) { for _, attestation := range node.Attestations() { if f.tree.IsOldSlot(attestation.Slot) { - // we should ignore this attestation, as it refers to a block that is not later than the - // latest LDM finalized block + // We should ignore this attestation, as it refers to a block that is not later than the + // latest LDM finalized block. continue } @@ -158,7 +116,7 @@ func (f *finalizer) attestationsToVotes(node *tree.Node) { node.RemoveAttestations() } -// attestationToVotes returns an array of votes from an attestation +// attestationToVotes returns an array of votes from an attestation. func (finalizer) attestationToVotes(attestation *chaindb.Attestation) []lmdVote { return []lmdVote{ { @@ -169,7 +127,7 @@ func (finalizer) attestationToVotes(attestation *chaindb.Attestation) []lmdVote } } -// adopt children nodes by parent node +// adopt children nodes by parent node. func (f *finalizer) adopt(parent *tree.Node, children []*tree.Node) { f.tree.Adopt(parent, children) votes := []lmdVote{} @@ -183,19 +141,19 @@ func (f *finalizer) adopt(parent *tree.Node, children []*tree.Node) { f.votes.insert(votes) } -// countVotes check votes that have not been counted yet, and count them if their referred block exists in the tree +// countVotes check votes that have not been counted yet, and count them if their referred block exists in the tree. func (f *finalizer) countVotes() *tree.Node { var newLFB *tree.Node f.votes.tryUncounted(func(vote lmdVote) bool { if newLFB != nil { - // skip counting this vote as we found a new LFB + // Skip counting this vote as we found a new LFB. return false } block := f.tree.GetByRoot(vote.root) if block == nil { - // cannot count this vote as we do not have the block it counts into + // Cannot count this vote as we do not have the block it counts into. return false } @@ -207,8 +165,8 @@ func (f *finalizer) countVotes() *tree.Node { return newLFB } -// countVote for block and its ancestors -// returns a block if its weight reached threshold, otherwise nil +// countVote for block and its ancestors. +// Returns a block if its weight reached threshold, otherwise nil. func (f *finalizer) countVote(block *tree.Node, vote voteWeight) *tree.Node { var newLFB *tree.Node @@ -217,7 +175,7 @@ func (f *finalizer) countVote(block *tree.Node, vote voteWeight) *tree.Node { if f.threshold(block) { newLFB = block - return false // do not climb anymore, as this is the new latest finalized block + return false // Do not climb anymore, as this is the new latest finalized block. } return true }) @@ -225,13 +183,13 @@ func (f *finalizer) countVote(block *tree.Node, vote voteWeight) *tree.Node { return newLFB } -// threshold returns true if a block votes have reach the threshold to become finalized +// threshold returns true if a block votes have reach the threshold to become finalized. func (finalizer) threshold(block *tree.Node) bool { return block.CurrenVote() > PreBalanceThreshold } // onNewLatestFinalizedBlock is called when the finalizer find a new LFB. It handles the transition to a new LFB, -// and calls the event handler +// and calls the event handler. func (f *finalizer) onNewLatestFinalizedBlock(newLFB *tree.Node) { f.tree.OnNewLatestFinalizedBlock(newLFB) f.votes.newLatestFinalizedBlockSlot(newLFB.Slot()) @@ -243,26 +201,3 @@ func (f *finalizer) onNewLatestFinalizedBlock(newLFB *tree.Node) { f.newLFBHandler(newLFB.Root(), newLFB.Slot()) } } - -// preStartInsert inserts a node into a buffer until the finalizer is started -func (f *finalizer) preStartInsert(block *tree.Node) { - f.preStartBuffer = append(f.preStartBuffer, block) -} - -// preStartUse uses the buffer of nodes added before the finalizer is started -func (f *finalizer) preStartUse() { - if !f.started { - f.log.Warn().Msg("tried to user pre start buffer on non-started LMD finalizer") - return - } - - preStartBuffer := f.preStartBuffer - f.preStartBuffer = nil - - for _, node := range preStartBuffer { - err := f.addNode(node) - if err != nil { - f.log.Error().Err(err).Msg("error adding from pre start buffer") - } - } -} diff --git a/services/blocks/standard/lmdfinalizer/finalizer_test.go b/services/blocks/standard/lmdfinalizer/finalizer_test.go index cb91dd8..7f58770 100644 --- a/services/blocks/standard/lmdfinalizer/finalizer_test.go +++ b/services/blocks/standard/lmdfinalizer/finalizer_test.go @@ -19,7 +19,7 @@ func TestFinalizer_SimpleRun(t *testing.T) { var wg sync.WaitGroup wg.Add(2) - f := lmdfinalizer.New(log, func(root phase0.Root, slot phase0.Slot) { + f := lmdfinalizer.New(genesis, log, func(root phase0.Root, slot phase0.Slot) { wg.Done() count++ @@ -35,8 +35,6 @@ func TestFinalizer_SimpleRun(t *testing.T) { } }) - f.Start(genesis) - f.AddBlock(mock.MockBlock(2, 1, 0, nil)) // 1: child of genesis f.AddBlock(mock.MockBlock(2, 2, 0, nil)) // 2: child of genesis f.AddBlock(mock.MockBlock(100, 3, 1, nil)) // 3: child of 1 @@ -106,7 +104,7 @@ func TestFinalizer_SimpleRunOutOfOrder(t *testing.T) { var wg sync.WaitGroup wg.Add(2) - f := lmdfinalizer.New(log, func(root phase0.Root, slot phase0.Slot) { + f := lmdfinalizer.New(genesis, log, func(root phase0.Root, slot phase0.Slot) { wg.Done() count++ @@ -122,8 +120,6 @@ func TestFinalizer_SimpleRunOutOfOrder(t *testing.T) { } }) - f.Start(genesis) - f.AddBlock(mock.MockBlock(100, 3, 1, nil)) // 3: child of 1 f.AddBlock(mock.MockBlock(10, 4, 1000, nil)) // 4: child of none // 5: child of 3 @@ -187,90 +183,3 @@ func TestFinalizer_SimpleRunOutOfOrder(t *testing.T) { assert.Equal(t, 2, count) } - -func TestFinalizer_SimpleRunStartAfter(t *testing.T) { - log := zerologger.With().Logger().Level(zerolog.ErrorLevel) - count := 0 - var wg sync.WaitGroup - wg.Add(2) - - f := lmdfinalizer.New(log, func(root phase0.Root, slot phase0.Slot) { - wg.Done() - count++ - - switch count { - case 1: - assert.Equal(t, phase0.Slot(2), slot) - assert.EqualValues(t, mock.MockRoot(1), root) - case 2: - assert.Equal(t, phase0.Slot(100), slot) - assert.EqualValues(t, mock.MockRoot(3), root) - default: - assert.Fail(t, "should there be only 2") - } - }) - - f.AddBlock(mock.MockBlock(2, 1, 0, nil)) // 1: child of genesis - f.AddBlock(mock.MockBlock(2, 2, 0, nil)) // 2: child of genesis - f.AddBlock(mock.MockBlock(100, 3, 1, nil)) // 3: child of 1 - f.AddBlock(mock.MockBlock(10, 4, 1000, nil)) // 4: child of none - f.AddBlock(mock.MockBlock(101, 5, 3, []mock.MockAttestation{ - { - Slot: 2, - Root: 1, - NumIndices: lmdfinalizer.PreBalanceThreshold / 4, - }, - { - Slot: 100, - Root: 3, - NumIndices: lmdfinalizer.PreBalanceThreshold / 4, - }, - })) // 5: child of 3 - f.AddBlock(mock.MockBlock(104, 6, 1000, nil)) // 6: child of none - f.AddBlock(mock.MockBlock(110, 7, 1000, []mock.MockAttestation{ - { - Slot: 2, - Root: 1, - NumIndices: lmdfinalizer.PreBalanceThreshold / 4, - }, - { - Slot: 100, - Root: 3, - NumIndices: lmdfinalizer.PreBalanceThreshold / 4, - }, - { - Slot: 101, - Root: 5, - NumIndices: lmdfinalizer.PreBalanceThreshold / 4, - }, - })) // 7: child of 5 - f.AddBlock(mock.MockBlock(112, 8, 7, []mock.MockAttestation{ - { - Slot: 2, - Root: 1, - NumIndices: lmdfinalizer.PreBalanceThreshold / 4, - }, - { - Slot: 100, - Root: 3, - NumIndices: lmdfinalizer.PreBalanceThreshold / 4, - }, - { - Slot: 101, - Root: 5, - NumIndices: lmdfinalizer.PreBalanceThreshold / 4, - }, - { - Slot: 110, - Root: 7, - NumIndices: lmdfinalizer.PreBalanceThreshold / 4, - }, - })) // 8: child of 7 - f.AddBlock(mock.MockBlock(113, 9, 8, nil)) // 8: child of 8 - - f.Start(genesis) - - wg.Wait() - - assert.Equal(t, 2, count) -} diff --git a/services/blocks/standard/lmdfinalizer/tree/node.go b/services/blocks/standard/lmdfinalizer/tree/node.go index bb1cc1f..8b159d6 100644 --- a/services/blocks/standard/lmdfinalizer/tree/node.go +++ b/services/blocks/standard/lmdfinalizer/tree/node.go @@ -5,7 +5,7 @@ import ( "github.com/wealdtech/chaind/services/chaindb" ) -// VoteWeight contains weighted votes towards a block +// VoteWeight contains weighted votes towards a block. type VoteWeight uint64 // Node in LMD finalizer Tree @@ -21,11 +21,11 @@ type Node struct { curVote VoteWeight } -// NewNode creates a node from a chaindb block and attestations array +// NewNode creates a node from a chaindb block and attestations array. func NewNode(block *chaindb.Block, attestations []*chaindb.Attestation) *Node { n := &Node{ slot: block.Slot, - attestations: attestations, // TODO perhaps copy the attestations as they are a pointer + attestations: attestations, // TODO perhaps copy the attestations as they are a pointer. } copy(n.root[:], block.Root[:]) @@ -34,38 +34,38 @@ func NewNode(block *chaindb.Block, attestations []*chaindb.Attestation) *Node { return n } -// CountVote into the node +// CountVote into the node. func (n *Node) CountVote(vote VoteWeight) { n.curVote += vote } -// RemoveAttestations clears the memory from attestations, useful if they are not needed anymore +// RemoveAttestations clears the memory from attestations, useful if they are not needed anymore. func (n *Node) RemoveAttestations() { n.attestations = nil } -// Root of node block +// Root of node block. func (n Node) Root() phase0.Root { return n.root } -// Slot of node block +// Slot of node block. func (n Node) Slot() phase0.Slot { return n.slot } -// Attestations included in node block +// Attestations included in node block. func (n Node) Attestations() []*chaindb.Attestation { - // TODO perhaps copy to do not break encapsulation + // TODO perhaps copy to do not break encapsulation. return n.attestations } -// CurrenVote towards this node block +// CurrenVote towards this node block. func (n Node) CurrenVote() VoteWeight { return n.curVote } -// adopt makes node parent of other node +// adopt makes node parent of other node. func (n *Node) adopt(orphan *Node) { orphan.parent = n } diff --git a/services/blocks/standard/lmdfinalizer/tree/tree.go b/services/blocks/standard/lmdfinalizer/tree/tree.go index 61a1e2f..a18da35 100644 --- a/services/blocks/standard/lmdfinalizer/tree/tree.go +++ b/services/blocks/standard/lmdfinalizer/tree/tree.go @@ -4,14 +4,14 @@ import ( "github.com/attestantio/go-eth2-client/spec/phase0" ) -// Tree for LMD finalizer +// Tree for LMD finalizer. type Tree struct { latestFinalizedBlock *Node // root of the tree rootIndex map[phase0.Root]*Node orphans map[phase0.Root]*Node } -// New creates a tree +// New creates a tree. func New(latestFinalizedBlock *Node) Tree { t := Tree{ latestFinalizedBlock: latestFinalizedBlock, @@ -23,10 +23,10 @@ func New(latestFinalizedBlock *Node) Tree { return t } -// Insert new node into the tree +// Insert new node into the tree. func (t *Tree) Insert(node *Node) { if t.IsOldBlock(node) { - // do not insert it, as it is not in the future of LFB + // Do not insert it, as it is not in the future of LFB. // TODO logging? return } @@ -34,7 +34,7 @@ func (t *Tree) Insert(node *Node) { curBlock, already := t.rootIndex[node.root] if already && curBlock != nil { // TODO logging - return // already included, be idempotent + return // Already included, be idempotent. } t.rootIndex[node.root] = node @@ -47,7 +47,7 @@ func (t *Tree) Insert(node *Node) { } } -// Adopt makes parent the parent of children, and remove the children from the orphan list +// Adopt makes parent the parent of children, and remove the children from the orphan list. func (t *Tree) Adopt(parent *Node, children []*Node) { for _, child := range children { if child.parentRoot == parent.root { @@ -57,33 +57,33 @@ func (t *Tree) Adopt(parent *Node, children []*Node) { } } -// GetByRoot returns a node by its block root +// GetByRoot returns a node by its block root. func (t *Tree) GetByRoot(root phase0.Root) *Node { return t.rootIndex[root] } -// IsOldSlot returns true if a slot is less or equal than the LFB slot +// IsOldSlot returns true if a slot is less or equal than the LFB slot. func (t *Tree) IsOldSlot(slot phase0.Slot) bool { return slot <= t.latestFinalizedBlock.slot } -// IsOldBlock returns true if the node slot is less or equal than the LFB slot +// IsOldBlock returns true if the node slot is less or equal than the LFB slot. func (t *Tree) IsOldBlock(node *Node) bool { return t.IsOldSlot(node.slot) } -// Climb from a node towards the LFB by the tree, it passes each node to the `callback` -// It does not include the LFB, stopping in the direct child of it -// If the callback returns `false` the tree climbing is stopped at the current node +// Climb from a node towards the LFB by the tree, it passes each node to the `callback`. +// It does not include the LFB, stopping in the direct child of it. +// If the callback returns `false` the tree climbing is stopped at the current node. func (t *Tree) Climb(node *Node, callback func(*Node) bool) { for { if node == nil || node.root == t.latestFinalizedBlock.root { - // stop when no parent (orphan) or arrived to the top of the tree + // Stop when no parent (orphan) or arrived to the top of the tree. return } if !callback(node) { - // stop if iterator wants to stop + // Stop if iterator wants to stop. return } @@ -92,7 +92,7 @@ func (t *Tree) Climb(node *Node, callback func(*Node) bool) { } // OnNewLatestFinalizedBlock reroots the tree on a new LFB. -// Old nodes are removed as they are not relevant anymore +// Old nodes are removed as they are not relevant anymore. func (t *Tree) OnNewLatestFinalizedBlock(newLFB *Node) { t.latestFinalizedBlock = newLFB @@ -100,7 +100,7 @@ func (t *Tree) OnNewLatestFinalizedBlock(newLFB *Node) { t.removeOldOrphans() } -// FindOrphans which father is a given node +// FindOrphans which father is a given node. func (t *Tree) FindOrphans(node *Node) []*Node { children := []*Node{} diff --git a/services/blocks/standard/lmdfinalizer/vote.go b/services/blocks/standard/lmdfinalizer/vote.go index 34a57ac..a54bb52 100644 --- a/services/blocks/standard/lmdfinalizer/vote.go +++ b/services/blocks/standard/lmdfinalizer/vote.go @@ -40,7 +40,7 @@ func (v *lmdVotes) tryUncounted(fn func(vote lmdVote) bool) { } func (v *lmdVotes) newLatestFinalizedBlockSlot(slot phase0.Slot) { - // TODO perhaps uncounted could be an map of slot to array of votes and this would be more efficient + // TODO perhaps uncounted could be a map of slot to array of votes and this would be more efficient. newUncounted := []lmdVote{} for _, vote := range v.uncounted { From 009ee38a4b435e85ec45a8c8ae06f95e841b3523 Mon Sep 17 00:00:00 2001 From: Leo Lara Date: Thu, 14 Jul 2022 23:20:31 +0200 Subject: [PATCH 05/10] Commented out LMD finalizer integration --- services/blocks/standard/handler.go | 71 +++++++++++++++++++++++++--- services/blocks/standard/metadata.go | 2 + services/blocks/standard/service.go | 56 +++++++++++++++++++++- 3 files changed, 122 insertions(+), 7 deletions(-) diff --git a/services/blocks/standard/handler.go b/services/blocks/standard/handler.go index 50d5a9f..8402c8e 100644 --- a/services/blocks/standard/handler.go +++ b/services/blocks/standard/handler.go @@ -68,6 +68,38 @@ func (s *Service) OnBeaconChainHeadUpdated( monitorBlockProcessed(slot) } +//func (s *Service) onNewLMDFinalizedBlock(root phase0.Root, slot phase0.Slot) { +// ctx, cancel, err := s.chainDB.BeginTx(context.Background()) +// +// md, err := s.getMetadata(ctx) +// if err != nil { +// log.Error().Err(err).Msg("Failed to obtain metadata") +// cancel() +// return +// } +// +// if md.LMDLatestFinalizedSlot >= slot { +// log.Error().Err(err).Msg("trying to set a past LMD finalized block") +// cancel() +// return +// } +// +// md.LMDLatestFinalizedBlockRoot = root +// md.LMDLatestFinalizedSlot = slot +// +// if err := s.setMetadata(ctx, md); err != nil { +// log.Error().Err(err).Msg("Failed to set metadata") +// cancel() +// return +// } +// if err := s.chainDB.CommitTx(ctx); err != nil { +// log.Error().Err(err).Msg("Failed to commit transaction") +// cancel() +// return +// } +// log.Debug().Str("root", hex.EncodeToString(root[:])).Uint64("slot", uint64(slot)).Msg("stored new LMD finalized block") +//} + func (s *Service) updateBlockForSlot(ctx context.Context, slot phase0.Slot) error { log := log.With().Uint64("slot", uint64(slot)).Logger() @@ -103,6 +135,15 @@ func (s *Service) OnBlock(ctx context.Context, signedBlock *spec.VersionedSigned if err := s.blocksSetter.SetBlock(ctx, dbBlock); err != nil { return errors.Wrap(err, "failed to set block") } + //attestations, err := signedBlock.Attestations() + //if err != nil { + // return errors.Wrap(err, "failed to obtain attestations") + //} + //dbAttestations, err := s.dbAttestations(ctx, dbBlock.Slot, dbBlock.Root, attestations) + //if err != nil { + // return errors.Wrap(err, "failed to obtain database attestations") + //} + //s.lmdFinalizer.AddBlock(dbBlock, dbAttestations) switch signedBlock.Version { case spec.DataVersionPhase0: return s.onBlockPhase0(ctx, signedBlock.Phase0, dbBlock) @@ -234,12 +275,11 @@ func (s *Service) updateAttestationsForBlock(ctx context.Context, blockRoot phase0.Root, attestations []*phase0.Attestation, ) error { - beaconCommittees := make(map[phase0.Slot]map[phase0.CommitteeIndex]*chaindb.BeaconCommittee) - for i, attestation := range attestations { - dbAttestation, err := s.dbAttestation(ctx, slot, blockRoot, uint64(i), attestation, beaconCommittees) - if err != nil { - return errors.Wrap(err, "failed to obtain database attestation") - } + dbAttestations, err := s.dbAttestations(ctx, slot, blockRoot, attestations) + if err != nil { + return errors.Wrap(err, "failed to obtain database attestations") + } + for _, dbAttestation := range dbAttestations { if err := s.attestationsSetter.SetAttestation(ctx, dbAttestation); err != nil { return errors.Wrap(err, "failed to set attestation") } @@ -534,6 +574,25 @@ func (s *Service) dbAttestation( return dbAttestation, nil } +func (s *Service) dbAttestations( + ctx context.Context, + inclusionSlot phase0.Slot, + blockRoot phase0.Root, + attestations []*phase0.Attestation, +) ([]*chaindb.Attestation, error) { + beaconCommittees := make(map[phase0.Slot]map[phase0.CommitteeIndex]*chaindb.BeaconCommittee) + result := []*chaindb.Attestation{} + for i, attestation := range attestations { + dbAttestation, err := s.dbAttestation(ctx, inclusionSlot, blockRoot, uint64(i), attestation, beaconCommittees) + if err != nil { + return nil, errors.Wrap(err, "failed to obtain database attestation") + } + + result = append(result, dbAttestation) + } + return result, nil +} + func (s *Service) dbSyncAggregate( ctx context.Context, slot phase0.Slot, diff --git a/services/blocks/standard/metadata.go b/services/blocks/standard/metadata.go index 2294c6b..444b737 100644 --- a/services/blocks/standard/metadata.go +++ b/services/blocks/standard/metadata.go @@ -24,6 +24,8 @@ import ( // metadata stored about this service. type metadata struct { LatestSlot phase0.Slot `json:"latest_slot"` + //LMDLatestFinalizedBlockRoot phase0.Root `json:"lmd_lfb_root,omitempty"` + //LMDLatestFinalizedSlot phase0.Slot `json:"lmd_lfb_slot,omitempty"` } // metadataKey is the key for the metadata. diff --git a/services/blocks/standard/service.go b/services/blocks/standard/service.go index ed19af0..3619b15 100644 --- a/services/blocks/standard/service.go +++ b/services/blocks/standard/service.go @@ -15,7 +15,6 @@ package standard import ( "context" - eth2client "github.com/attestantio/go-eth2-client" api "github.com/attestantio/go-eth2-client/api/v1" "github.com/attestantio/go-eth2-client/spec/phase0" @@ -45,6 +44,7 @@ type Service struct { lastHandledBlockRoot phase0.Root activitySem *semaphore.Weighted syncCommittees map[uint64]*chaindb.SyncCommittee + //lmdFinalizer lmdfinalizer.LMDFinalizer } // module-wide log. @@ -125,6 +125,7 @@ func New(ctx context.Context, params ...Parameter) (*Service, error) { refetch: parameters.refetch, activitySem: parameters.activitySem, syncCommittees: make(map[uint64]*chaindb.SyncCommittee), + //lmdFinalizer: nil, } // Note the current highest processed block for the monitor. @@ -156,6 +157,9 @@ func (s *Service) updateAfterRestart(ctx context.Context, startSlot int64) { //nolint:gocritic log.Fatal().Err(err).Msg("Failed to obtain metadata before catchup") } + + //go s.catchupLMDFinalizer(ctx, md) // I need to launch it before md is modified + if startSlot >= 0 { // Explicit requirement to start at a given slot. md.LatestSlot = phase0.Slot(startSlot) @@ -181,6 +185,56 @@ func (s *Service) updateAfterRestart(ctx context.Context, startSlot int64) { } } +//func (s *Service) catchupLMDFinalizer(ctx context.Context, md *metadata) { +// LFBRoot := md.LMDLatestFinalizedBlockRoot +// LFBSlot := md.LMDLatestFinalizedSlot +// +// var LFB *chaindb.Block +// if LFBSlot != 0 { +// block, err := s.chainDB.(chaindb.BlocksProvider).BlockByRoot(ctx, LFBRoot) +// if err != nil { +// log.Error().Err(err).Msg("could not fetch LMD latest finalized block") +// return +// } +// LFB = block +// } else { +// blocks, err := s.chainDB.(chaindb.BlocksProvider).BlocksBySlot(ctx, 0) +// if err != nil || len(blocks) == 0 { +// log.Error().Err(err).Msg("could not fetch genesis block") +// return +// } +// if len(blocks) > 1 { +// log.Error().Msg("more than one genesis block") +// return +// } +// LFB = blocks[0] +// } +// +// log.Info().Msg("Starting LMD finalizer") +// s.lmdFinalizer = lmdfinalizer.New(LFB, log, s.onNewLMDFinalizedBlock) +// log.Info().Msg("Started LMD finalizer") +// +// log.Info().Uint64("from_slot", uint64(LFB.Slot)).Uint64("to_slot", uint64(md.LatestSlot)).Msg("Catching up LMD finalizer") +// +// for slot := LFB.Slot + 1; slot <= md.LatestSlot; slot++ { +// blocks, err := s.chainDB.(chaindb.BlocksProvider).BlocksBySlot(ctx, slot) +// if err != nil { +// log.Debug().Msg("block not in DB") +// continue +// } +// +// for _, block := range blocks { +// attestations, err := s.chainDB.(chaindb.AttestationsProvider).AttestationsInBlock(ctx, block.Root) +// if err != nil { +// log.Error().Err(err).Msg("error getting attestations from db") +// attestations = nil +// } +// s.lmdFinalizer.AddBlock(block, attestations) +// } +// } +// log.Info().Msg("Caught up LMD finalizer") +//} + func (s *Service) catchup(ctx context.Context, md *metadata) { firstSlot := md.LatestSlot // Increment if not 0 (as we do not differentiate between 0 and unset). From cde3bec067cd6fe39aa55145f7ea0fc256d64462 Mon Sep 17 00:00:00 2001 From: Leo Lara Date: Wed, 20 Jul 2022 19:25:28 +0200 Subject: [PATCH 06/10] Configure lmdfinalizer with WithX methods --- .../blocks/standard/lmdfinalizer/finalizer.go | 18 +++-- .../standard/lmdfinalizer/finalizer_test.go | 66 ++++++++++--------- .../standard/lmdfinalizer/parameters.go | 62 +++++++++++++++++ 3 files changed, 108 insertions(+), 38 deletions(-) create mode 100644 services/blocks/standard/lmdfinalizer/parameters.go diff --git a/services/blocks/standard/lmdfinalizer/finalizer.go b/services/blocks/standard/lmdfinalizer/finalizer.go index 4024f4e..01d0203 100644 --- a/services/blocks/standard/lmdfinalizer/finalizer.go +++ b/services/blocks/standard/lmdfinalizer/finalizer.go @@ -7,6 +7,7 @@ import ( "github.com/attestantio/go-eth2-client/spec/phase0" "github.com/pkg/errors" "github.com/rs/zerolog" + zerologger "github.com/rs/zerolog/log" "github.com/wealdtech/chaind/services/blocks/standard/lmdfinalizer/tree" "github.com/wealdtech/chaind/services/chaindb" ) @@ -36,24 +37,29 @@ type finalizer struct { newLFBHandler newLFBHandler } -// New LMDFinalizer with logger `log`. -func New(latestFinalized *chaindb.Block, log zerolog.Logger, handler newLFBHandler) LMDFinalizer { - LFB := tree.NewNode(latestFinalized, nil) +// New LMDFinalizer. +func New(params ...Parameter) (LMDFinalizer, error) { + parameters, err := parseAndCheckParameters(params...) + if err != nil { + return nil, errors.Wrap(err, "problem with parameters") + } + + LFB := tree.NewNode(parameters.lfb, nil) f := &finalizer{ tree: tree.New(LFB), votes: newLMDVotes(), - log: log.With().Str("subservice", "LMD finalizer").Logger(), + log: zerologger.With().Str("service", "LMD finalizer").Str("impl", "standard").Logger().Level(parameters.logLevel), onAddNode: make(chan *tree.Node, 1000), // TODO: size of channel - newLFBHandler: handler, + newLFBHandler: parameters.newLFBHandler, } go f.mainLoop() - return f + return f, nil } // AddBlock to finalizer to be candidate for finalization and use its included attestations as votes for diff --git a/services/blocks/standard/lmdfinalizer/finalizer_test.go b/services/blocks/standard/lmdfinalizer/finalizer_test.go index 7f58770..7cc6a59 100644 --- a/services/blocks/standard/lmdfinalizer/finalizer_test.go +++ b/services/blocks/standard/lmdfinalizer/finalizer_test.go @@ -2,8 +2,6 @@ package lmdfinalizer_test import ( "github.com/attestantio/go-eth2-client/spec/phase0" - "github.com/rs/zerolog" - zerologger "github.com/rs/zerolog/log" "github.com/stretchr/testify/assert" "github.com/wealdtech/chaind/services/blocks/standard/lmdfinalizer" "github.com/wealdtech/chaind/services/blocks/standard/lmdfinalizer/mock" @@ -14,26 +12,28 @@ import ( var genesis, _ = mock.MockBlock(0, 0, 0, nil) func TestFinalizer_SimpleRun(t *testing.T) { - log := zerologger.With().Logger().Level(zerolog.ErrorLevel) count := 0 var wg sync.WaitGroup wg.Add(2) - f := lmdfinalizer.New(genesis, log, func(root phase0.Root, slot phase0.Slot) { - wg.Done() - count++ + f, err := lmdfinalizer.New( + lmdfinalizer.WithLFB(genesis), + lmdfinalizer.WithHandler(func(root phase0.Root, slot phase0.Slot) { + wg.Done() + count++ - switch count { - case 1: - assert.Equal(t, phase0.Slot(2), slot) - assert.EqualValues(t, mock.MockRoot(1), root) - case 2: - assert.Equal(t, phase0.Slot(100), slot) - assert.EqualValues(t, mock.MockRoot(3), root) - default: - assert.Fail(t, "should there be only 2") - } - }) + switch count { + case 1: + assert.Equal(t, phase0.Slot(2), slot) + assert.EqualValues(t, mock.MockRoot(1), root) + case 2: + assert.Equal(t, phase0.Slot(100), slot) + assert.EqualValues(t, mock.MockRoot(3), root) + default: + assert.Fail(t, "should there be only 2") + } + })) + assert.NoError(t, err) f.AddBlock(mock.MockBlock(2, 1, 0, nil)) // 1: child of genesis f.AddBlock(mock.MockBlock(2, 2, 0, nil)) // 2: child of genesis @@ -99,26 +99,28 @@ func TestFinalizer_SimpleRun(t *testing.T) { } func TestFinalizer_SimpleRunOutOfOrder(t *testing.T) { - log := zerologger.With().Logger().Level(zerolog.ErrorLevel) count := 0 var wg sync.WaitGroup wg.Add(2) - f := lmdfinalizer.New(genesis, log, func(root phase0.Root, slot phase0.Slot) { - wg.Done() - count++ + f, err := lmdfinalizer.New( + lmdfinalizer.WithLFB(genesis), + lmdfinalizer.WithHandler(func(root phase0.Root, slot phase0.Slot) { + wg.Done() + count++ - switch count { - case 1: - assert.Equal(t, phase0.Slot(2), slot) - assert.EqualValues(t, mock.MockRoot(1), root) - case 2: - assert.Equal(t, phase0.Slot(100), slot) - assert.EqualValues(t, mock.MockRoot(3), root) - default: - assert.Fail(t, "should there be only 2") - } - }) + switch count { + case 1: + assert.Equal(t, phase0.Slot(2), slot) + assert.EqualValues(t, mock.MockRoot(1), root) + case 2: + assert.Equal(t, phase0.Slot(100), slot) + assert.EqualValues(t, mock.MockRoot(3), root) + default: + assert.Fail(t, "should there be only 2") + } + })) + assert.NoError(t, err) f.AddBlock(mock.MockBlock(100, 3, 1, nil)) // 3: child of 1 f.AddBlock(mock.MockBlock(10, 4, 1000, nil)) // 4: child of none diff --git a/services/blocks/standard/lmdfinalizer/parameters.go b/services/blocks/standard/lmdfinalizer/parameters.go new file mode 100644 index 0000000..f18ddba --- /dev/null +++ b/services/blocks/standard/lmdfinalizer/parameters.go @@ -0,0 +1,62 @@ +package lmdfinalizer + +import ( + "errors" + + "github.com/rs/zerolog" + "github.com/wealdtech/chaind/services/chaindb" +) + +type parameters struct { + lfb *chaindb.Block + logLevel zerolog.Level + newLFBHandler newLFBHandler +} + +// Parameter is the interface for service parameters. +type Parameter interface { + apply(*parameters) +} + +type parameterFunc func(*parameters) + +func (f parameterFunc) apply(p *parameters) { + f(p) +} + +func WithLFB(lfb *chaindb.Block) Parameter { + return parameterFunc(func(p *parameters) { + p.lfb = lfb + }) +} + +// WithLogLevel sets the log level for the module. +func WithLogLevel(logLevel zerolog.Level) Parameter { + return parameterFunc(func(p *parameters) { + p.logLevel = logLevel + }) +} + +func WithHandler(handler newLFBHandler) Parameter { + return parameterFunc(func(p *parameters) { + p.newLFBHandler = handler + }) +} + +// parseAndCheckParameters parses and checks parameters to ensure that mandatory parameters are present and correct. +func parseAndCheckParameters(params ...Parameter) (*parameters, error) { + parameters := parameters{ + logLevel: zerolog.GlobalLevel(), + } + for _, p := range params { + if params != nil { + p.apply(¶meters) + } + } + + if parameters.lfb == nil { + return nil, errors.New("no latest finalized block specified") + } + + return ¶meters, nil +} From 31b6e0a0166a3dd9138a24bd163567568711408f Mon Sep 17 00:00:00 2001 From: Leo Lara Date: Wed, 20 Jul 2022 19:26:18 +0200 Subject: [PATCH 07/10] Mock integration of LMD finalizer with the blocks service --- services/blocks/standard/service.go | 130 +++++++++++++++++----------- 1 file changed, 78 insertions(+), 52 deletions(-) diff --git a/services/blocks/standard/service.go b/services/blocks/standard/service.go index 3619b15..bfd91a4 100644 --- a/services/blocks/standard/service.go +++ b/services/blocks/standard/service.go @@ -21,6 +21,7 @@ import ( "github.com/pkg/errors" "github.com/rs/zerolog" zerologger "github.com/rs/zerolog/log" + "github.com/wealdtech/chaind/services/blocks/standard/lmdfinalizer" "github.com/wealdtech/chaind/services/chaindb" "github.com/wealdtech/chaind/services/chaintime" "golang.org/x/sync/semaphore" @@ -44,7 +45,7 @@ type Service struct { lastHandledBlockRoot phase0.Root activitySem *semaphore.Weighted syncCommittees map[uint64]*chaindb.SyncCommittee - //lmdFinalizer lmdfinalizer.LMDFinalizer + lmdFinalizer lmdfinalizer.LMDFinalizer } // module-wide log. @@ -125,7 +126,7 @@ func New(ctx context.Context, params ...Parameter) (*Service, error) { refetch: parameters.refetch, activitySem: parameters.activitySem, syncCommittees: make(map[uint64]*chaindb.SyncCommittee), - //lmdFinalizer: nil, + lmdFinalizer: nil, } // Note the current highest processed block for the monitor. @@ -158,7 +159,7 @@ func (s *Service) updateAfterRestart(ctx context.Context, startSlot int64) { log.Fatal().Err(err).Msg("Failed to obtain metadata before catchup") } - //go s.catchupLMDFinalizer(ctx, md) // I need to launch it before md is modified + go s.catchupLMDFinalizer(ctx, md) // it needs to launch it before md is modified if startSlot >= 0 { // Explicit requirement to start at a given slot. @@ -185,55 +186,80 @@ func (s *Service) updateAfterRestart(ctx context.Context, startSlot int64) { } } -//func (s *Service) catchupLMDFinalizer(ctx context.Context, md *metadata) { -// LFBRoot := md.LMDLatestFinalizedBlockRoot -// LFBSlot := md.LMDLatestFinalizedSlot -// -// var LFB *chaindb.Block -// if LFBSlot != 0 { -// block, err := s.chainDB.(chaindb.BlocksProvider).BlockByRoot(ctx, LFBRoot) -// if err != nil { -// log.Error().Err(err).Msg("could not fetch LMD latest finalized block") -// return -// } -// LFB = block -// } else { -// blocks, err := s.chainDB.(chaindb.BlocksProvider).BlocksBySlot(ctx, 0) -// if err != nil || len(blocks) == 0 { -// log.Error().Err(err).Msg("could not fetch genesis block") -// return -// } -// if len(blocks) > 1 { -// log.Error().Msg("more than one genesis block") -// return -// } -// LFB = blocks[0] -// } -// -// log.Info().Msg("Starting LMD finalizer") -// s.lmdFinalizer = lmdfinalizer.New(LFB, log, s.onNewLMDFinalizedBlock) -// log.Info().Msg("Started LMD finalizer") -// -// log.Info().Uint64("from_slot", uint64(LFB.Slot)).Uint64("to_slot", uint64(md.LatestSlot)).Msg("Catching up LMD finalizer") -// -// for slot := LFB.Slot + 1; slot <= md.LatestSlot; slot++ { -// blocks, err := s.chainDB.(chaindb.BlocksProvider).BlocksBySlot(ctx, slot) -// if err != nil { -// log.Debug().Msg("block not in DB") -// continue -// } -// -// for _, block := range blocks { -// attestations, err := s.chainDB.(chaindb.AttestationsProvider).AttestationsInBlock(ctx, block.Root) -// if err != nil { -// log.Error().Err(err).Msg("error getting attestations from db") -// attestations = nil -// } -// s.lmdFinalizer.AddBlock(block, attestations) -// } -// } -// log.Info().Msg("Caught up LMD finalizer") -//} +func (s *Service) catchupLMDFinalizer(ctx context.Context, md *metadata) { + //LFBRoot := md.LMDLatestFinalizedBlockRoot + //LFBSlot := md.LMDLatestFinalizedSlot + + var LFB *chaindb.Block + //if LFBSlot != 0 { + // block, err := s.chainDB.(chaindb.BlocksProvider).BlockByRoot(ctx, LFBRoot) + // if err != nil { + // log.Error().Err(err).Msg("could not fetch LMD latest finalized block") + // return + // } + // LFB = block + //} else { + blocks, err := s.chainDB.(chaindb.BlocksProvider).BlocksBySlot(ctx, 0) + if err != nil || len(blocks) == 0 { + log.Error().Err(err).Msg("could not fetch genesis block") + return + } + if len(blocks) > 1 { + log.Error().Msg("more than one genesis block") + return + } + LFB = blocks[0] + //} + + log.Info().Msg("Starting LMD finalizer") + s.lmdFinalizer, err = lmdfinalizer.New( + lmdfinalizer.WithLFB(LFB), + lmdfinalizer.WithLogLevel(zerolog.DebugLevel), + ) + if err != nil { + log.Error().Err(err).Msg("could not initializer LMF finalizer") + return + } + log.Info().Msg("Started LMD finalizer") + + log.Info().Uint64("from_slot", uint64(LFB.Slot)).Uint64("to_slot", uint64(md.LatestSlot)).Msg("Catching up LMD finalizer") + + for slot := LFB.Slot + 1; slot <= md.LatestSlot; slot++ { + //ctx, cancel, err := s.chainDB.BeginTx(ctx) + //if err != nil { + // log.Error().Err(err).Msg("Failed to begin transaction") + // return + //} + blocks, err := s.chainDB.(chaindb.BlocksProvider).BlocksBySlot(ctx, slot) + if err != nil { + log.Debug().Msg("block not in DB") + // cancel() + continue + } + + for _, block := range blocks { + ctx, cancel, err := s.chainDB.BeginTx(ctx) + if err != nil { + log.Error().Err(err).Msg("Failed to begin transaction") + return + } + attestations, err := s.chainDB.(chaindb.AttestationsProvider).AttestationsInBlock(ctx, block.Root) + if err != nil { + log.Error().Err(err).Msg("error getting attestations from db") + cancel() + attestations = nil + } + err = s.chainDB.CommitTx(ctx) + if err != nil { + log.Error().Err(err).Msg("Failed to commit transaction") + cancel() + return + } + s.lmdFinalizer.AddBlock(block, attestations) + } + } + log.Info().Msg("Caught up LMD finalizer") +} func (s *Service) catchup(ctx context.Context, md *metadata) { firstSlot := md.LatestSlot From 39a79264a2f6a8bbb7b444c37a5f986366d93996 Mon Sep 17 00:00:00 2001 From: Leo Lara Date: Wed, 20 Jul 2022 19:29:19 +0200 Subject: [PATCH 08/10] Make LMD finalizer NewLFBHandler type exported --- services/blocks/standard/lmdfinalizer/finalizer.go | 6 +++--- services/blocks/standard/lmdfinalizer/parameters.go | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/services/blocks/standard/lmdfinalizer/finalizer.go b/services/blocks/standard/lmdfinalizer/finalizer.go index 01d0203..ca4d10a 100644 --- a/services/blocks/standard/lmdfinalizer/finalizer.go +++ b/services/blocks/standard/lmdfinalizer/finalizer.go @@ -22,8 +22,8 @@ type LMDFinalizer interface { AddBlock(dbblock *chaindb.Block, attestations []*chaindb.Attestation) } -// newLFBHandler event handler to be triggered when a new LFB is finalized. -type newLFBHandler func(phase0.Root, phase0.Slot) +// NewLFBHandler event handler to be triggered when a new LFB is finalized. +type NewLFBHandler func(phase0.Root, phase0.Slot) // finalizer is the implementation of LMDFinalizer. type finalizer struct { @@ -34,7 +34,7 @@ type finalizer struct { onAddNode chan *tree.Node - newLFBHandler newLFBHandler + newLFBHandler NewLFBHandler } // New LMDFinalizer. diff --git a/services/blocks/standard/lmdfinalizer/parameters.go b/services/blocks/standard/lmdfinalizer/parameters.go index f18ddba..fb94c56 100644 --- a/services/blocks/standard/lmdfinalizer/parameters.go +++ b/services/blocks/standard/lmdfinalizer/parameters.go @@ -10,7 +10,7 @@ import ( type parameters struct { lfb *chaindb.Block logLevel zerolog.Level - newLFBHandler newLFBHandler + newLFBHandler NewLFBHandler } // Parameter is the interface for service parameters. @@ -37,7 +37,7 @@ func WithLogLevel(logLevel zerolog.Level) Parameter { }) } -func WithHandler(handler newLFBHandler) Parameter { +func WithHandler(handler NewLFBHandler) Parameter { return parameterFunc(func(p *parameters) { p.newLFBHandler = handler }) From 159901bc2a7e472aab2428e1f50dac9ef8c454c1 Mon Sep 17 00:00:00 2001 From: Leo Lara Date: Wed, 20 Jul 2022 19:31:10 +0200 Subject: [PATCH 09/10] Send new blocks to LMD finalizer --- services/blocks/standard/handler.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/services/blocks/standard/handler.go b/services/blocks/standard/handler.go index 8402c8e..6e01cb0 100644 --- a/services/blocks/standard/handler.go +++ b/services/blocks/standard/handler.go @@ -135,15 +135,15 @@ func (s *Service) OnBlock(ctx context.Context, signedBlock *spec.VersionedSigned if err := s.blocksSetter.SetBlock(ctx, dbBlock); err != nil { return errors.Wrap(err, "failed to set block") } - //attestations, err := signedBlock.Attestations() - //if err != nil { - // return errors.Wrap(err, "failed to obtain attestations") - //} - //dbAttestations, err := s.dbAttestations(ctx, dbBlock.Slot, dbBlock.Root, attestations) - //if err != nil { - // return errors.Wrap(err, "failed to obtain database attestations") - //} - //s.lmdFinalizer.AddBlock(dbBlock, dbAttestations) + attestations, err := signedBlock.Attestations() + if err != nil { + return errors.Wrap(err, "failed to obtain attestations") + } + dbAttestations, err := s.dbAttestations(ctx, dbBlock.Slot, dbBlock.Root, attestations) + if err != nil { + return errors.Wrap(err, "failed to obtain database attestations") + } + s.lmdFinalizer.AddBlock(dbBlock, dbAttestations) switch signedBlock.Version { case spec.DataVersionPhase0: return s.onBlockPhase0(ctx, signedBlock.Phase0, dbBlock) From 565c519721ebc4218c79cd690e2b54fd6ac291f7 Mon Sep 17 00:00:00 2001 From: Leo Lara Date: Sun, 24 Jul 2022 11:15:12 +0200 Subject: [PATCH 10/10] Keep state of LMD finalizer --- services/blocks/standard/handler.go | 63 ++++++++++++++-------------- services/blocks/standard/metadata.go | 6 +-- services/blocks/standard/service.go | 47 ++++++++++----------- 3 files changed, 57 insertions(+), 59 deletions(-) diff --git a/services/blocks/standard/handler.go b/services/blocks/standard/handler.go index 6e01cb0..e26c688 100644 --- a/services/blocks/standard/handler.go +++ b/services/blocks/standard/handler.go @@ -16,6 +16,7 @@ package standard import ( "bytes" "context" + "encoding/hex" "fmt" "math/big" @@ -68,37 +69,37 @@ func (s *Service) OnBeaconChainHeadUpdated( monitorBlockProcessed(slot) } -//func (s *Service) onNewLMDFinalizedBlock(root phase0.Root, slot phase0.Slot) { -// ctx, cancel, err := s.chainDB.BeginTx(context.Background()) -// -// md, err := s.getMetadata(ctx) -// if err != nil { -// log.Error().Err(err).Msg("Failed to obtain metadata") -// cancel() -// return -// } -// -// if md.LMDLatestFinalizedSlot >= slot { -// log.Error().Err(err).Msg("trying to set a past LMD finalized block") -// cancel() -// return -// } -// -// md.LMDLatestFinalizedBlockRoot = root -// md.LMDLatestFinalizedSlot = slot -// -// if err := s.setMetadata(ctx, md); err != nil { -// log.Error().Err(err).Msg("Failed to set metadata") -// cancel() -// return -// } -// if err := s.chainDB.CommitTx(ctx); err != nil { -// log.Error().Err(err).Msg("Failed to commit transaction") -// cancel() -// return -// } -// log.Debug().Str("root", hex.EncodeToString(root[:])).Uint64("slot", uint64(slot)).Msg("stored new LMD finalized block") -//} +func (s *Service) onNewLMDFinalizedBlock(root phase0.Root, slot phase0.Slot) { + ctx, cancel, err := s.chainDB.BeginTx(context.Background()) + + md, err := s.getMetadata(ctx) + if err != nil { + log.Error().Err(err).Msg("Failed to obtain metadata") + cancel() + return + } + + if md.LMDLatestFinalizedSlot >= slot { + log.Error().Err(err).Msg("trying to set a past LMD finalized block") + cancel() + return + } + + md.LMDLatestFinalizedBlockRoot = root + md.LMDLatestFinalizedSlot = slot + + if err := s.setMetadata(ctx, md); err != nil { + log.Error().Err(err).Msg("Failed to set metadata") + cancel() + return + } + if err := s.chainDB.CommitTx(ctx); err != nil { + log.Error().Err(err).Msg("Failed to commit transaction") + cancel() + return + } + log.Debug().Str("root", hex.EncodeToString(root[:])).Uint64("slot", uint64(slot)).Msg("stored new LMD finalized block") +} func (s *Service) updateBlockForSlot(ctx context.Context, slot phase0.Slot) error { log := log.With().Uint64("slot", uint64(slot)).Logger() diff --git a/services/blocks/standard/metadata.go b/services/blocks/standard/metadata.go index 444b737..32719a1 100644 --- a/services/blocks/standard/metadata.go +++ b/services/blocks/standard/metadata.go @@ -23,9 +23,9 @@ import ( // metadata stored about this service. type metadata struct { - LatestSlot phase0.Slot `json:"latest_slot"` - //LMDLatestFinalizedBlockRoot phase0.Root `json:"lmd_lfb_root,omitempty"` - //LMDLatestFinalizedSlot phase0.Slot `json:"lmd_lfb_slot,omitempty"` + LatestSlot phase0.Slot `json:"latest_slot"` + LMDLatestFinalizedBlockRoot phase0.Root `json:"lmd_lfb_root,omitempty"` + LMDLatestFinalizedSlot phase0.Slot `json:"lmd_lfb_slot,omitempty"` } // metadataKey is the key for the metadata. diff --git a/services/blocks/standard/service.go b/services/blocks/standard/service.go index bfd91a4..bdcc3f0 100644 --- a/services/blocks/standard/service.go +++ b/services/blocks/standard/service.go @@ -187,34 +187,36 @@ func (s *Service) updateAfterRestart(ctx context.Context, startSlot int64) { } func (s *Service) catchupLMDFinalizer(ctx context.Context, md *metadata) { - //LFBRoot := md.LMDLatestFinalizedBlockRoot - //LFBSlot := md.LMDLatestFinalizedSlot + LFBRoot := md.LMDLatestFinalizedBlockRoot + LFBSlot := md.LMDLatestFinalizedSlot var LFB *chaindb.Block - //if LFBSlot != 0 { - // block, err := s.chainDB.(chaindb.BlocksProvider).BlockByRoot(ctx, LFBRoot) - // if err != nil { - // log.Error().Err(err).Msg("could not fetch LMD latest finalized block") - // return - // } - // LFB = block - //} else { - blocks, err := s.chainDB.(chaindb.BlocksProvider).BlocksBySlot(ctx, 0) - if err != nil || len(blocks) == 0 { - log.Error().Err(err).Msg("could not fetch genesis block") - return - } - if len(blocks) > 1 { - log.Error().Msg("more than one genesis block") - return + if LFBSlot != 0 { + block, err := s.chainDB.(chaindb.BlocksProvider).BlockByRoot(ctx, LFBRoot) + if err != nil { + log.Error().Err(err).Msg("could not fetch LMD latest finalized block") + return + } + LFB = block + } else { + blocks, err := s.chainDB.(chaindb.BlocksProvider).BlocksBySlot(ctx, 0) + if err != nil || len(blocks) == 0 { + log.Error().Err(err).Msg("could not fetch genesis block") + return + } + if len(blocks) > 1 { + log.Error().Msg("more than one genesis block") + return + } + LFB = blocks[0] } - LFB = blocks[0] - //} log.Info().Msg("Starting LMD finalizer") + var err error s.lmdFinalizer, err = lmdfinalizer.New( lmdfinalizer.WithLFB(LFB), lmdfinalizer.WithLogLevel(zerolog.DebugLevel), + lmdfinalizer.WithHandler(s.onNewLMDFinalizedBlock), ) if err != nil { log.Error().Err(err).Msg("could not initializer LMF finalizer") @@ -225,11 +227,6 @@ func (s *Service) catchupLMDFinalizer(ctx context.Context, md *metadata) { log.Info().Uint64("from_slot", uint64(LFB.Slot)).Uint64("to_slot", uint64(md.LatestSlot)).Msg("Catching up LMD finalizer") for slot := LFB.Slot + 1; slot <= md.LatestSlot; slot++ { - //ctx, cancel, err := s.chainDB.BeginTx(ctx) - //if err != nil { - // log.Error().Err(err).Msg("Failed to begin transaction") - // return - //} blocks, err := s.chainDB.(chaindb.BlocksProvider).BlocksBySlot(ctx, slot) if err != nil { log.Debug().Msg("block not in DB")