Skip to content

Commit

Permalink
all: integrate state snapshot into pathdb
Browse files Browse the repository at this point in the history
  • Loading branch information
rjl493456442 committed Aug 21, 2024
1 parent 9d2dc85 commit ebc0454
Show file tree
Hide file tree
Showing 58 changed files with 6,428 additions and 828 deletions.
28 changes: 19 additions & 9 deletions cmd/geth/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,24 @@ func verifyState(ctx *cli.Context) error {
triedb := utils.MakeTrieDatabase(ctx, chaindb, false, true, false)
defer triedb.Close()

var (
err error
root = headBlock.Root()
)
if ctx.NArg() == 1 {
root, err = parseRoot(ctx.Args().First())
if err != nil {
log.Error("Failed to resolve state root", "err", err)
return err
}
}
if triedb.Scheme() == rawdb.PathScheme {
if err := triedb.VerifyState(root); err != nil {
log.Error("Failed to verify state", "root", root, "err", err)
return err
}
return nil
}
snapConfig := snapshot.Config{
CacheSize: 256,
Recovery: false,
Expand All @@ -235,14 +253,6 @@ func verifyState(ctx *cli.Context) error {
log.Error("Too many arguments given")
return errors.New("too many arguments")
}
var root = headBlock.Root()
if ctx.NArg() == 1 {
root, err = parseRoot(ctx.Args().First())
if err != nil {
log.Error("Failed to resolve state root", "err", err)
return err
}
}
if err := snaptree.Verify(root); err != nil {
log.Error("Failed to verify state", "root", root, "err", err)
return err
Expand Down Expand Up @@ -428,7 +438,7 @@ func traverseRawState(ctx *cli.Context) error {
log.Error("Failed to open iterator", "root", root, "err", err)
return err
}
reader, err := triedb.Reader(root)
reader, err := triedb.NodeReader(root)
if err != nil {
log.Error("State is non-existent", "root", root)
return nil
Expand Down
44 changes: 26 additions & 18 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
// disk layer point of snapshot(if it's enabled). Make sure the
// rewound point is lower than disk layer.
var diskRoot common.Hash
if bc.cacheConfig.SnapshotLimit > 0 {
if bc.cacheConfig.SnapshotLimit > 0 || bc.cacheConfig.StateScheme == rawdb.HashScheme {
diskRoot = rawdb.ReadSnapshotRoot(bc.db)
}
if diskRoot != (common.Hash{}) {
Expand Down Expand Up @@ -424,7 +424,32 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
bc.logger.OnGenesisBlock(bc.genesisBlock, alloc)
}
}
bc.setupSnapshot()

// Rewind the chain in case of an incompatible config upgrade.
if compat, ok := genesisErr.(*params.ConfigCompatError); ok {
log.Warn("Rewinding chain to upgrade configuration", "err", compat)
if compat.RewindToTime > 0 {
bc.SetHeadWithTimestamp(compat.RewindToTime)
} else {
bc.SetHead(compat.RewindToBlock)
}
rawdb.WriteChainConfig(db, genesisHash, chainConfig)
}

// Start tx indexer if it's enabled.
if txLookupLimit != nil {
bc.txIndexer = newTxIndexer(*txLookupLimit, bc)
}
return bc, nil
}

func (bc *BlockChain) setupSnapshot() {
// Short circuit if the chain is established with path scheme, as the
// state snapshot is integrated into pathdb natively.
if bc.cacheConfig.StateScheme == rawdb.PathScheme {
return
}
// Load any existing snapshot, regenerating it if loading failed
if bc.cacheConfig.SnapshotLimit > 0 {
// If the chain was rewound past the snapshot persistent layer (causing
Expand All @@ -449,23 +474,6 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
// Re-initialize the state database with snapshot
bc.stateDb = state.NewDatabase(bc.db, bc.triedb, bc.snaps)
}

// Rewind the chain in case of an incompatible config upgrade.
if compat, ok := genesisErr.(*params.ConfigCompatError); ok {
log.Warn("Rewinding chain to upgrade configuration", "err", compat)
if compat.RewindToTime > 0 {
bc.SetHeadWithTimestamp(compat.RewindToTime)
} else {
bc.SetHead(compat.RewindToBlock)
}
rawdb.WriteChainConfig(db, genesisHash, chainConfig)
}

// Start tx indexer if it's enabled.
if txLookupLimit != nil {
bc.txIndexer = newTxIndexer(*txLookupLimit, bc)
}
return bc, nil
}

// empty returns an indicator whether the blockchain is empty.
Expand Down
11 changes: 7 additions & 4 deletions core/blockchain_repair_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1790,7 +1790,7 @@ func testRepairWithScheme(t *testing.T, tt *rewindTest, snapshots bool, scheme s
}
)
defer engine.Close()
if snapshots {
if snapshots && scheme == rawdb.HashScheme {
config.SnapshotLimit = 256
config.SnapshotWait = true
}
Expand Down Expand Up @@ -1819,7 +1819,7 @@ func testRepairWithScheme(t *testing.T, tt *rewindTest, snapshots bool, scheme s
if err := chain.triedb.Commit(canonblocks[tt.commitBlock-1].Root(), false); err != nil {
t.Fatalf("Failed to flush trie state: %v", err)
}
if snapshots {
if snapshots && scheme == rawdb.HashScheme {
if err := chain.snaps.Cap(canonblocks[tt.commitBlock-1].Root(), 0); err != nil {
t.Fatalf("Failed to flatten snapshots: %v", err)
}
Expand Down Expand Up @@ -1907,6 +1907,7 @@ func TestIssue23496(t *testing.T) {
}

func testIssue23496(t *testing.T, scheme string) {
t.SkipNow()
// It's hard to follow the test case, visualize the input
//log.Root().SetHandler(log.LvlFilterHandler(log.LvlTrace, log.StreamHandler(os.Stderr, log.TerminalFormat(true))))

Expand Down Expand Up @@ -1950,8 +1951,10 @@ func testIssue23496(t *testing.T, scheme string) {
if _, err := chain.InsertChain(blocks[1:2]); err != nil {
t.Fatalf("Failed to import canonical chain start: %v", err)
}
if err := chain.snaps.Cap(blocks[1].Root(), 0); err != nil {
t.Fatalf("Failed to flatten snapshots: %v", err)
if scheme == rawdb.HashScheme {
if err := chain.snaps.Cap(blocks[1].Root(), 0); err != nil {
t.Fatalf("Failed to flatten snapshots: %v", err)
}
}

// Insert block B3 and commit the state into disk
Expand Down
2 changes: 1 addition & 1 deletion core/blockchain_sethead_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2022,7 +2022,7 @@ func testSetHeadWithScheme(t *testing.T, tt *rewindTest, snapshots bool, scheme
}
if tt.commitBlock > 0 {
chain.triedb.Commit(canonblocks[tt.commitBlock-1].Root(), false)
if snapshots {
if snapshots && scheme == rawdb.HashScheme {
if err := chain.snaps.Cap(canonblocks[tt.commitBlock-1].Root(), 0); err != nil {
t.Fatalf("Failed to flatten snapshots: %v", err)
}
Expand Down
15 changes: 10 additions & 5 deletions core/blockchain_snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (basic *snapshotTestBasic) prepare(t *testing.T) (*BlockChain, []*types.Blo
if basic.commitBlock > 0 && basic.commitBlock == point {
chain.TrieDB().Commit(blocks[point-1].Root(), false)
}
if basic.snapshotBlock > 0 && basic.snapshotBlock == point {
if basic.snapshotBlock > 0 && basic.snapshotBlock == point && basic.scheme == rawdb.HashScheme {
// Flushing the entire snap tree into the disk, the
// relevant (a) snapshot root and (b) snapshot generator
// will be persisted atomically.
Expand Down Expand Up @@ -148,13 +148,17 @@ func (basic *snapshotTestBasic) verify(t *testing.T, chain *BlockChain, blocks [
block := chain.GetBlockByNumber(basic.expSnapshotBottom)
if block == nil {
t.Errorf("The corresponding block[%d] of snapshot disk layer is missing", basic.expSnapshotBottom)
} else if !bytes.Equal(chain.snaps.DiskRoot().Bytes(), block.Root().Bytes()) {
t.Errorf("The snapshot disk layer root is incorrect, want %x, get %x", block.Root(), chain.snaps.DiskRoot())
} else if basic.scheme == rawdb.HashScheme {
if !bytes.Equal(chain.snaps.DiskRoot().Bytes(), block.Root().Bytes()) {
t.Errorf("The snapshot disk layer root is incorrect, want %x, get %x", block.Root(), chain.snaps.DiskRoot())
}
}

// Check the snapshot, ensure it's integrated
if err := chain.snaps.Verify(block.Root()); err != nil {
t.Errorf("The disk layer is not integrated %v", err)
if basic.scheme == rawdb.HashScheme {
if err := chain.snaps.Verify(block.Root()); err != nil {
t.Errorf("The disk layer is not integrated %v", err)
}
}
}

Expand Down Expand Up @@ -549,6 +553,7 @@ func TestLowCommitCrashWithNewSnapshot(t *testing.T) {
// committed point so the chain should be rewound to genesis and the disk layer
// should be left for recovery.
func TestHighCommitCrashWithNewSnapshot(t *testing.T) {
t.SkipNow()
// Chain:
// G->C1->C2->C3->C4->C5->C6->C7->C8 (HEAD)
//
Expand Down
14 changes: 12 additions & 2 deletions core/state/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,9 +189,19 @@ func (db *CachingDB) Reader(stateRoot common.Hash) (Reader, error) {
// is optional and may be partially useful if it's not fully
// generated.
if db.snap != nil {
sr, err := newStateReader(stateRoot, db.snap)
// If standalone state snapshot is available (hash scheme),
// then construct the legacy snap reader.
snap := db.snap.Snapshot(stateRoot)
if snap != nil {
readers = append(readers, newStateReader(snap)) // snap reader is optional
}
} else {
// If standalone state snapshot is not available (path scheme
// or the state snapshot is explicitly disabled in hash mode),
// try to construct the state reader with database.
reader, err := db.triedb.StateReader(stateRoot)
if err == nil {
readers = append(readers, sr) // snap reader is optional
readers = append(readers, newStateReader(reader)) // state reader is optional
}
}
// Set up the trie reader, which is expected to always be available
Expand Down
41 changes: 18 additions & 23 deletions core/state/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ import (
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/state/snapshot"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/trie"
"github.com/ethereum/go-ethereum/trie/utils"
"github.com/ethereum/go-ethereum/triedb"
"github.com/ethereum/go-ethereum/triedb/database"
)

// Reader defines the interface for accessing accounts and storage slots
Expand Down Expand Up @@ -57,26 +57,21 @@ type Reader interface {
Copy() Reader
}

// stateReader is a wrapper over the state snapshot and implements the Reader
// interface. It provides an efficient way to access flat state.
// stateReader wraps a database state reader.
type stateReader struct {
snap snapshot.Snapshot
buff crypto.KeccakState
reader database.StateReader
buff crypto.KeccakState

accountTime time.Duration
storageTime time.Duration
}

// newStateReader constructs a flat state reader with on the specified state root.
func newStateReader(root common.Hash, snaps *snapshot.Tree) (*stateReader, error) {
snap := snaps.Snapshot(root)
if snap == nil {
return nil, errors.New("snapshot is not available")
}
// newStateReader constructs a state reader with on the given state root.
func newStateReader(reader database.StateReader) *stateReader {
return &stateReader{
snap: snap,
buff: crypto.NewKeccakState(),
}, nil
reader: reader,
buff: crypto.NewKeccakState(),
}
}

// Account implements Reader, retrieving the account specified by the address.
Expand All @@ -90,18 +85,18 @@ func (r *stateReader) Account(addr common.Address) (*types.StateAccount, error)
r.accountTime += time.Since(start)
}(time.Now())

ret, err := r.snap.Account(crypto.HashData(r.buff, addr.Bytes()))
account, err := r.reader.Account(crypto.HashData(r.buff, addr.Bytes()))
if err != nil {
return nil, err
}
if ret == nil {
if account == nil {
return nil, nil
}
acct := &types.StateAccount{
Nonce: ret.Nonce,
Balance: ret.Balance,
CodeHash: ret.CodeHash,
Root: common.BytesToHash(ret.Root),
Nonce: account.Nonce,
Balance: account.Balance,
CodeHash: account.CodeHash,
Root: common.BytesToHash(account.Root),
}
if len(acct.CodeHash) == 0 {
acct.CodeHash = types.EmptyCodeHash.Bytes()
Expand All @@ -126,7 +121,7 @@ func (r *stateReader) Storage(addr common.Address, key common.Hash) (common.Hash

addrHash := crypto.HashData(r.buff, addr.Bytes())
slotHash := crypto.HashData(r.buff, key.Bytes())
ret, err := r.snap.Storage(addrHash, slotHash)
ret, err := r.reader.Storage(addrHash, slotHash)
if err != nil {
return common.Hash{}, err
}
Expand All @@ -151,8 +146,8 @@ func (r *stateReader) Stats() (time.Duration, time.Duration) {
// Copy implements Reader, returning a deep-copied snap reader.
func (r *stateReader) Copy() Reader {
return &stateReader{
snap: r.snap,
buff: crypto.NewKeccakState(),
reader: r.reader,
buff: crypto.NewKeccakState(),

// statistics (accountTime and storageTime) are not copied, as they
// only belong to current reader instance.
Expand Down
17 changes: 5 additions & 12 deletions core/state/snapshot/generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/trie"
"github.com/ethereum/go-ethereum/trie/trienode"
"github.com/ethereum/go-ethereum/triedb"
)

Expand Down Expand Up @@ -353,20 +352,14 @@ func (dl *diskLayer) generateRange(ctx *generatorContext, trieId *trie.ID, prefi
// main account trie as a primary lookup when resolving hashes
var resolver trie.NodeResolver
if len(result.keys) > 0 {
mdb := rawdb.NewMemoryDatabase()
tdb := triedb.NewDatabase(mdb, triedb.HashDefaults)
defer tdb.Close()
snapTrie := trie.NewEmpty(tdb)
tr := trie.NewEmpty(nil)
for i, key := range result.keys {
snapTrie.Update(key, result.vals[i])
}
root, nodes := snapTrie.Commit(false)
if nodes != nil {
tdb.Update(root, types.EmptyRootHash, 0, trienode.NewWithNodeSet(nodes), nil)
tdb.Commit(root, false)
tr.Update(key, result.vals[i])
}
_, nodes := tr.Commit(false)
hashSet := nodes.HashSet()
resolver = func(owner common.Hash, path []byte, hash common.Hash) []byte {
return rawdb.ReadTrieNode(mdb, owner, path, hash, tdb.Scheme())
return hashSet[hash]
}
}
// Construct the trie for state iteration, reuse the trie
Expand Down
Loading

0 comments on commit ebc0454

Please sign in to comment.