From be618b8da77569d42d59bcb2d10a77e36643d446 Mon Sep 17 00:00:00 2001 From: dustinxie Date: Sun, 24 Nov 2024 17:37:49 -0800 Subject: [PATCH] [factory] simplify interface for archive mode (#4474) --- api/coreservice.go | 8 +- api/serverV2_integrity_test.go | 1 + blockchain/integrity/integrity_test.go | 17 ++-- state/factory/factory.go | 104 ++++++++----------------- state/factory/factory_test.go | 37 +++++---- state/factory/historyfactory.go | 49 ------------ state/factory/statedb.go | 3 +- state/factory/workingset.go | 3 + state/factory/workingset_test.go | 4 +- state/factory/workingsetstore.go | 15 ++++ test/mock/mock_factory/mock_factory.go | 39 ---------- 11 files changed, 94 insertions(+), 186 deletions(-) delete mode 100644 state/factory/historyfactory.go diff --git a/api/coreservice.go b/api/coreservice.go index 1625d55519..87341ca957 100644 --- a/api/coreservice.go +++ b/api/coreservice.go @@ -949,7 +949,7 @@ func (core *coreService) readState(ctx context.Context, p protocol.Protocol, hei if height != "" { inputHeight, err := strconv.ParseUint(height, 0, 64) if err != nil { - return nil, uint64(0), err + return nil, 0, err } rp := rolldpos.FindProtocol(core.registry) if rp != nil { @@ -961,7 +961,11 @@ func (core *coreService) readState(ctx context.Context, p protocol.Protocol, hei } if inputHeight < tipHeight { // old data, wrap to history state reader - d, h, err := p.ReadState(ctx, factory.NewHistoryStateReader(core.sf, inputHeight), methodName, arguments...) + historySR, err := core.sf.WorkingSetAtHeight(ctx, inputHeight) + if err != nil { + return nil, 0, err + } + d, h, err := p.ReadState(ctx, historySR, methodName, arguments...) if err == nil { key.Height = strconv.FormatUint(h, 10) core.readCache.Put(key.Hash(), d) diff --git a/api/serverV2_integrity_test.go b/api/serverV2_integrity_test.go index 7727eeb8ea..0d05ed952e 100644 --- a/api/serverV2_integrity_test.go +++ b/api/serverV2_integrity_test.go @@ -287,6 +287,7 @@ func addActsToActPool(ctx context.Context, ap actpool.ActPool) error { func setupChain(cfg testConfig) (blockchain.Blockchain, blockdao.BlockDAO, blockindex.Indexer, blockindex.BloomFilterIndexer, factory.Factory, actpool.ActPool, *protocol.Registry, string, error) { cfg.chain.ProducerPrivKey = hex.EncodeToString(identityset.PrivateKey(0).Bytes()) + cfg.chain.EnableArchiveMode = true registry := protocol.NewRegistry() factoryCfg := factory.GenerateConfig(cfg.chain, cfg.genesis) sf, err := factory.NewFactory(factoryCfg, db.NewMemKVStore(), factory.RegistryOption(registry)) diff --git a/blockchain/integrity/integrity_test.go b/blockchain/integrity/integrity_test.go index 6c5c0205c5..dd11ce6834 100644 --- a/blockchain/integrity/integrity_test.go +++ b/blockchain/integrity/integrity_test.go @@ -2355,14 +2355,14 @@ func testHistoryForAccount(t *testing.T, statetx bool) { // check history account's balance if statetx { - _, err = accountutil.AccountState(ctx, factory.NewHistoryStateReader(sf, bc.TipHeight()-1), a) - require.Equal(factory.ErrNotSupported, errors.Cause(err)) - _, err = accountutil.AccountState(ctx, factory.NewHistoryStateReader(sf, bc.TipHeight()-1), b) + _, err = sf.WorkingSetAtHeight(ctx, 0) require.Equal(factory.ErrNotSupported, errors.Cause(err)) } else { - AccountA, err = accountutil.AccountState(ctx, factory.NewHistoryStateReader(sf, bc.TipHeight()-1), a) + sr, err := sf.WorkingSetAtHeight(ctx, bc.TipHeight()-1) + require.NoError(err) + AccountA, err = accountutil.AccountState(ctx, sr, a) require.NoError(err) - AccountB, err = accountutil.AccountState(ctx, factory.NewHistoryStateReader(sf, bc.TipHeight()-1), b) + AccountB, err = accountutil.AccountState(ctx, sr, b) require.NoError(err) require.Equal(big.NewInt(100), AccountA.Balance) require.Equal(big.NewInt(100), AccountB.Balance) @@ -2403,10 +2403,11 @@ func testHistoryForContract(t *testing.T, statetx bool) { // check the the original balance again if statetx { - _, err = accountutil.AccountState(ctx, factory.NewHistoryStateReader(sf, bc.TipHeight()-1), contractAddr) - require.True(errors.Cause(err) == factory.ErrNotSupported) + _, err = sf.WorkingSetAtHeight(ctx, bc.TipHeight()-1) + require.Equal(factory.ErrNotSupported, errors.Cause(err)) } else { - sr := factory.NewHistoryStateReader(sf, bc.TipHeight()-1) + sr, err := sf.WorkingSetAtHeight(ctx, bc.TipHeight()-1) + require.NoError(err) account, err = accountutil.AccountState(ctx, sr, contractAddr) require.NoError(err) balance = BalanceOfContract(contract, genesisAccount, sr, t, account.Root) diff --git a/state/factory/factory.go b/state/factory/factory.go index edd913e16d..2da378d74a 100644 --- a/state/factory/factory.go +++ b/state/factory/factory.go @@ -87,8 +87,6 @@ type ( NewBlockBuilder(context.Context, actpool.ActPool, func(action.Envelope) (*action.SealedEnvelope, error)) (*block.Builder, error) PutBlock(context.Context, *block.Block) error DeleteTipBlock(context.Context, *block.Block) error - StateAtHeight(uint64, interface{}, ...protocol.StateOption) error - StatesAtHeight(uint64, ...protocol.StateOption) (state.Iterator, error) WorkingSet(context.Context) (protocol.StateManager, error) WorkingSetAtHeight(context.Context, uint64) (protocol.StateManager, error) } @@ -272,6 +270,31 @@ func (sf *factory) newWorkingSet(ctx context.Context, height uint64) (*workingSe if err != nil { return nil, err } + return sf.createSfWorkingSet(ctx, height, store) +} + +func (sf *factory) newWorkingSetAtHeight(ctx context.Context, height uint64) (*workingSet, error) { + span := tracer.SpanFromContext(ctx) + span.AddEvent("factory.newWorkingSet") + defer span.End() + + g := genesis.MustExtractGenesisContext(ctx) + flusher, err := db.NewKVStoreFlusher( + sf.dao, + batch.NewCachedBatch(), + sf.flusherOptions(!g.IsEaster(height))..., + ) + if err != nil { + return nil, err + } + store, err := newFactoryWorkingSetStoreAtHeight(sf.protocolView, flusher, height) + if err != nil { + return nil, err + } + return sf.createSfWorkingSet(ctx, height, store) +} + +func (sf *factory) createSfWorkingSet(ctx context.Context, height uint64, store workingSetStore) (*workingSet, error) { if err := store.Start(ctx); err != nil { return nil, err } @@ -286,7 +309,6 @@ func (sf *factory) newWorkingSet(ctx context.Context, height uint64) (*workingSe } } } - return newWorkingSet(height, store), nil } @@ -388,16 +410,20 @@ func (sf *factory) WorkingSet(ctx context.Context) (protocol.StateManager, error } func (sf *factory) WorkingSetAtHeight(ctx context.Context, height uint64) (protocol.StateManager, error) { + if !sf.saveHistory { + return nil, ErrNoArchiveData + } sf.mutex.Lock() defer sf.mutex.Unlock() - return sf.newWorkingSet(ctx, height+1) + if height > sf.currentChainHeight { + return nil, errors.Errorf("query height %d is higher than tip height %d", height, sf.currentChainHeight) + } + return sf.newWorkingSetAtHeight(ctx, height) } // PutBlock persists all changes in RunActions() into the DB func (sf *factory) PutBlock(ctx context.Context, blk *block.Block) error { - sf.mutex.Lock() timer := sf.timerFactory.NewTimer("Commit") - sf.mutex.Unlock() defer timer.End() producer := blk.PublicKey().Address() if producer == nil { @@ -456,52 +482,6 @@ func (sf *factory) DeleteTipBlock(_ context.Context, _ *block.Block) error { return errors.Wrap(ErrNotSupported, "cannot delete tip block from factory") } -// StateAtHeight returns a confirmed state at height -- archive mode -func (sf *factory) StateAtHeight(height uint64, s interface{}, opts ...protocol.StateOption) error { - sf.mutex.RLock() - defer sf.mutex.RUnlock() - cfg, err := processOptions(opts...) - if err != nil { - return err - } - if cfg.Keys != nil { - return errors.Wrap(ErrNotSupported, "Read state with keys option has not been implemented yet") - } - if height > sf.currentChainHeight { - return errors.Errorf("query height %d is higher than tip height %d", height, sf.currentChainHeight) - } - return sf.stateAtHeight(height, cfg.Namespace, cfg.Key, s) -} - -// StatesAtHeight returns a set states in the state factory at height -- archive mode -func (sf *factory) StatesAtHeight(height uint64, opts ...protocol.StateOption) (state.Iterator, error) { - sf.mutex.RLock() - defer sf.mutex.RUnlock() - if height > sf.currentChainHeight { - return nil, errors.Errorf("query height %d is higher than tip height %d", height, sf.currentChainHeight) - } - cfg, err := processOptions(opts...) - if err != nil { - return nil, err - } - if cfg.Keys != nil { - return nil, errors.Wrap(ErrNotSupported, "Read states with keys option has not been implemented yet") - } - tlt, err := newTwoLayerTrie(ArchiveTrieNamespace, sf.dao, fmt.Sprintf("%s-%d", ArchiveTrieRootKey, height), false) - if err != nil { - return nil, errors.Wrapf(err, "failed to generate trie for %d", height) - } - if err := tlt.Start(context.Background()); err != nil { - return nil, err - } - defer tlt.Stop(context.Background()) - keys, values, err := readStatesFromTLT(tlt, cfg.Namespace, cfg.Keys) - if err != nil { - return nil, err - } - return state.NewIterator(keys, values) -} - // State returns a confirmed state in the state factory func (sf *factory) State(s interface{}, opts ...protocol.StateOption) (uint64, error) { sf.mutex.RLock() @@ -573,26 +553,6 @@ func legacyKeyLen() int { return 20 } -func (sf *factory) stateAtHeight(height uint64, ns string, key []byte, s interface{}) error { - if !sf.saveHistory { - return ErrNoArchiveData - } - tlt, err := newTwoLayerTrie(ArchiveTrieNamespace, sf.dao, fmt.Sprintf("%s-%d", ArchiveTrieRootKey, height), false) - if err != nil { - return errors.Wrapf(err, "failed to generate trie for %d", height) - } - if err := tlt.Start(context.Background()); err != nil { - return err - } - defer tlt.Stop(context.Background()) - - value, err := readStateFromTLT(tlt, ns, key) - if err != nil { - return err - } - return state.Deserialize(s, value) -} - func (sf *factory) createGenesisStates(ctx context.Context) error { ws, err := sf.newWorkingSet(ctx, 0) if err != nil { diff --git a/state/factory/factory_test.go b/state/factory/factory_test.go index 97ebf293fa..18888766e2 100644 --- a/state/factory/factory_test.go +++ b/state/factory/factory_test.go @@ -359,11 +359,11 @@ func TestState(t *testing.T) { func TestHistoryState(t *testing.T) { r := require.New(t) - var err error // using factory and enable history cfg := DefaultConfig - cfg.Chain.TrieDBPath, err = testutil.PathOfTempFile(_triePath) + file1, err := testutil.PathOfTempFile(_triePath) r.NoError(err) + cfg.Chain.TrieDBPath = file1 cfg.Chain.EnableArchiveMode = true db1, err := db.CreateKVStore(db.DefaultConfig, cfg.Chain.TrieDBPath) r.NoError(err) @@ -372,8 +372,9 @@ func TestHistoryState(t *testing.T) { testHistoryState(sf, t, false, cfg.Chain.EnableArchiveMode) // using stateDB and enable history - cfg.Chain.TrieDBPath, err = testutil.PathOfTempFile(_triePath) + file2, err := testutil.PathOfTempFile(_triePath) r.NoError(err) + cfg.Chain.TrieDBPath = file2 db2, err := db.CreateKVStoreWithCache(db.DefaultConfig, cfg.Chain.TrieDBPath, cfg.Chain.StateDBCacheSize) r.NoError(err) sf, err = NewStateDB(cfg, db2, SkipBlockValidationStateDBOption()) @@ -381,8 +382,9 @@ func TestHistoryState(t *testing.T) { testHistoryState(sf, t, true, cfg.Chain.EnableArchiveMode) // using factory and disable history - cfg.Chain.TrieDBPath, err = testutil.PathOfTempFile(_triePath) + file3, err := testutil.PathOfTempFile(_triePath) r.NoError(err) + cfg.Chain.TrieDBPath = file3 cfg.Chain.EnableArchiveMode = false db1, err = db.CreateKVStore(db.DefaultConfig, cfg.Chain.TrieDBPath) r.NoError(err) @@ -391,15 +393,19 @@ func TestHistoryState(t *testing.T) { testHistoryState(sf, t, false, cfg.Chain.EnableArchiveMode) // using stateDB and disable history - cfg.Chain.TrieDBPath, err = testutil.PathOfTempFile(_triePath) + file4, err := testutil.PathOfTempFile(_triePath) r.NoError(err) + cfg.Chain.TrieDBPath = file4 db2, err = db.CreateKVStoreWithCache(db.DefaultConfig, cfg.Chain.TrieDBPath, cfg.Chain.StateDBCacheSize) r.NoError(err) sf, err = NewStateDB(cfg, db2, SkipBlockValidationStateDBOption()) r.NoError(err) testHistoryState(sf, t, true, cfg.Chain.EnableArchiveMode) defer func() { - testutil.CleanupPath(cfg.Chain.TrieDBPath) + testutil.CleanupPath(file1) + testutil.CleanupPath(file2) + testutil.CleanupPath(file3) + testutil.CleanupPath(file4) }() } @@ -567,21 +573,24 @@ func testHistoryState(sf Factory, t *testing.T, statetx, archive bool) { // check archive data if statetx { - // statetx not support archive mode - _, err = accountutil.AccountState(ctx, NewHistoryStateReader(sf, 0), a) - require.Equal(t, ErrNotSupported, errors.Cause(err)) - _, err = accountutil.AccountState(ctx, NewHistoryStateReader(sf, 0), b) + // statetx not support archive mode yet + _, err = sf.WorkingSetAtHeight(ctx, 0) require.Equal(t, ErrNotSupported, errors.Cause(err)) } else { + _, err = sf.WorkingSetAtHeight(ctx, 10) if !archive { - _, err = accountutil.AccountState(ctx, NewHistoryStateReader(sf, 0), a) require.Equal(t, ErrNoArchiveData, errors.Cause(err)) - _, err = accountutil.AccountState(ctx, NewHistoryStateReader(sf, 0), b) + } else { + require.Contains(t, err.Error(), "query height 10 is higher than tip height 1") + } + sr, err := sf.WorkingSetAtHeight(ctx, 0) + if !archive { require.Equal(t, ErrNoArchiveData, errors.Cause(err)) } else { - accountA, err = accountutil.AccountState(ctx, NewHistoryStateReader(sf, 0), a) require.NoError(t, err) - accountB, err = accountutil.AccountState(ctx, NewHistoryStateReader(sf, 0), b) + accountA, err = accountutil.AccountState(ctx, sr, a) + require.NoError(t, err) + accountB, err = accountutil.AccountState(ctx, sr, b) require.NoError(t, err) require.Equal(t, big.NewInt(100), accountA.Balance) require.Equal(t, big.NewInt(0), accountB.Balance) diff --git a/state/factory/historyfactory.go b/state/factory/historyfactory.go deleted file mode 100644 index d2e8c6ad26..0000000000 --- a/state/factory/historyfactory.go +++ /dev/null @@ -1,49 +0,0 @@ -// Copyright (c) 2020 IoTeX Foundation -// This source code is provided 'as is' and no warranties are given as to title or non-infringement, merchantability -// or fitness for purpose and, to the extent permitted by law, all liability for your use of the code is disclaimed. -// This source code is governed by Apache License 2.0 that can be found in the LICENSE file. - -package factory - -import ( - "github.com/iotexproject/iotex-core/v2/action/protocol" - "github.com/iotexproject/iotex-core/v2/state" -) - -// historyStateReader implements state reader interface, wrap factory with archive height -type historyStateReader struct { - height uint64 - sf Factory -} - -// NewHistoryStateReader creates new history state reader by given state factory and height -func NewHistoryStateReader(sf Factory, h uint64) protocol.StateReader { - return &historyStateReader{ - sf: sf, - height: h, - } -} - -// Height returns archive height -func (hReader *historyStateReader) Height() (uint64, error) { - return hReader.height, nil -} - -// State returns history state in the archive mode state factory -func (hReader *historyStateReader) State(s interface{}, opts ...protocol.StateOption) (uint64, error) { - return hReader.height, hReader.sf.StateAtHeight(hReader.height, s, opts...) -} - -// States returns history states in the archive mode state factory -func (hReader *historyStateReader) States(opts ...protocol.StateOption) (uint64, state.Iterator, error) { - iterator, err := hReader.sf.StatesAtHeight(hReader.height, opts...) - if err != nil { - return 0, nil, err - } - return hReader.height, iterator, nil -} - -// ReadView reads the view -func (hReader *historyStateReader) ReadView(name string) (interface{}, error) { - return hReader.sf.ReadView(name) -} diff --git a/state/factory/statedb.go b/state/factory/statedb.go index 7da193a25f..ddfb2810d0 100644 --- a/state/factory/statedb.go +++ b/state/factory/statedb.go @@ -267,7 +267,8 @@ func (sdb *stateDB) WorkingSet(ctx context.Context) (protocol.StateManager, erro } func (sdb *stateDB) WorkingSetAtHeight(ctx context.Context, height uint64) (protocol.StateManager, error) { - return sdb.newWorkingSet(ctx, height+1) + // TODO: implement archive mode + return nil, ErrNotSupported } // PutBlock persists all changes in RunActions() into the DB diff --git a/state/factory/workingset.go b/state/factory/workingset.go index 5359bd4a67..91d46e95eb 100644 --- a/state/factory/workingset.go +++ b/state/factory/workingset.go @@ -343,6 +343,9 @@ func (ws *workingSet) State(s interface{}, opts ...protocol.StateOption) (uint64 if err != nil { return ws.height, err } + if cfg.Keys != nil { + return 0, errors.Wrap(ErrNotSupported, "Read state with keys option has not been implemented yet") + } value, err := ws.store.Get(cfg.Namespace, cfg.Key) if err != nil { return ws.height, err diff --git a/state/factory/workingset_test.go b/state/factory/workingset_test.go index fd1e5c1841..78f752ed19 100644 --- a/state/factory/workingset_test.go +++ b/state/factory/workingset_test.go @@ -57,7 +57,9 @@ func newFactoryWorkingSet(t testing.TB) *workingSet { genesis.Default, ) r.NoError(sf.Start(ctx)) - // defer r.NoError(sf.Stop(ctx)) + defer func() { + r.NoError(sf.Stop(ctx)) + }() ws, err := sf.(workingSetCreator).newWorkingSet(ctx, 1) r.NoError(err) diff --git a/state/factory/workingsetstore.go b/state/factory/workingsetstore.go index c357674be6..cec2d453e5 100644 --- a/state/factory/workingsetstore.go +++ b/state/factory/workingsetstore.go @@ -70,6 +70,21 @@ func newFactoryWorkingSetStore(view protocol.View, flusher db.KVStoreFlusher) (w }, nil } +func newFactoryWorkingSetStoreAtHeight(view protocol.View, flusher db.KVStoreFlusher, height uint64) (workingSetStore, error) { + rootKey := fmt.Sprintf("%s-%d", ArchiveTrieRootKey, height) + tlt, err := newTwoLayerTrie(ArchiveTrieNamespace, flusher.KVStoreWithBuffer(), rootKey, false) + if err != nil { + return nil, err + } + + return &factoryWorkingSetStore{ + flusher: flusher, + view: view, + tlt: tlt, + trieRoots: make(map[int][]byte), + }, nil +} + func (store *stateDBWorkingSetStore) Start(context.Context) error { return nil } diff --git a/test/mock/mock_factory/mock_factory.go b/test/mock/mock_factory/mock_factory.go index 4e5f71671e..a18c978ca3 100644 --- a/test/mock/mock_factory/mock_factory.go +++ b/test/mock/mock_factory/mock_factory.go @@ -160,25 +160,6 @@ func (mr *MockFactoryMockRecorder) State(arg0 interface{}, arg1 ...interface{}) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "State", reflect.TypeOf((*MockFactory)(nil).State), varargs...) } -// StateAtHeight mocks base method. -func (m *MockFactory) StateAtHeight(arg0 uint64, arg1 interface{}, arg2 ...protocol.StateOption) error { - m.ctrl.T.Helper() - varargs := []interface{}{arg0, arg1} - for _, a := range arg2 { - varargs = append(varargs, a) - } - ret := m.ctrl.Call(m, "StateAtHeight", varargs...) - ret0, _ := ret[0].(error) - return ret0 -} - -// StateAtHeight indicates an expected call of StateAtHeight. -func (mr *MockFactoryMockRecorder) StateAtHeight(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - varargs := append([]interface{}{arg0, arg1}, arg2...) - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StateAtHeight", reflect.TypeOf((*MockFactory)(nil).StateAtHeight), varargs...) -} - // States mocks base method. func (m *MockFactory) States(arg0 ...protocol.StateOption) (uint64, state.Iterator, error) { m.ctrl.T.Helper() @@ -199,26 +180,6 @@ func (mr *MockFactoryMockRecorder) States(arg0 ...interface{}) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "States", reflect.TypeOf((*MockFactory)(nil).States), arg0...) } -// StatesAtHeight mocks base method. -func (m *MockFactory) StatesAtHeight(arg0 uint64, arg1 ...protocol.StateOption) (state.Iterator, error) { - m.ctrl.T.Helper() - varargs := []interface{}{arg0} - for _, a := range arg1 { - varargs = append(varargs, a) - } - ret := m.ctrl.Call(m, "StatesAtHeight", varargs...) - ret0, _ := ret[0].(state.Iterator) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// StatesAtHeight indicates an expected call of StatesAtHeight. -func (mr *MockFactoryMockRecorder) StatesAtHeight(arg0 interface{}, arg1 ...interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - varargs := append([]interface{}{arg0}, arg1...) - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StatesAtHeight", reflect.TypeOf((*MockFactory)(nil).StatesAtHeight), varargs...) -} - // Stop mocks base method. func (m *MockFactory) Stop(arg0 context.Context) error { m.ctrl.T.Helper()