Skip to content

Finalizer based on LMD votes, not connected to actual balances yet #52

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 10 commits into
base: master
Choose a base branch
from
72 changes: 66 additions & 6 deletions services/blocks/standard/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package standard
import (
"bytes"
"context"
"encoding/hex"
"fmt"
"math/big"

Expand Down Expand Up @@ -68,6 +69,38 @@ func (s *Service) OnBeaconChainHeadUpdated(
monitorBlockProcessed(slot)
}

func (s *Service) onNewLMDFinalizedBlock(root phase0.Root, slot phase0.Slot) {
ctx, cancel, err := s.chainDB.BeginTx(context.Background())

md, err := s.getMetadata(ctx)
if err != nil {
log.Error().Err(err).Msg("Failed to obtain metadata")
cancel()
return
}

if md.LMDLatestFinalizedSlot >= slot {
log.Error().Err(err).Msg("trying to set a past LMD finalized block")
cancel()
return
}

md.LMDLatestFinalizedBlockRoot = root
md.LMDLatestFinalizedSlot = slot

if err := s.setMetadata(ctx, md); err != nil {
log.Error().Err(err).Msg("Failed to set metadata")
cancel()
return
}
if err := s.chainDB.CommitTx(ctx); err != nil {
log.Error().Err(err).Msg("Failed to commit transaction")
cancel()
return
}
log.Debug().Str("root", hex.EncodeToString(root[:])).Uint64("slot", uint64(slot)).Msg("stored new LMD finalized block")
}

func (s *Service) updateBlockForSlot(ctx context.Context, slot phase0.Slot) error {
log := log.With().Uint64("slot", uint64(slot)).Logger()

Expand Down Expand Up @@ -103,6 +136,15 @@ func (s *Service) OnBlock(ctx context.Context, signedBlock *spec.VersionedSigned
if err := s.blocksSetter.SetBlock(ctx, dbBlock); err != nil {
return errors.Wrap(err, "failed to set block")
}
attestations, err := signedBlock.Attestations()
if err != nil {
return errors.Wrap(err, "failed to obtain attestations")
}
dbAttestations, err := s.dbAttestations(ctx, dbBlock.Slot, dbBlock.Root, attestations)
if err != nil {
return errors.Wrap(err, "failed to obtain database attestations")
}
s.lmdFinalizer.AddBlock(dbBlock, dbAttestations)
switch signedBlock.Version {
case spec.DataVersionPhase0:
return s.onBlockPhase0(ctx, signedBlock.Phase0, dbBlock)
Expand Down Expand Up @@ -234,12 +276,11 @@ func (s *Service) updateAttestationsForBlock(ctx context.Context,
blockRoot phase0.Root,
attestations []*phase0.Attestation,
) error {
beaconCommittees := make(map[phase0.Slot]map[phase0.CommitteeIndex]*chaindb.BeaconCommittee)
for i, attestation := range attestations {
dbAttestation, err := s.dbAttestation(ctx, slot, blockRoot, uint64(i), attestation, beaconCommittees)
if err != nil {
return errors.Wrap(err, "failed to obtain database attestation")
}
dbAttestations, err := s.dbAttestations(ctx, slot, blockRoot, attestations)
if err != nil {
return errors.Wrap(err, "failed to obtain database attestations")
}
for _, dbAttestation := range dbAttestations {
if err := s.attestationsSetter.SetAttestation(ctx, dbAttestation); err != nil {
return errors.Wrap(err, "failed to set attestation")
}
Expand Down Expand Up @@ -534,6 +575,25 @@ func (s *Service) dbAttestation(
return dbAttestation, nil
}

func (s *Service) dbAttestations(
ctx context.Context,
inclusionSlot phase0.Slot,
blockRoot phase0.Root,
attestations []*phase0.Attestation,
) ([]*chaindb.Attestation, error) {
beaconCommittees := make(map[phase0.Slot]map[phase0.CommitteeIndex]*chaindb.BeaconCommittee)
result := []*chaindb.Attestation{}
for i, attestation := range attestations {
dbAttestation, err := s.dbAttestation(ctx, inclusionSlot, blockRoot, uint64(i), attestation, beaconCommittees)
if err != nil {
return nil, errors.Wrap(err, "failed to obtain database attestation")
}

result = append(result, dbAttestation)
}
return result, nil
}

func (s *Service) dbSyncAggregate(
ctx context.Context,
slot phase0.Slot,
Expand Down
209 changes: 209 additions & 0 deletions services/blocks/standard/lmdfinalizer/finalizer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
// Package lmdfinalizer establishes which blocks are finalized from its LDM votes and the LDM votes of its children blocks.
// Abbreviation: LFB means Latest Finalized Block.
package lmdfinalizer

import (
"encoding/hex"
"github.com/attestantio/go-eth2-client/spec/phase0"
"github.com/pkg/errors"
"github.com/rs/zerolog"
zerologger "github.com/rs/zerolog/log"
"github.com/wealdtech/chaind/services/blocks/standard/lmdfinalizer/tree"
"github.com/wealdtech/chaind/services/chaindb"
)

// PreBalanceThreshold is a mock threshold used until we connect the finalizer with the validator balances.
const PreBalanceThreshold = 10000

// LMDFinalizer is a beacon chain finalizer based on LMD votes.
type LMDFinalizer interface {
// AddBlock to finalizer to be candidate for finalization and use its included attestations as votes for
// other blocks.
AddBlock(dbblock *chaindb.Block, attestations []*chaindb.Attestation)
}

// NewLFBHandler event handler to be triggered when a new LFB is finalized.
type NewLFBHandler func(phase0.Root, phase0.Slot)

// finalizer is the implementation of LMDFinalizer.
type finalizer struct {
tree tree.Tree
votes lmdVotes

log zerolog.Logger

onAddNode chan *tree.Node

newLFBHandler NewLFBHandler
}

// New LMDFinalizer.
func New(params ...Parameter) (LMDFinalizer, error) {
parameters, err := parseAndCheckParameters(params...)
if err != nil {
return nil, errors.Wrap(err, "problem with parameters")
}

LFB := tree.NewNode(parameters.lfb, nil)

f := &finalizer{
tree: tree.New(LFB),
votes: newLMDVotes(),

log: zerologger.With().Str("service", "LMD finalizer").Str("impl", "standard").Logger().Level(parameters.logLevel),

onAddNode: make(chan *tree.Node, 1000), // TODO: size of channel

newLFBHandler: parameters.newLFBHandler,
}

go f.mainLoop()

return f, nil
}

// AddBlock to finalizer to be candidate for finalization and use its included attestations as votes for
// other blocks.
func (f *finalizer) AddBlock(dbblock *chaindb.Block, attestations []*chaindb.Attestation) {
node := tree.NewNode(dbblock, attestations)

f.onAddNode <- node
}

// mainLoop receives via channels commands and executed them, it is run in its own goroutine so public functions.
func (f *finalizer) mainLoop() {
for {
node := <-f.onAddNode
err := f.addNode(node)
if err != nil {
f.log.Error().Err(err).Msg("error adding block")
}
}
}

// addNode to the finalizer.
func (f *finalizer) addNode(node *tree.Node) error {
if f.tree.IsOldBlock(node) {
return errors.New("adding block that is not in the future of latest finalized block")
}

f.tree.Insert(node)

// Finalizer works even if blocks come out of order.
children := f.tree.FindOrphans(node)
f.adopt(node, children)

f.attestationsToVotes(node)

newLFB := f.countVotes()
if newLFB != nil {
// we have a new LFB
f.onNewLatestFinalizedBlock(newLFB)
}

return nil
}

// attestationsToVotes takes all the attestations included in a node block and convert them to votes and includes them
// to be counted.
func (f *finalizer) attestationsToVotes(node *tree.Node) {
for _, attestation := range node.Attestations() {
if f.tree.IsOldSlot(attestation.Slot) {
// We should ignore this attestation, as it refers to a block that is not later than the
// latest LDM finalized block.
continue
}

votes := f.attestationToVotes(attestation)
f.votes.insert(votes)
}

// memory optimization
node.RemoveAttestations()
}

// attestationToVotes returns an array of votes from an attestation.
func (finalizer) attestationToVotes(attestation *chaindb.Attestation) []lmdVote {
return []lmdVote{
{
slot: attestation.Slot,
root: attestation.BeaconBlockRoot,
weight: voteWeight(len(attestation.AggregationIndices)),
},
}
}

// adopt children nodes by parent node.
func (f *finalizer) adopt(parent *tree.Node, children []*tree.Node) {
f.tree.Adopt(parent, children)
votes := []lmdVote{}
for _, child := range children {
votes = append(votes, lmdVote{
root: parent.Root(),
slot: parent.Slot(),
weight: child.CurrenVote(),
})
}
f.votes.insert(votes)
}

// countVotes check votes that have not been counted yet, and count them if their referred block exists in the tree.
func (f *finalizer) countVotes() *tree.Node {
var newLFB *tree.Node

f.votes.tryUncounted(func(vote lmdVote) bool {
if newLFB != nil {
// Skip counting this vote as we found a new LFB.
return false
}

block := f.tree.GetByRoot(vote.root)
if block == nil {
// Cannot count this vote as we do not have the block it counts into.
return false
}

newLFB = f.countVote(block, vote.weight)

return true
})

return newLFB
}

// countVote for block and its ancestors.
// Returns a block if its weight reached threshold, otherwise nil.
func (f *finalizer) countVote(block *tree.Node, vote voteWeight) *tree.Node {
var newLFB *tree.Node

f.tree.Climb(block, func(block *tree.Node) bool {
block.CountVote(vote)

if f.threshold(block) {
newLFB = block
return false // Do not climb anymore, as this is the new latest finalized block.
}
return true
})

return newLFB
}

// threshold returns true if a block votes have reach the threshold to become finalized.
func (finalizer) threshold(block *tree.Node) bool {
return block.CurrenVote() > PreBalanceThreshold
}

// onNewLatestFinalizedBlock is called when the finalizer find a new LFB. It handles the transition to a new LFB,
// and calls the event handler.
func (f *finalizer) onNewLatestFinalizedBlock(newLFB *tree.Node) {
f.tree.OnNewLatestFinalizedBlock(newLFB)
f.votes.newLatestFinalizedBlockSlot(newLFB.Slot())

root := newLFB.Root()
f.log.Info().Str("root", hex.EncodeToString(root[:])).Uint64("slot", uint64(newLFB.Slot())).Msg("new finalized block")

if f.newLFBHandler != nil {
f.newLFBHandler(newLFB.Root(), newLFB.Slot())
}
}
Loading