Skip to content

Commit

Permalink
if statediffing is on, lock tries in triedb until the statediffing se…
Browse files Browse the repository at this point in the history
…rvice signals they are done using them
  • Loading branch information
i-norden committed Aug 14, 2020
1 parent e004e9b commit c94e07e
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 14 deletions.
3 changes: 3 additions & 0 deletions cmd/geth/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ func makeConfigNode(ctx *cli.Context) (*node.Node, gethConfig) {
cfg.Ethstats.URL = ctx.GlobalString(utils.EthStatsURLFlag.Name)
}
utils.SetShhConfig(ctx, stack, &cfg.Shh)
if ctx.GlobalBool(utils.StateDiffFlag.Name) {
cfg.Eth.Diffing = true
}

return stack, cfg
}
Expand Down
52 changes: 48 additions & 4 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import (
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/trie"
lru "github.com/hashicorp/golang-lru"
"github.com/hashicorp/golang-lru"
)

var (
Expand Down Expand Up @@ -115,6 +115,7 @@ type CacheConfig struct {
TrieDirtyLimit int // Memory limit (MB) at which to start flushing dirty trie nodes to disk
TrieDirtyDisabled bool // Whether to disable trie write caching and GC altogether (archive node)
TrieTimeLimit time.Duration // Time limit after which to flush the current in-memory trie to disk
StateDiffing bool // Whether or not the statediffing service is running
}

// BlockChain represents the canonical chain given a database with a genesis
Expand Down Expand Up @@ -177,6 +178,10 @@ type BlockChain struct {
badBlocks *lru.Cache // Bad block cache
shouldPreserve func(*types.Block) bool // Function used to determine whether should preserve the given block.
terminateInsert func(common.Hash, uint64) bool // Testing hook used to terminate ancient receipt chain insertion.

// Locked roots and their mutex
trieLock sync.Mutex
lockedRoots map[common.Hash]bool
}

// NewBlockChain returns a fully initialised block chain using information
Expand Down Expand Up @@ -214,6 +219,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
engine: engine,
vmConfig: vmConfig,
badBlocks: badBlocks,
lockedRoots: make(map[common.Hash]bool),
}
bc.validator = NewBlockValidator(chainConfig, bc, engine)
bc.prefetcher = newStatePrefetcher(chainConfig, bc, engine)
Expand Down Expand Up @@ -857,7 +863,10 @@ func (bc *BlockChain) Stop() {
}
}
for !bc.triegc.Empty() {
triedb.Dereference(bc.triegc.PopItem().(common.Hash))
pruneRoot := bc.triegc.PopItem().(common.Hash)
if !bc.TrieLocked(pruneRoot) {
triedb.Dereference(pruneRoot)
}
}
if size, _ := triedb.Size(); size != 0 {
log.Error("Dangling trie nodes after full cleanup")
Expand Down Expand Up @@ -1342,6 +1351,11 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
triedb.Reference(root, common.Hash{}) // metadata reference to keep trie alive
bc.triegc.Push(root, -int64(block.NumberU64()))

// If we are statediffing, lock the trie until the statediffing service is done using it
if bc.cacheConfig.StateDiffing {
bc.LockTrie(root)
}

if current := block.NumberU64(); current > TriesInMemory {
// If we exceeded our memory allowance, flush matured singleton nodes to disk
var (
Expand Down Expand Up @@ -1380,8 +1394,11 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
bc.triegc.Push(root, number)
break
}
log.Debug("Dereferencing", "root", root.(common.Hash).Hex())
triedb.Dereference(root.(common.Hash))
pruneRoot := root.(common.Hash)
if !bc.TrieLocked(pruneRoot) {
log.Debug("Dereferencing", "root", root.(common.Hash).Hex())
triedb.Dereference(pruneRoot)
}
}
}
}
Expand Down Expand Up @@ -2254,3 +2271,30 @@ func (bc *BlockChain) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscript
func (bc *BlockChain) SubscribeBlockProcessingEvent(ch chan<- bool) event.Subscription {
return bc.scope.Track(bc.blockProcFeed.Subscribe(ch))
}

// TrieLocked returns whether the trie associated with the provided root is locked for use
func (bc *BlockChain) TrieLocked(root common.Hash) bool {
bc.trieLock.Lock()
locked, ok := bc.lockedRoots[root]
bc.trieLock.Unlock()
if !ok {
return false
}
return locked
}

// LockTrie prevents dereferencing of the provided root
func (bc *BlockChain) LockTrie(root common.Hash) {
bc.trieLock.Lock()
bc.lockedRoots[root] = true
bc.trieLock.Unlock()
return
}

// UnlockTrie allows dereferencing of the provided root- provided it was previously locked
func (bc *BlockChain) UnlockTrie(root common.Hash) {
bc.trieLock.Lock()
bc.lockedRoots[root] = false
bc.trieLock.Unlock()
return
}
1 change: 1 addition & 0 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
TrieDirtyLimit: config.TrieDirtyCache,
TrieDirtyDisabled: config.NoPruning,
TrieTimeLimit: config.TrieTimeout,
StateDiffing: config.Diffing,
}
)
eth.blockchain, err = core.NewBlockChain(chainDb, cacheConfig, chainConfig, eth.engine, vmConfig, eth.shouldPreserve)
Expand Down
4 changes: 4 additions & 0 deletions eth/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,4 +164,8 @@ type Config struct {

// MuirGlacier block override (TODO: remove after the fork)
OverrideMuirGlacier *big.Int `toml:",omitempty"`

// Signify whether or not we are producing statediffs
// If we are, do not dereference state roots until the statediffing service is done with them
Diffing bool
}
14 changes: 7 additions & 7 deletions statediff/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (sdb *builder) buildStateTrie(it trie.NodeIterator) ([]StateNode, error) {
return nil, fmt.Errorf("unexpected node type %s", ty)
}
}
return stateNodes, nil
return stateNodes, it.Error()
}

