Skip to content

Commit

Permalink
all: integrate state snapshot into pathdb
Browse files Browse the repository at this point in the history
  • Loading branch information
rjl493456442 committed Oct 15, 2024
1 parent add5709 commit 8c6bf3b
Show file tree
Hide file tree
Showing 59 changed files with 6,762 additions and 907 deletions.
29 changes: 20 additions & 9 deletions cmd/geth/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,25 @@ func verifyState(ctx *cli.Context) error {
triedb := utils.MakeTrieDatabase(ctx, chaindb, false, true, false)
defer triedb.Close()

var (
err error
root = headBlock.Root()
)
if ctx.NArg() == 1 {
root, err = parseRoot(ctx.Args().First())
if err != nil {
log.Error("Failed to resolve state root", "err", err)
return err
}
}
if triedb.Scheme() == rawdb.PathScheme {
if err := triedb.VerifyState(root); err != nil {
log.Error("Failed to verify state", "root", root, "err", err)
return err
}
log.Info("Verified the state", "root", root)
return snapshot.CheckDanglingStorage(chaindb)
}
snapConfig := snapshot.Config{
CacheSize: 256,
Recovery: false,
Expand All @@ -235,14 +254,6 @@ func verifyState(ctx *cli.Context) error {
log.Error("Too many arguments given")
return errors.New("too many arguments")
}
var root = headBlock.Root()
if ctx.NArg() == 1 {
root, err = parseRoot(ctx.Args().First())
if err != nil {
log.Error("Failed to resolve state root", "err", err)
return err
}
}
if err := snaptree.Verify(root); err != nil {
log.Error("Failed to verify state", "root", root, "err", err)
return err
Expand Down Expand Up @@ -428,7 +439,7 @@ func traverseRawState(ctx *cli.Context) error {
log.Error("Failed to open iterator", "root", root, "err", err)
return err
}
reader, err := triedb.Reader(root)
reader, err := triedb.NodeReader(root)
if err != nil {
log.Error("State is non-existent", "root", root)
return nil
Expand Down
61 changes: 36 additions & 25 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,10 @@ func (c *CacheConfig) triedbConfig(isVerkle bool) *triedb.Config {
}
if c.StateScheme == rawdb.PathScheme {
config.PathDB = &pathdb.Config{
StateHistory: c.StateHistory,
CleanCacheSize: c.TrieCleanLimit * 1024 * 1024,
DirtyCacheSize: c.TrieDirtyLimit * 1024 * 1024,
StateHistory: c.StateHistory,
TrieCleanSize: c.TrieCleanLimit * 1024 * 1024,
StateCleanSize: c.SnapshotLimit * 1024 * 1024,
WriteBufferSize: c.TrieDirtyLimit * 1024 * 1024,
}
}
return config
Expand Down Expand Up @@ -349,11 +350,14 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
// Do nothing here until the state syncer picks it up.
log.Info("Genesis state is missing, wait state sync")
} else {
// Head state is missing, before the state recovery, find out the
// disk layer point of snapshot(if it's enabled). Make sure the
// rewound point is lower than disk layer.
// Head state is missing, before the state recovery, find out the disk
// layer point of snapshot(if it's enabled). Make sure the rewound point
// is lower than disk layer.
//
// Note it's unnecessary in path mode which always keep trie data and
// state data in consistent.
var diskRoot common.Hash
if bc.cacheConfig.SnapshotLimit > 0 {
if bc.cacheConfig.SnapshotLimit > 0 && bc.cacheConfig.StateScheme == rawdb.HashScheme {
diskRoot = rawdb.ReadSnapshotRoot(bc.db)
}
if diskRoot != (common.Hash{}) {
Expand Down Expand Up @@ -426,15 +430,39 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
bc.logger.OnGenesisBlock(bc.genesisBlock, alloc)
}
}
bc.setupSnapshot()

// Rewind the chain in case of an incompatible config upgrade.
if compat, ok := genesisErr.(*params.ConfigCompatError); ok {
log.Warn("Rewinding chain to upgrade configuration", "err", compat)
if compat.RewindToTime > 0 {
bc.SetHeadWithTimestamp(compat.RewindToTime)
} else {
bc.SetHead(compat.RewindToBlock)
}
rawdb.WriteChainConfig(db, genesisHash, chainConfig)
}

// Start tx indexer if it's enabled.
if txLookupLimit != nil {
bc.txIndexer = newTxIndexer(*txLookupLimit, bc)
}
return bc, nil
}

func (bc *BlockChain) setupSnapshot() {
// Short circuit if the chain is established with path scheme, as the
// state snapshot has been integrated into path database natively.
if bc.cacheConfig.StateScheme == rawdb.PathScheme {
return
}
// Load any existing snapshot, regenerating it if loading failed
if bc.cacheConfig.SnapshotLimit > 0 {
// If the chain was rewound past the snapshot persistent layer (causing
// a recovery block number to be persisted to disk), check if we're still
// in recovery mode and in that case, don't invalidate the snapshot on a
// head mismatch.
var recover bool

head := bc.CurrentBlock()
if layer := rawdb.ReadSnapshotRecoveryNumber(bc.db); layer != nil && *layer >= head.Number.Uint64() {
log.Warn("Enabling snapshot recovery", "chainhead", head.Number, "diskbase", *layer)
Expand All @@ -451,23 +479,6 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
// Re-initialize the state database with snapshot
bc.statedb = state.NewDatabase(bc.triedb, bc.snaps)
}

// Rewind the chain in case of an incompatible config upgrade.
if compat, ok := genesisErr.(*params.ConfigCompatError); ok {
log.Warn("Rewinding chain to upgrade configuration", "err", compat)
if compat.RewindToTime > 0 {
bc.SetHeadWithTimestamp(compat.RewindToTime)
} else {
bc.SetHead(compat.RewindToBlock)
}
rawdb.WriteChainConfig(db, genesisHash, chainConfig)
}

// Start tx indexer if it's enabled.
if txLookupLimit != nil {
bc.txIndexer = newTxIndexer(*txLookupLimit, bc)
}
return bc, nil
}

// empty returns an indicator whether the blockchain is empty.
Expand Down
32 changes: 21 additions & 11 deletions core/blockchain_repair_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1790,7 +1790,7 @@ func testRepairWithScheme(t *testing.T, tt *rewindTest, snapshots bool, scheme s
}
)
defer engine.Close()
if snapshots {
if snapshots && scheme == rawdb.HashScheme {
config.SnapshotLimit = 256
config.SnapshotWait = true
}
Expand Down Expand Up @@ -1819,7 +1819,7 @@ func testRepairWithScheme(t *testing.T, tt *rewindTest, snapshots bool, scheme s
if err := chain.triedb.Commit(canonblocks[tt.commitBlock-1].Root(), false); err != nil {
t.Fatalf("Failed to flush trie state: %v", err)
}
if snapshots {
if snapshots && scheme == rawdb.HashScheme {
if err := chain.snaps.Cap(canonblocks[tt.commitBlock-1].Root(), 0); err != nil {
t.Fatalf("Failed to flatten snapshots: %v", err)
}
Expand Down Expand Up @@ -1950,8 +1950,10 @@ func testIssue23496(t *testing.T, scheme string) {
if _, err := chain.InsertChain(blocks[1:2]); err != nil {
t.Fatalf("Failed to import canonical chain start: %v", err)
}
if err := chain.snaps.Cap(blocks[1].Root(), 0); err != nil {
t.Fatalf("Failed to flatten snapshots: %v", err)
if scheme == rawdb.HashScheme {
if err := chain.snaps.Cap(blocks[1].Root(), 0); err != nil {
t.Fatalf("Failed to flatten snapshots: %v", err)
}
}

// Insert block B3 and commit the state into disk
Expand Down Expand Up @@ -1995,15 +1997,21 @@ func testIssue23496(t *testing.T, scheme string) {
}
expHead := uint64(1)
if scheme == rawdb.PathScheme {
expHead = uint64(2)
expHead = uint64(3)
}
if head := chain.CurrentBlock(); head.Number.Uint64() != expHead {
t.Errorf("Head block mismatch: have %d, want %d", head.Number, expHead)
}

// Reinsert B2-B4
if _, err := chain.InsertChain(blocks[1:]); err != nil {
t.Fatalf("Failed to import canonical chain tail: %v", err)
if scheme == rawdb.PathScheme {
// Reinsert B3-B4
if _, err := chain.InsertChain(blocks[2:]); err != nil {
t.Fatalf("Failed to import canonical chain tail: %v", err)
}
} else {
// Reinsert B2-B4
if _, err := chain.InsertChain(blocks[1:]); err != nil {
t.Fatalf("Failed to import canonical chain tail: %v", err)
}
}
if head := chain.CurrentHeader(); head.Number.Uint64() != uint64(4) {
t.Errorf("Head header mismatch: have %d, want %d", head.Number, 4)
Expand All @@ -2014,7 +2022,9 @@ func testIssue23496(t *testing.T, scheme string) {
if head := chain.CurrentBlock(); head.Number.Uint64() != uint64(4) {
t.Errorf("Head block mismatch: have %d, want %d", head.Number, uint64(4))
}
if layer := chain.Snapshots().Snapshot(blocks[2].Root()); layer == nil {
t.Error("Failed to regenerate the snapshot of known state")
if scheme == rawdb.HashScheme {
if layer := chain.Snapshots().Snapshot(blocks[2].Root()); layer == nil {
t.Error("Failed to regenerate the snapshot of known state")
}
}
}
2 changes: 1 addition & 1 deletion core/blockchain_sethead_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2022,7 +2022,7 @@ func testSetHeadWithScheme(t *testing.T, tt *rewindTest, snapshots bool, scheme
}
if tt.commitBlock > 0 {
chain.triedb.Commit(canonblocks[tt.commitBlock-1].Root(), false)
if snapshots {
if snapshots && scheme == rawdb.HashScheme {
if err := chain.snaps.Cap(canonblocks[tt.commitBlock-1].Root(), 0); err != nil {
t.Fatalf("Failed to flatten snapshots: %v", err)
}
Expand Down
16 changes: 10 additions & 6 deletions core/blockchain_snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (basic *snapshotTestBasic) prepare(t *testing.T) (*BlockChain, []*types.Blo
if basic.commitBlock > 0 && basic.commitBlock == point {
chain.TrieDB().Commit(blocks[point-1].Root(), false)
}
if basic.snapshotBlock > 0 && basic.snapshotBlock == point {
if basic.snapshotBlock > 0 && basic.snapshotBlock == point && basic.scheme == rawdb.HashScheme {
// Flushing the entire snap tree into the disk, the
// relevant (a) snapshot root and (b) snapshot generator
// will be persisted atomically.
Expand Down Expand Up @@ -148,13 +148,17 @@ func (basic *snapshotTestBasic) verify(t *testing.T, chain *BlockChain, blocks [
block := chain.GetBlockByNumber(basic.expSnapshotBottom)
if block == nil {
t.Errorf("The corresponding block[%d] of snapshot disk layer is missing", basic.expSnapshotBottom)
} else if !bytes.Equal(chain.snaps.DiskRoot().Bytes(), block.Root().Bytes()) {
t.Errorf("The snapshot disk layer root is incorrect, want %x, get %x", block.Root(), chain.snaps.DiskRoot())
} else if basic.scheme == rawdb.HashScheme {
if !bytes.Equal(chain.snaps.DiskRoot().Bytes(), block.Root().Bytes()) {
t.Errorf("The snapshot disk layer root is incorrect, want %x, get %x", block.Root(), chain.snaps.DiskRoot())
}
}

// Check the snapshot, ensure it's integrated
if err := chain.snaps.Verify(block.Root()); err != nil {
t.Errorf("The disk layer is not integrated %v", err)
if basic.scheme == rawdb.HashScheme {
if err := chain.snaps.Verify(block.Root()); err != nil {
t.Errorf("The disk layer is not integrated %v", err)
}
}
}

Expand Down Expand Up @@ -569,7 +573,7 @@ func TestHighCommitCrashWithNewSnapshot(t *testing.T) {
for _, scheme := range []string{rawdb.HashScheme, rawdb.PathScheme} {
expHead := uint64(0)
if scheme == rawdb.PathScheme {
expHead = uint64(4)
expHead = uint64(6)
}
test := &crashSnapshotTest{
snapshotTestBasic{
Expand Down
14 changes: 12 additions & 2 deletions core/state/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,19 @@ func (db *CachingDB) Reader(stateRoot common.Hash) (Reader, error) {
// is optional and may be partially useful if it's not fully
// generated.
if db.snap != nil {
sr, err := newStateReader(stateRoot, db.snap)
// If standalone state snapshot is available (hash scheme),
// then construct the legacy snap reader.
snap := db.snap.Snapshot(stateRoot)
if snap != nil {
readers = append(readers, newStateReader(snap)) // snap reader is optional
}
} else {
// If standalone state snapshot is not available (path scheme
// or the state snapshot is explicitly disabled in hash mode),
// try to construct the state reader with database.
reader, err := db.triedb.StateReader(stateRoot)
if err == nil {
readers = append(readers, sr) // snap reader is optional
readers = append(readers, newStateReader(reader)) // state reader is optional
}
}
// Set up the trie reader, which is expected to always be available
Expand Down
41 changes: 18 additions & 23 deletions core/state/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@ import (
"maps"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/state/snapshot"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/trie"
"github.com/ethereum/go-ethereum/trie/utils"
"github.com/ethereum/go-ethereum/triedb"
"github.com/ethereum/go-ethereum/triedb/database"
)

// Reader defines the interface for accessing accounts and storage slots
Expand All @@ -52,23 +52,18 @@ type Reader interface {
Copy() Reader
}

// stateReader is a wrapper over the state snapshot and implements the Reader
// interface. It provides an efficient way to access flat state.
// stateReader wraps a database state reader.
type stateReader struct {
snap snapshot.Snapshot
buff crypto.KeccakState
reader database.StateReader
buff crypto.KeccakState
}

// newStateReader constructs a flat state reader with on the specified state root.
func newStateReader(root common.Hash, snaps *snapshot.Tree) (*stateReader, error) {
snap := snaps.Snapshot(root)
if snap == nil {
return nil, errors.New("snapshot is not available")
}
// newStateReader constructs a state reader with on the given state root.
func newStateReader(reader database.StateReader) *stateReader {
return &stateReader{
snap: snap,
buff: crypto.NewKeccakState(),
}, nil
reader: reader,
buff: crypto.NewKeccakState(),
}
}

// Account implements Reader, retrieving the account specified by the address.
Expand All @@ -78,18 +73,18 @@ func newStateReader(root common.Hash, snaps *snapshot.Tree) (*stateReader, error
//
// The returned account might be nil if it's not existent.
func (r *stateReader) Account(addr common.Address) (*types.StateAccount, error) {
ret, err := r.snap.Account(crypto.HashData(r.buff, addr.Bytes()))
account, err := r.reader.Account(crypto.HashData(r.buff, addr.Bytes()))
if err != nil {
return nil, err
}
if ret == nil {
if account == nil {
return nil, nil
}
acct := &types.StateAccount{
Nonce: ret.Nonce,
Balance: ret.Balance,
CodeHash: ret.CodeHash,
Root: common.BytesToHash(ret.Root),
Nonce: account.Nonce,
Balance: account.Balance,
CodeHash: account.CodeHash,
Root: common.BytesToHash(account.Root),
}
if len(acct.CodeHash) == 0 {
acct.CodeHash = types.EmptyCodeHash.Bytes()
Expand All @@ -110,7 +105,7 @@ func (r *stateReader) Account(addr common.Address) (*types.StateAccount, error)
func (r *stateReader) Storage(addr common.Address, key common.Hash) (common.Hash, error) {
addrHash := crypto.HashData(r.buff, addr.Bytes())
slotHash := crypto.HashData(r.buff, key.Bytes())
ret, err := r.snap.Storage(addrHash, slotHash)
ret, err := r.reader.Storage(addrHash, slotHash)
if err != nil {
return common.Hash{}, err
}
Expand All @@ -131,8 +126,8 @@ func (r *stateReader) Storage(addr common.Address, key common.Hash) (common.Hash
// Copy implements Reader, returning a deep-copied snap reader.
func (r *stateReader) Copy() Reader {
return &stateReader{
snap: r.snap,
buff: crypto.NewKeccakState(),
reader: r.reader,
buff: crypto.NewKeccakState(),
}
}

Expand Down
Loading

0 comments on commit 8c6bf3b

Please sign in to comment.