Skip to content

Commit

Permalink
[factory] simplify interface for archive mode
Browse files Browse the repository at this point in the history
  • Loading branch information
dustinxie committed Nov 19, 2024
1 parent 65044c7 commit a52fe3d
Show file tree
Hide file tree
Showing 10 changed files with 52 additions and 182 deletions.
8 changes: 6 additions & 2 deletions api/coreservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -949,7 +949,7 @@ func (core *coreService) readState(ctx context.Context, p protocol.Protocol, hei
if height != "" {
inputHeight, err := strconv.ParseUint(height, 0, 64)
if err != nil {
return nil, uint64(0), err
return nil, 0, err
}
rp := rolldpos.FindProtocol(core.registry)
if rp != nil {
Expand All @@ -961,7 +961,11 @@ func (core *coreService) readState(ctx context.Context, p protocol.Protocol, hei
}
if inputHeight < tipHeight {
// old data, wrap to history state reader
d, h, err := p.ReadState(ctx, factory.NewHistoryStateReader(core.sf, inputHeight), methodName, arguments...)
historySR, err := core.sf.WorkingSetAtHeight(ctx, inputHeight)
if err != nil {
return nil, 0, err
}
d, h, err := p.ReadState(ctx, historySR, methodName, arguments...)
if err == nil {
key.Height = strconv.FormatUint(h, 10)
core.readCache.Put(key.Hash(), d)
Expand Down
1 change: 1 addition & 0 deletions api/serverV2_integrity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ func addActsToActPool(ctx context.Context, ap actpool.ActPool) error {

func setupChain(cfg testConfig) (blockchain.Blockchain, blockdao.BlockDAO, blockindex.Indexer, blockindex.BloomFilterIndexer, factory.Factory, actpool.ActPool, *protocol.Registry, string, error) {
cfg.chain.ProducerPrivKey = hex.EncodeToString(identityset.PrivateKey(0).Bytes())
cfg.chain.EnableArchiveMode = true
registry := protocol.NewRegistry()
factoryCfg := factory.GenerateConfig(cfg.chain, cfg.genesis)
sf, err := factory.NewFactory(factoryCfg, db.NewMemKVStore(), factory.RegistryOption(registry))
Expand Down
17 changes: 9 additions & 8 deletions blockchain/integrity/integrity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2355,14 +2355,14 @@ func testHistoryForAccount(t *testing.T, statetx bool) {

// check history account's balance
if statetx {
_, err = accountutil.AccountState(ctx, factory.NewHistoryStateReader(sf, bc.TipHeight()-1), a)
require.Equal(factory.ErrNotSupported, errors.Cause(err))
_, err = accountutil.AccountState(ctx, factory.NewHistoryStateReader(sf, bc.TipHeight()-1), b)
_, err = sf.WorkingSetAtHeight(ctx, 0)
require.Equal(factory.ErrNotSupported, errors.Cause(err))
} else {
AccountA, err = accountutil.AccountState(ctx, factory.NewHistoryStateReader(sf, bc.TipHeight()-1), a)
sr, err := sf.WorkingSetAtHeight(ctx, bc.TipHeight()-1)
require.NoError(err)
AccountA, err = accountutil.AccountState(ctx, sr, a)
require.NoError(err)
AccountB, err = accountutil.AccountState(ctx, factory.NewHistoryStateReader(sf, bc.TipHeight()-1), b)
AccountB, err = accountutil.AccountState(ctx, sr, b)
require.NoError(err)
require.Equal(big.NewInt(100), AccountA.Balance)
require.Equal(big.NewInt(100), AccountB.Balance)
Expand Down Expand Up @@ -2403,10 +2403,11 @@ func testHistoryForContract(t *testing.T, statetx bool) {

// check the the original balance again
if statetx {
_, err = accountutil.AccountState(ctx, factory.NewHistoryStateReader(sf, bc.TipHeight()-1), contractAddr)
require.True(errors.Cause(err) == factory.ErrNotSupported)
_, err = sf.WorkingSetAtHeight(ctx, bc.TipHeight()-1)
require.Equal(factory.ErrNotSupported, errors.Cause(err))
} else {
sr := factory.NewHistoryStateReader(sf, bc.TipHeight()-1)
sr, err := sf.WorkingSetAtHeight(ctx, bc.TipHeight()-1)
require.NoError(err)
account, err = accountutil.AccountState(ctx, sr, contractAddr)
require.NoError(err)
balance = BalanceOfContract(contract, genesisAccount, sr, t, account.Root)
Expand Down
89 changes: 17 additions & 72 deletions state/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,6 @@ type (
NewBlockBuilder(context.Context, actpool.ActPool, func(action.Envelope) (*action.SealedEnvelope, error)) (*block.Builder, error)
PutBlock(context.Context, *block.Block) error
DeleteTipBlock(context.Context, *block.Block) error
StateAtHeight(uint64, interface{}, ...protocol.StateOption) error
StatesAtHeight(uint64, ...protocol.StateOption) (state.Iterator, error)
WorkingSet(context.Context) (protocol.StateManager, error)
WorkingSetAtHeight(context.Context, uint64) (protocol.StateManager, error)
}
Expand Down Expand Up @@ -268,7 +266,16 @@ func (sf *factory) newWorkingSet(ctx context.Context, height uint64) (*workingSe
if err != nil {
return nil, err
}
store, err := newFactoryWorkingSetStore(sf.protocolView, flusher)
var (
rootKey = ArchiveTrieRootKey
createTrie = true
)
if height < sf.currentChainHeight {
// archive mode
rootKey = fmt.Sprintf("%s-%d", ArchiveTrieRootKey, height)
createTrie = false
}
store, err := newFactoryWorkingSetStore(sf.protocolView, flusher, rootKey, createTrie)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -388,16 +395,20 @@ func (sf *factory) WorkingSet(ctx context.Context) (protocol.StateManager, error
}

func (sf *factory) WorkingSetAtHeight(ctx context.Context, height uint64) (protocol.StateManager, error) {
if !sf.saveHistory {
return nil, ErrNoArchiveData
}
sf.mutex.Lock()
defer sf.mutex.Unlock()
return sf.newWorkingSet(ctx, height+1)
if height > sf.currentChainHeight {
return nil, errors.Errorf("query height %d is higher than tip height %d", height, sf.currentChainHeight)
}
return sf.newWorkingSet(ctx, height)
}

// PutBlock persists all changes in RunActions() into the DB
func (sf *factory) PutBlock(ctx context.Context, blk *block.Block) error {
sf.mutex.Lock()
timer := sf.timerFactory.NewTimer("Commit")
sf.mutex.Unlock()
defer timer.End()
producer := blk.PublicKey().Address()
if producer == nil {
Expand Down Expand Up @@ -456,52 +467,6 @@ func (sf *factory) DeleteTipBlock(_ context.Context, _ *block.Block) error {
return errors.Wrap(ErrNotSupported, "cannot delete tip block from factory")
}

// StateAtHeight returns a confirmed state at height -- archive mode
func (sf *factory) StateAtHeight(height uint64, s interface{}, opts ...protocol.StateOption) error {
sf.mutex.RLock()
defer sf.mutex.RUnlock()
cfg, err := processOptions(opts...)
if err != nil {
return err
}
if cfg.Keys != nil {
return errors.Wrap(ErrNotSupported, "Read state with keys option has not been implemented yet")
}
if height > sf.currentChainHeight {
return errors.Errorf("query height %d is higher than tip height %d", height, sf.currentChainHeight)
}
return sf.stateAtHeight(height, cfg.Namespace, cfg.Key, s)
}

// StatesAtHeight returns a set states in the state factory at height -- archive mode
func (sf *factory) StatesAtHeight(height uint64, opts ...protocol.StateOption) (state.Iterator, error) {
sf.mutex.RLock()
defer sf.mutex.RUnlock()
if height > sf.currentChainHeight {
return nil, errors.Errorf("query height %d is higher than tip height %d", height, sf.currentChainHeight)
}
cfg, err := processOptions(opts...)
if err != nil {
return nil, err
}
if cfg.Keys != nil {
return nil, errors.Wrap(ErrNotSupported, "Read states with keys option has not been implemented yet")
}
tlt, err := newTwoLayerTrie(ArchiveTrieNamespace, sf.dao, fmt.Sprintf("%s-%d", ArchiveTrieRootKey, height), false)
if err != nil {
return nil, errors.Wrapf(err, "failed to generate trie for %d", height)
}
if err := tlt.Start(context.Background()); err != nil {
return nil, err
}
defer tlt.Stop(context.Background())
keys, values, err := readStatesFromTLT(tlt, cfg.Namespace, cfg.Keys)
if err != nil {
return nil, err
}
return state.NewIterator(keys, values)
}

// State returns a confirmed state in the state factory
func (sf *factory) State(s interface{}, opts ...protocol.StateOption) (uint64, error) {
sf.mutex.RLock()
Expand Down Expand Up @@ -573,26 +538,6 @@ func legacyKeyLen() int {
return 20
}

func (sf *factory) stateAtHeight(height uint64, ns string, key []byte, s interface{}) error {
if !sf.saveHistory {
return ErrNoArchiveData
}
tlt, err := newTwoLayerTrie(ArchiveTrieNamespace, sf.dao, fmt.Sprintf("%s-%d", ArchiveTrieRootKey, height), false)
if err != nil {
return errors.Wrapf(err, "failed to generate trie for %d", height)
}
if err := tlt.Start(context.Background()); err != nil {
return err
}
defer tlt.Stop(context.Background())

value, err := readStateFromTLT(tlt, ns, key)
if err != nil {
return err
}
return state.Deserialize(s, value)
}

func (sf *factory) createGenesisStates(ctx context.Context) error {
ws, err := sf.newWorkingSet(ctx, 0)
if err != nil {
Expand Down
19 changes: 11 additions & 8 deletions state/factory/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -567,21 +567,24 @@ func testHistoryState(sf Factory, t *testing.T, statetx, archive bool) {

// check archive data
if statetx {
// statetx not support archive mode
_, err = accountutil.AccountState(ctx, NewHistoryStateReader(sf, 0), a)
require.Equal(t, ErrNotSupported, errors.Cause(err))
_, err = accountutil.AccountState(ctx, NewHistoryStateReader(sf, 0), b)
// statetx not support archive mode yet
_, err = sf.WorkingSetAtHeight(ctx, 0)
require.Equal(t, ErrNotSupported, errors.Cause(err))
} else {
_, err = sf.WorkingSetAtHeight(ctx, 10)
if !archive {
_, err = accountutil.AccountState(ctx, NewHistoryStateReader(sf, 0), a)
require.Equal(t, ErrNoArchiveData, errors.Cause(err))
_, err = accountutil.AccountState(ctx, NewHistoryStateReader(sf, 0), b)
} else {
require.Contains(t, err.Error(), "query height 10 is higher than tip height 1")
}
sr, err := sf.WorkingSetAtHeight(ctx, 0)
if !archive {
require.Equal(t, ErrNoArchiveData, errors.Cause(err))
} else {
accountA, err = accountutil.AccountState(ctx, NewHistoryStateReader(sf, 0), a)
require.NoError(t, err)
accountB, err = accountutil.AccountState(ctx, NewHistoryStateReader(sf, 0), b)
accountA, err = accountutil.AccountState(ctx, sr, a)
require.NoError(t, err)
accountB, err = accountutil.AccountState(ctx, sr, b)
require.NoError(t, err)
require.Equal(t, big.NewInt(100), accountA.Balance)
require.Equal(t, big.NewInt(0), accountB.Balance)
Expand Down
49 changes: 0 additions & 49 deletions state/factory/historyfactory.go

This file was deleted.

3 changes: 2 additions & 1 deletion state/factory/statedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,8 @@ func (sdb *stateDB) WorkingSet(ctx context.Context) (protocol.StateManager, erro
}

func (sdb *stateDB) WorkingSetAtHeight(ctx context.Context, height uint64) (protocol.StateManager, error) {
return sdb.newWorkingSet(ctx, height+1)
// TODO: implement archive mode
return nil, ErrNotSupported
}

// PutBlock persists all changes in RunActions() into the DB
Expand Down
5 changes: 4 additions & 1 deletion state/factory/workingset.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ func (ws *workingSet) Commit(ctx context.Context) error {
return err
}
ws.Reset()
return nil
return ws.store.Stop(ctx)
}

// State pulls a state from DB
Expand All @@ -343,6 +343,9 @@ func (ws *workingSet) State(s interface{}, opts ...protocol.StateOption) (uint64
if err != nil {
return ws.height, err
}
if cfg.Keys != nil {
return 0, errors.Wrap(ErrNotSupported, "Read state with keys option has not been implemented yet")
}
value, err := ws.store.Get(cfg.Namespace, cfg.Key)
if err != nil {
return ws.height, err
Expand Down
4 changes: 2 additions & 2 deletions state/factory/workingsetstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ func newStateDBWorkingSetStore(view protocol.View, flusher db.KVStoreFlusher, re
}
}

func newFactoryWorkingSetStore(view protocol.View, flusher db.KVStoreFlusher) (workingSetStore, error) {
tlt, err := newTwoLayerTrie(ArchiveTrieNamespace, flusher.KVStoreWithBuffer(), ArchiveTrieRootKey, true)
func newFactoryWorkingSetStore(view protocol.View, flusher db.KVStoreFlusher, trieRootKey string, createTrie bool) (workingSetStore, error) {
tlt, err := newTwoLayerTrie(ArchiveTrieNamespace, flusher.KVStoreWithBuffer(), trieRootKey, createTrie)
if err != nil {
return nil, err
}
Expand Down
39 changes: 0 additions & 39 deletions test/mock/mock_factory/mock_factory.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit a52fe3d

Please sign in to comment.