// BuildStateDiffObject builds a statediff object from two blocks and the provided parameters
Expand Down Expand Up @@ -301,7 +301,7 @@ func (sdb *builder) createdAndUpdatedState(a, b trie.NodeIterator, watchedAddres
// add both intermediate and leaf node paths to the list of diffPathsAtB
diffPathsAtB[common.Bytes2Hex(nodePath)] = true
}
return diffAcountsAtB, diffPathsAtB, nil
return diffAcountsAtB, diffPathsAtB, it.Error()
}

// createdAndUpdatedStateWithIntermediateNodes returns
Expand Down Expand Up @@ -368,7 +368,7 @@ func (sdb *builder) createdAndUpdatedStateWithIntermediateNodes(a, b trie.NodeIt
// add both intermediate and leaf node paths to the list of diffPathsAtB
diffPathsAtB[common.Bytes2Hex(nodePath)] = true
}
return createdOrUpdatedIntermediateNodes, diffAcountsAtB, diffPathsAtB, nil
return createdOrUpdatedIntermediateNodes, diffAcountsAtB, diffPathsAtB, it.Error()
}

// deletedOrUpdatedState returns a slice of all the pathes that are emptied at B
Expand Down Expand Up @@ -433,7 +433,7 @@ func (sdb *builder) deletedOrUpdatedState(a, b trie.NodeIterator, diffPathsAtB m
return nil, nil, fmt.Errorf("unexpected node type %s", ty)
}
}
return emptiedPaths, diffAccountAtA, nil
return emptiedPaths, diffAccountAtA, it.Error()
}

// buildAccountUpdates uses the account diffs maps for A => B and B => A and the known intersection of their leafkeys
Expand Down Expand Up @@ -559,7 +559,7 @@ func (sdb *builder) buildStorageNodesFromTrie(it trie.NodeIterator, watchedStora
return nil, fmt.Errorf("unexpected node type %s", ty)
}
}
return storageDiffs, nil
return storageDiffs, it.Error()
}

// buildStorageNodesIncremental builds the storage diff node objects for all nodes that exist in a different state at B than A
Expand Down Expand Up @@ -641,7 +641,7 @@ func (sdb *builder) createdAndUpdatedStorage(a, b trie.NodeIterator, watchedKeys
}
diffPathsAtB[common.Bytes2Hex(nodePath)] = true
}
return createdOrUpdatedStorage, diffPathsAtB, nil
return createdOrUpdatedStorage, diffPathsAtB, it.Error()
}

func (sdb *builder) deletedOrUpdatedStorage(a, b trie.NodeIterator, diffPathsAtB map[string]bool, watchedKeys []common.Hash, intermediateNodes bool) ([]StorageNode, error) {
Expand Down Expand Up @@ -700,7 +700,7 @@ func (sdb *builder) deletedOrUpdatedStorage(a, b trie.NodeIterator, diffPathsAtB
return nil, fmt.Errorf("unexpected node type %s", ty)
}
}
return deletedStorage, nil
return deletedStorage, it.Error()
}

// isWatchedAddress is used to check if a state account corresponds to one of the addresses the builder is configured to watch
Expand Down
8 changes: 5 additions & 3 deletions statediff/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type blockChain interface {
GetBlockByNumber(number uint64) *types.Block
GetReceiptsByHash(hash common.Hash) types.Receipts
GetTdByHash(hash common.Hash) *big.Int
UnlockTrie(root common.Hash)
}

// IService is the state-diffing service interface
Expand All @@ -53,7 +54,7 @@ type IService interface {
// Main event loop for processing state diffs
Loop(chainEventCh chan core.ChainEvent)
// Method to subscribe to receive state diff processing output
Subscribe(id rpc.ID, sub chan<- Payload, quitChan chan<- bool, params Params)
Subscribe(id rpc.ID, sub chan<- Payload, quitChanogr chan<- bool, params Params)
// Method to unsubscribe from state diff processing
Unsubscribe(id rpc.ID) error
// Method to get state diff object at specific block
Expand Down Expand Up @@ -165,8 +166,7 @@ func (sds *Service) streamStateDiff(currentBlock *types.Block, parentRoot common
// create payload for this subscription type
payload, err := sds.processStateDiff(currentBlock, parentRoot, params)
if err != nil {
log.Error(fmt.Sprintf("statediff processing error for subscriptions with parameters: %+v", params))
sds.closeType(ty)
log.Error(fmt.Sprintf("statediff processing error a blockheight %d for subscriptions with parameters: %+v err: %s", currentBlock.Number().Uint64(), params, err.Error()))
continue
}
for id, sub := range subs {
Expand Down Expand Up @@ -201,6 +201,8 @@ func (sds *Service) processStateDiff(currentBlock *types.Block, parentRoot commo
BlockHash: currentBlock.Hash(),
BlockNumber: currentBlock.Number(),
}, params)
// allow dereferencing of parent, keep current locked as it should be the next parent
sds.BlockChain.UnlockTrie(parentRoot)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit c94e07e

Please sign in to comment.