sync: make history segment more flexible in snap sync;
0xbundler committed Dec 7, 2023
1 parent 849ec95 commit ec25525
Showing 14 changed files with 228 additions and 92 deletions.
20 changes: 15 additions & 5 deletions cmd/geth/chaincmd.go
@@ -719,11 +719,15 @@ func exportSegment(ctx *cli.Context) error {
defer db.Close()

genesisHash := rawdb.ReadCanonicalHash(db, 0)
td := rawdb.ReadTd(db, genesisHash, 0)
chainConfig, engine, headerChain, err := simpleHeaderChain(db, genesisHash)
if err != nil {
return err
}
latest := headerChain.CurrentHeader()
if _, ok := engine.(consensus.PoSA); !ok {
return errors.New("current chain is not POSA consensus, cannot generate history segment")
}
if !chainConfig.IsLuban(latest.Number) {
return errors.New("current chain has not enabled the Luban hard fork, cannot generate history segment")
}
@@ -763,10 +767,15 @@ func exportSegment(ctx *cli.Context) error {
Index: 0,
ReGenesisNumber: 0,
ReGenesisHash: genesisHash,
TD: td.Uint64(),
},
}
// try to find a finalized block at every segment boundary
for num := boundStartBlock; num <= target; num += historySegmentLength {
// align the segment start at parlia's epoch
if chainConfig.Parlia != nil {
num -= num % chainConfig.Parlia.Epoch
}
var fs, ft *types.Header
for next := num + 1; next <= target; next++ {
fs = headerChain.GetHeaderByNumber(next)
@@ -799,11 +808,11 @@ func exportSegment(ctx *cli.Context) error {
if err != nil {
return err
}
segment.ConsensusData = enc
segment.ConsensusData = hexutil.Encode(enc)
}
segments = append(segments, segment)
}
if err = params.ValidateHisSegments(genesisHash, segments); err != nil {
if err = params.ValidateHisSegments(params.NewHistoryBlock(0, genesisHash, td.Uint64()), segments); err != nil {
return err
}
output, err := json.MarshalIndent(segments, "", " ")
@@ -813,7 +822,7 @@
log.Info("Generate History Segment done", "count", len(segments), "elapsed", common.PrettyDuration(time.Since(start)))

out := ctx.String(utils.HistorySegOutputFlag.Name)
outFile, err := os.OpenFile(out, os.O_CREATE|os.O_RDWR, 0644)
outFile, err := os.OpenFile(out, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0644)
if err != nil {
return err
}
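
Note on the os.OpenFile change above: adding os.O_TRUNC matters when the output path already exists, because without truncation a shorter export would leave stale trailing bytes from the previous run. A minimal sketch of the write pattern, with an illustrative file name and payload:

package main

import (
    "log"
    "os"
)

// writeSegmentsFile overwrites path with data; O_TRUNC discards any previous
// contents, so a shorter write cannot leave stale bytes from an earlier export.
func writeSegmentsFile(path string, data []byte) error {
    f, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0644)
    if err != nil {
        return err
    }
    defer f.Close()
    _, err = f.Write(data)
    return err
}

func main() {
    // Illustrative output path and payload.
    if err := writeSegmentsFile("segments.json", []byte("[]")); err != nil {
        log.Fatal(err)
    }
}
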
@@ -834,13 +843,14 @@ func pruneHistorySegments(ctx *cli.Context) error {
defer db.Close()

genesisHash := rawdb.ReadCanonicalHash(db, 0)
td := rawdb.ReadTd(db, genesisHash, 0)
_, _, headerChain, err := simpleHeaderChain(db, genesisHash)
if err != nil {
return err
}
cfg := &params.HistorySegmentConfig{
CustomPath: "",
Genesis: genesisHash,
Genesis: params.NewHistoryBlock(0, genesisHash, td.Uint64()),
}
if ctx.IsSet(utils.HistorySegCustomFlag.Name) {
cfg.CustomPath = ctx.String(utils.HistorySegCustomFlag.Name)
@@ -864,7 +874,7 @@ func pruneHistorySegments(ctx *cli.Context) error {
}

pruneTail := lastSegment.ReGenesisNumber
log.Info("The older history will be pruned", "lastSegment", &lastSegment, "curSegment", &curSegment, "pruneTail", pruneTail)
log.Info("The older history will be pruned", "lastSegment", lastSegment, "curSegment", curSegment, "pruneTail", pruneTail)
if err = rawdb.PruneTxLookupToTail(db, pruneTail); err != nil {
return err
}
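
The exportSegment changes above align every candidate segment boundary down to a Parlia epoch before searching forward for a finalized header, so each segment's re-genesis block lands on an epoch checkpoint. A standalone sketch of that alignment rule; the epoch and segment length constants below are illustrative, not taken from this commit:

package main

import "fmt"

// alignToEpoch snaps a block number down to the nearest epoch boundary,
// mirroring `num -= num % chainConfig.Parlia.Epoch` in exportSegment.
func alignToEpoch(num, epoch uint64) uint64 {
    if epoch == 0 {
        return num
    }
    return num - num%epoch
}

func main() {
    const (
        epoch         uint64 = 200       // illustrative Parlia epoch length
        segmentLength uint64 = 2_592_000 // illustrative history segment length
    )
    for num := uint64(1_000_123); num <= 6_000_000; num += segmentLength {
        fmt.Printf("candidate boundary %d -> epoch-aligned %d\n", num, alignToEpoch(num, epoch))
    }
}
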
2 changes: 1 addition & 1 deletion consensus/consensus.go
@@ -155,5 +155,5 @@ type PoSA interface {
VerifyVote(chain ChainHeaderReader, vote *types.VoteEnvelope) error
IsActiveValidatorAt(chain ChainHeaderReader, header *types.Header, checkVoteKeyFn func(bLSPublicKey *types.BLSPublicKey) bool) bool
GetConsensusData(chain ChainHeaderReader, header *types.Header) ([]byte, error)
SetupLastSegment(segment *params.HistorySegment)
SetupHistorySegment(hsm *params.HistorySegmentManager)
}
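
The PoSA hook now hands the engine the whole HistorySegmentManager instead of a single last segment, so the engine can resolve whichever segment a block falls into at query time. A sketch of that wiring under pared-down stand-in types (the real params.HistorySegmentManager and params.HistorySegment carry more fields than shown):

package main

import "fmt"

// Pared-down stand-ins for params.HistorySegment / params.HistorySegmentManager.
type HistorySegment struct {
    Index           uint64
    ReGenesisNumber uint64
    ConsensusData   string // hex-encoded snapshot JSON
}

type HistorySegmentManager struct {
    segments []HistorySegment // sorted by ReGenesisNumber
}

// CurSegment returns the segment whose range contains the given block number.
func (m *HistorySegmentManager) CurSegment(num uint64) HistorySegment {
    cur := m.segments[0]
    for _, s := range m.segments {
        if s.ReGenesisNumber > num {
            break
        }
        cur = s
    }
    return cur
}

// engine stands in for a PoSA implementation: it keeps the manager, not one segment.
type engine struct {
    hsm *HistorySegmentManager
}

func (e *engine) SetupHistorySegment(hsm *HistorySegmentManager) { e.hsm = hsm }

func main() {
    e := &engine{}
    e.SetupHistorySegment(&HistorySegmentManager{segments: []HistorySegment{
        {Index: 0, ReGenesisNumber: 0},
        {Index: 1, ReGenesisNumber: 2_592_000},
    }})
    fmt.Println("segment for block 3,000,000:", e.hsm.CurSegment(3_000_000).Index) // prints 1
}
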
59 changes: 44 additions & 15 deletions consensus/parlia/parlia.go
@@ -11,6 +11,7 @@ import (
"math"
"math/big"
"math/rand"
"runtime/debug"
"sort"
"strings"
"sync"
@@ -233,7 +234,7 @@ type Parlia struct {
fakeDiff bool // Skip difficulty verifications

// history segment, it provides the history segment's consensus data to prevent generating snapshots from older headers
lastSegment *params.HistorySegment
hsm *params.HistorySegmentManager
}

// New creates a Parlia consensus engine.
@@ -289,8 +290,8 @@ func New(
return c
}

func (p *Parlia) SetupLastSegment(segment *params.HistorySegment) {
p.lastSegment = segment
func (p *Parlia) SetupHistorySegment(hsm *params.HistorySegmentManager) {
p.hsm = hsm
}

func (p *Parlia) IsSystemTransaction(tx *types.Transaction, header *types.Header) (bool, error) {
@@ -416,6 +417,7 @@ func (p *Parlia) getParent(chain consensus.ChainHeaderReader, header *types.Head
}

if parent == nil || parent.Number.Uint64() != number-1 || parent.Hash() != header.ParentHash {
log.Info("cannot find ancestor, FindAncientHeader", "number", number, "stack", string(debug.Stack()))
return nil, consensus.ErrUnknownAncestor
}
return parent, nil
@@ -538,6 +540,7 @@ func (p *Parlia) verifyHeader(chain consensus.ChainHeaderReader, header *types.H
// Ensure that the extra-data contains a signer list on checkpoint, but none otherwise
signersBytes := getValidatorBytesFromHeader(header, p.chainConfig, p.config)
if !isEpoch && len(signersBytes) != 0 {
log.Error("validate header err", "number", header.Number, "hash", header.Hash(), "chainconfig", p.chainConfig, "config", p.config, "bytes", len(signersBytes))
return errExtraValidators
}
if isEpoch && len(signersBytes) == 0 {
@@ -685,17 +688,9 @@ func (p *Parlia) snapshot(chain consensus.ChainHeaderReader, number uint64, hash
}
}
// check history consensus data, load snapshot
if p.lastSegment != nil && p.lastSegment.MatchBlock(hash, number) {
var tmp Snapshot
err := json.Unmarshal(p.lastSegment.ConsensusData, &tmp)
if err == nil {
tmp.config = p.config
tmp.sigCache = p.signatures
tmp.ethAPI = p.ethAPI
snap = &tmp
break
}
log.Error("Try load snapshot from history segment, wrong encode", "number", number, "hash", hash, "err", err)
if s, ok := p.findSnapFromHistorySegment(hash, number); ok {
snap = s
break
}

// If we're at the genesis, snapshot the initial state. Alternatively if we have
@@ -731,13 +726,15 @@ func (p *Parlia) snapshot(chain consensus.ChainHeaderReader, number uint64, hash
// If we have explicit parents, pick from there (enforced)
header = parents[len(parents)-1]
if header.Hash() != hash || header.Number.Uint64() != number {
log.Info("cannot find ancestor, FindAncientHeader", "number", number, "stack", string(debug.Stack()))
return nil, consensus.ErrUnknownAncestor
}
parents = parents[:len(parents)-1]
} else {
// No explicit parents (or no more left), reach out to the database
header = chain.GetHeader(hash, number)
if header == nil {
log.Info("cannot find ancestor, FindAncientHeader", "number", number, "stack", string(debug.Stack()))
return nil, consensus.ErrUnknownAncestor
}
}
@@ -771,6 +768,36 @@ func (p *Parlia) snapshot(chain consensus.ChainHeaderReader, number uint64, hash
return snap, err
}

func (p *Parlia) findSnapFromHistorySegment(hash common.Hash, number uint64) (*Snapshot, bool) {
if p.hsm == nil {
return nil, false
}
segment := p.hsm.CurSegment(number + 1)
if segment.ReGenesisNumber != number+1 {
return nil, false
}
var tmp Snapshot
enc, err := hexutil.Decode(segment.ConsensusData)
if err != nil {
log.Warn("Try load snapshot from history segment, wrong hex", "number", number, "hash", hash, "err", err)
return nil, false
}
err = json.Unmarshal(enc, &tmp)
if err != nil {
log.Warn("Try load snapshot from history segment, wrong encode", "number", number, "hash", hash, "err", err)
return nil, false
}
if tmp.Number != number || tmp.Hash != hash {
return nil, false
}

tmp.config = p.config
tmp.sigCache = p.signatures
tmp.ethAPI = p.ethAPI
log.Info("found history segment snapshot", "number", number, "hash", hash, "segment", segment)
return &tmp, true
}

// VerifyUncles implements consensus.Engine, always returning an error for any
// uncles as this consensus mechanism doesn't permit uncles.
func (p *Parlia) VerifyUncles(chain consensus.ChainReader, block *types.Block) error {
@@ -972,6 +999,7 @@ func (p *Parlia) Prepare(chain consensus.ChainHeaderReader, header *types.Header
// Ensure the timestamp has the correct delay
parent := chain.GetHeader(header.ParentHash, number-1)
if parent == nil {
log.Info("cannot find ancestor, FindAncientHeader", "number", number, "stack", string(debug.Stack()))
return consensus.ErrUnknownAncestor
}
header.Time = p.blockTimeForRamanujanFork(snap, header, parent)
@@ -1532,9 +1560,10 @@ func (p *Parlia) Close() error {
return nil
}

// GetConsensusData loads the header's last snapshot for the history segment
func (p *Parlia) GetConsensusData(chain consensus.ChainHeaderReader, header *types.Header) ([]byte, error) {
number := header.Number.Uint64()
snap, err := p.snapshot(chain, number, header.Hash(), nil)
snap, err := p.snapshot(chain, number-1, header.ParentHash, nil)
if err != nil {
return nil, err
}
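
Two details of the new lookup in parlia.go are easy to miss: ConsensusData is now a hex string (written with hexutil.Encode in exportSegment and read back with hexutil.Decode here), and the snapshot stored for a segment belongs to the block just before its re-genesis block, which is why the code asks for CurSegment(number+1) and checks ReGenesisNumber == number+1, matching the GetConsensusData change that now snapshots the parent. A minimal round-trip sketch of the encode/decode path, assuming only go-ethereum's hexutil helpers and a cut-down snapshot type:

package main

import (
    "encoding/json"
    "fmt"
    "log"

    "github.com/ethereum/go-ethereum/common"
    "github.com/ethereum/go-ethereum/common/hexutil"
)

// miniSnapshot is a cut-down stand-in for parlia's Snapshot; only the fields
// used by the number/hash sanity check are shown.
type miniSnapshot struct {
    Number uint64      `json:"number"`
    Hash   common.Hash `json:"hash"`
}

func main() {
    // Export side (exportSegment): marshal the snapshot and store it as hex text.
    snap := miniSnapshot{Number: 2_591_999, Hash: common.HexToHash("0xabc123")}
    raw, err := json.Marshal(snap)
    if err != nil {
        log.Fatal(err)
    }
    consensusData := hexutil.Encode(raw) // "0x7b226e756d626572..."

    // Import side (findSnapFromHistorySegment): decode the hex, unmarshal,
    // then make sure the snapshot really is the one for (number, hash).
    dec, err := hexutil.Decode(consensusData)
    if err != nil {
        log.Fatalf("wrong hex: %v", err)
    }
    var got miniSnapshot
    if err := json.Unmarshal(dec, &got); err != nil {
        log.Fatalf("wrong encode: %v", err)
    }
    if got.Number != snap.Number || got.Hash != snap.Hash {
        log.Fatal("snapshot does not match requested block")
    }
    fmt.Println("restored snapshot for block", got.Number)
}
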
2 changes: 2 additions & 0 deletions consensus/parlia/snapshot.go
@@ -22,6 +22,7 @@ import (
"encoding/json"
"errors"
"fmt"
"runtime/debug"
"sort"

lru "github.com/hashicorp/golang-lru"
@@ -271,6 +272,7 @@ func (s *Snapshot) apply(headers []*types.Header, chain consensus.ChainHeaderRea
if number > 0 && number%s.config.Epoch == uint64(len(snap.Validators)/2) {
checkpointHeader := FindAncientHeader(header, uint64(len(snap.Validators)/2), chain, parents)
if checkpointHeader == nil {
log.Info("cannot find ancestor, FindAncientHeader", "number", number, "stack", string(debug.Stack()))
return nil, consensus.ErrUnknownAncestor
}

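
Each ErrUnknownAncestor return path in this commit now logs a stack trace, so the caller that triggered the miss shows up in the logs; runtime/debug.Stack returns the current goroutine's stack as bytes. A tiny sketch of the pattern, using fmt in place of the geth logger:

package main

import (
    "errors"
    "fmt"
    "runtime/debug"
)

func findAncestor(number uint64) error {
    // On a miss, attach the caller's stack so the log shows who asked for
    // the unknown ancestor (mirrors the log.Info calls added in this diff).
    fmt.Printf("cannot find ancestor number=%d stack=%s\n", number, string(debug.Stack()))
    return errors.New("unknown ancestor")
}

func main() {
    _ = findAncestor(42)
}
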
6 changes: 3 additions & 3 deletions core/blockchain.go
@@ -294,7 +294,7 @@ type BlockChain struct {
pipeCommit bool

// history segment
lastSegment *params.HistorySegment
hsm *params.HistorySegmentManager

// monitor
doubleSignMonitor *monitor.DoubleSignMonitor
@@ -555,8 +555,8 @@ func (bc *BlockChain) GetVMConfig() *vm.Config {
return &bc.vmConfig
}

func (bc *BlockChain) SetupHistorySegment(lastSegment *params.HistorySegment) {
bc.lastSegment = lastSegment
func (bc *BlockChain) SetupHistorySegment(hsm *params.HistorySegmentManager) {
bc.hsm = hsm
}

func (bc *BlockChain) cacheReceipts(hash common.Hash, receipts types.Receipts, block *types.Block) {
11 changes: 9 additions & 2 deletions core/blockchain_reader.go
@@ -449,8 +449,15 @@ func (bc *BlockChain) SubscribeFinalizedHeaderEvent(ch chan<- FinalizedHeaderEve
return bc.scope.Track(bc.finalizedHeaderFeed.Subscribe(ch))
}

func (bc *BlockChain) LastHistorySegment() *params.HistorySegment {
return bc.lastSegment
func (bc *BlockChain) LastHistorySegment(num uint64) *params.HistorySegment {
if bc.hsm == nil {
return nil
}
segment, ok := bc.hsm.LastSegmentByNumber(num)
if !ok {
return nil
}
return segment
}

func (bc *BlockChain) WriteCanonicalHeaders(headers []*types.Header, tds []uint64) error {
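
LastHistorySegment now takes a block number and resolves the previous segment through the manager, returning nil when no manager is configured or the block is still inside the first segment. A sketch of a LastSegmentByNumber-style lookup over the same simplified types as the earlier interface sketch; this is an assumed implementation, not the real params code:

package main

import "fmt"

type HistorySegment struct {
    Index           uint64
    ReGenesisNumber uint64
}

type HistorySegmentManager struct {
    segments []HistorySegment // sorted by ReGenesisNumber
}

// LastSegmentByNumber returns the segment immediately before the one containing
// num, i.e. the history that may be pruned once num's segment is current.
func (m *HistorySegmentManager) LastSegmentByNumber(num uint64) (*HistorySegment, bool) {
    cur := -1
    for i, s := range m.segments {
        if s.ReGenesisNumber > num {
            break
        }
        cur = i
    }
    if cur <= 0 {
        return nil, false // no previous segment while still in the first segment
    }
    return &m.segments[cur-1], true
}

func main() {
    m := &HistorySegmentManager{segments: []HistorySegment{
        {Index: 0, ReGenesisNumber: 0},
        {Index: 1, ReGenesisNumber: 2_592_000},
        {Index: 2, ReGenesisNumber: 5_184_000},
    }}
    if seg, ok := m.LastSegmentByNumber(5_200_000); ok {
        fmt.Println("last segment index:", seg.Index) // prints 1
    }
    if _, ok := m.LastSegmentByNumber(100); !ok {
        fmt.Println("still inside the first segment, nothing to prune")
    }
}
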
3 changes: 2 additions & 1 deletion core/genesis.go
@@ -321,7 +321,7 @@ func SetupGenesisBlockWithOverride(db ethdb.Database, triedb *trie.Database, gen
log.Info("Writing default main-net genesis block")
genesis = DefaultGenesisBlock()
} else {
log.Info("Writing custom genesis block")
log.Info("Writing custom genesis block", "config", genesis.Config)
}
block, err := genesis.Commit(db, triedb)
if err != nil {
@@ -408,6 +408,7 @@ func LoadChainConfig(db ethdb.Database, genesis *Genesis) (*params.ChainConfig,
if stored != (common.Hash{}) {
storedcfg := rawdb.ReadChainConfig(db, stored)
if storedcfg != nil {
log.Info("found chain config", "hash", stored, "cfg", storedcfg)
return storedcfg, stored, nil
}
}
33 changes: 32 additions & 1 deletion core/rawdb/history_segment.go
@@ -2,6 +2,7 @@ package rawdb

import (
"bytes"
"errors"
"fmt"
"math/big"
"time"
@@ -13,6 +14,10 @@ import (
"github.com/ethereum/go-ethereum/log"
)

var (
rangeCompactionThreshold = 100000
)

// PruneTxLookupToTail iterates the tx lookup entries and deletes them down to tail
func PruneTxLookupToTail(db ethdb.KeyValueStore, tail uint64) error {
indexTail := ReadTxIndexTail(db)
@@ -24,6 +29,7 @@ func PruneTxLookupToTail(db ethdb.KeyValueStore, tail uint64) error {
start = time.Now()
logged = time.Now()
txlookups stat
count = 0

batch = db.NewBatch()
iter = db.NewIterator(txLookupPrefix, nil)
@@ -53,17 +59,42 @@ func PruneTxLookupToTail(db ethdb.KeyValueStore, tail uint64) error {
log.Info("PruneTxLookupToTail", "count", txlookups.Count(), "size", txlookups.Size(), "elapsed", common.PrettyDuration(time.Since(start)))
logged = time.Now()
}
count++
}
WriteTxIndexTail(batch, tail)
if err := batch.Write(); err != nil {
return err
}
log.Info("PruneTxLookupToTail finish", "count", txlookups.Count(), "size", txlookups.Size(), "elapsed", common.PrettyDuration(time.Since(start)))

// Start compactions, will remove the deleted data from the disk immediately.
// Note for small pruning, the compaction is skipped.
if count >= rangeCompactionThreshold {
cstart := time.Now()
for b := 0x00; b <= 0xf0; b += 0x10 {
var (
start = []byte{byte(b)}
end = []byte{byte(b + 0x10)}
)
if b == 0xf0 {
end = nil
}
log.Info("Compacting database", "range", fmt.Sprintf("%#x-%#x", start, end), "elapsed", common.PrettyDuration(time.Since(cstart)))
if err := db.Compact(start, end); err != nil {
log.Error("Database compaction failed", "error", err)
return err
}
}
log.Info("Database compaction finished", "elapsed", common.PrettyDuration(time.Since(cstart)))
}
return nil
}

func AvailableHistorySegment(db ethdb.Reader, segments ...params.HistorySegment) error {
func AvailableHistorySegment(db ethdb.Reader, segments ...*params.HistorySegment) error {
for _, s := range segments {
if s == nil {
return errors.New("found nil segment")
}
hash := ReadCanonicalHash(db, s.ReGenesisNumber)
if hash != s.ReGenesisHash {
return fmt.Errorf("cannot find segment StartAtBlock, seg: %v", s)
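
After a large tx-lookup prune, the database is compacted range by range so the freed space is reclaimed right away; prunes that delete fewer than rangeCompactionThreshold entries skip this step. The loop walks key prefixes 0x00 through 0xf0 in 0x10 steps, leaving the final range open-ended. A sketch of that iteration against a minimal interface; ethdb's real Compact(start, end []byte) has the same shape:

package main

import "fmt"

// compacter is the minimal slice of the database interface this loop needs;
// ethdb.Database's Compact has the same signature.
type compacter interface {
    Compact(start, end []byte) error
}

type fakeDB struct{}

func (fakeDB) Compact(start, end []byte) error {
    fmt.Printf("compacting range %#x - %#x\n", start, end)
    return nil
}

// compactByPrefix mirrors the post-prune compaction: sixteen prefix ranges,
// with the last one unbounded so keys above 0xf0 are covered too.
func compactByPrefix(db compacter) error {
    for b := 0x00; b <= 0xf0; b += 0x10 {
        start := []byte{byte(b)}
        end := []byte{byte(b + 0x10)}
        if b == 0xf0 {
            end = nil // open-ended final range
        }
        if err := db.Compact(start, end); err != nil {
            return err
        }
    }
    return nil
}

func main() {
    if err := compactByPrefix(fakeDB{}); err != nil {
        fmt.Println("compaction failed:", err)
    }
}
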
4 changes: 2 additions & 2 deletions core/vote/vote_pool_test.go
@@ -114,10 +114,10 @@ func (m *mockInvalidPOSA) GetConsensusData(chain consensus.ChainHeaderReader, he
return nil, nil
}

func (m *mockPOSA) SetupLastSegment(segment *params.HistorySegment) {
func (m *mockPOSA) SetupHistorySegment(hsm *params.HistorySegmentManager) {
}

func (m *mockInvalidPOSA) SetupLastSegment(segment *params.HistorySegment) {
func (m *mockInvalidPOSA) SetupHistorySegment(hsm *params.HistorySegmentManager) {
}

func (pool *VotePool) verifyStructureSizeOfVotePool(receivedVotes, curVotes, futureVotes, curVotesPq, futureVotesPq int) bool {