Skip to content

Commit

Permalink
[factory] simplify interface for archive mode (#4474)
Browse files Browse the repository at this point in the history
  • Loading branch information
dustinxie authored Nov 25, 2024
1 parent 60c2a3a commit be618b8
Show file tree
Hide file tree
Showing 11 changed files with 94 additions and 186 deletions.
8 changes: 6 additions & 2 deletions api/coreservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -949,7 +949,7 @@ func (core *coreService) readState(ctx context.Context, p protocol.Protocol, hei
if height != "" {
inputHeight, err := strconv.ParseUint(height, 0, 64)
if err != nil {
return nil, uint64(0), err
return nil, 0, err
}
rp := rolldpos.FindProtocol(core.registry)
if rp != nil {
Expand All @@ -961,7 +961,11 @@ func (core *coreService) readState(ctx context.Context, p protocol.Protocol, hei
}
if inputHeight < tipHeight {
// old data, wrap to history state reader
d, h, err := p.ReadState(ctx, factory.NewHistoryStateReader(core.sf, inputHeight), methodName, arguments...)
historySR, err := core.sf.WorkingSetAtHeight(ctx, inputHeight)
if err != nil {
return nil, 0, err
}
d, h, err := p.ReadState(ctx, historySR, methodName, arguments...)
if err == nil {
key.Height = strconv.FormatUint(h, 10)
core.readCache.Put(key.Hash(), d)
Expand Down
1 change: 1 addition & 0 deletions api/serverV2_integrity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ func addActsToActPool(ctx context.Context, ap actpool.ActPool) error {

func setupChain(cfg testConfig) (blockchain.Blockchain, blockdao.BlockDAO, blockindex.Indexer, blockindex.BloomFilterIndexer, factory.Factory, actpool.ActPool, *protocol.Registry, string, error) {
cfg.chain.ProducerPrivKey = hex.EncodeToString(identityset.PrivateKey(0).Bytes())
cfg.chain.EnableArchiveMode = true
registry := protocol.NewRegistry()
factoryCfg := factory.GenerateConfig(cfg.chain, cfg.genesis)
sf, err := factory.NewFactory(factoryCfg, db.NewMemKVStore(), factory.RegistryOption(registry))
Expand Down
17 changes: 9 additions & 8 deletions blockchain/integrity/integrity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2355,14 +2355,14 @@ func testHistoryForAccount(t *testing.T, statetx bool) {

// check history account's balance
if statetx {
_, err = accountutil.AccountState(ctx, factory.NewHistoryStateReader(sf, bc.TipHeight()-1), a)
require.Equal(factory.ErrNotSupported, errors.Cause(err))
_, err = accountutil.AccountState(ctx, factory.NewHistoryStateReader(sf, bc.TipHeight()-1), b)
_, err = sf.WorkingSetAtHeight(ctx, 0)
require.Equal(factory.ErrNotSupported, errors.Cause(err))
} else {
AccountA, err = accountutil.AccountState(ctx, factory.NewHistoryStateReader(sf, bc.TipHeight()-1), a)
sr, err := sf.WorkingSetAtHeight(ctx, bc.TipHeight()-1)
require.NoError(err)
AccountA, err = accountutil.AccountState(ctx, sr, a)
require.NoError(err)
AccountB, err = accountutil.AccountState(ctx, factory.NewHistoryStateReader(sf, bc.TipHeight()-1), b)
AccountB, err = accountutil.AccountState(ctx, sr, b)
require.NoError(err)
require.Equal(big.NewInt(100), AccountA.Balance)
require.Equal(big.NewInt(100), AccountB.Balance)
Expand Down Expand Up @@ -2403,10 +2403,11 @@ func testHistoryForContract(t *testing.T, statetx bool) {

// check the the original balance again
if statetx {
_, err = accountutil.AccountState(ctx, factory.NewHistoryStateReader(sf, bc.TipHeight()-1), contractAddr)
require.True(errors.Cause(err) == factory.ErrNotSupported)
_, err = sf.WorkingSetAtHeight(ctx, bc.TipHeight()-1)
require.Equal(factory.ErrNotSupported, errors.Cause(err))
} else {
sr := factory.NewHistoryStateReader(sf, bc.TipHeight()-1)
sr, err := sf.WorkingSetAtHeight(ctx, bc.TipHeight()-1)
require.NoError(err)
account, err = accountutil.AccountState(ctx, sr, contractAddr)
require.NoError(err)
balance = BalanceOfContract(contract, genesisAccount, sr, t, account.Root)
Expand Down
104 changes: 32 additions & 72 deletions state/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,6 @@ type (
NewBlockBuilder(context.Context, actpool.ActPool, func(action.Envelope) (*action.SealedEnvelope, error)) (*block.Builder, error)
PutBlock(context.Context, *block.Block) error
DeleteTipBlock(context.Context, *block.Block) error
StateAtHeight(uint64, interface{}, ...protocol.StateOption) error
StatesAtHeight(uint64, ...protocol.StateOption) (state.Iterator, error)
WorkingSet(context.Context) (protocol.StateManager, error)
WorkingSetAtHeight(context.Context, uint64) (protocol.StateManager, error)
}
Expand Down Expand Up @@ -272,6 +270,31 @@ func (sf *factory) newWorkingSet(ctx context.Context, height uint64) (*workingSe
if err != nil {
return nil, err
}
return sf.createSfWorkingSet(ctx, height, store)
}

func (sf *factory) newWorkingSetAtHeight(ctx context.Context, height uint64) (*workingSet, error) {
span := tracer.SpanFromContext(ctx)
span.AddEvent("factory.newWorkingSet")
defer span.End()

g := genesis.MustExtractGenesisContext(ctx)
flusher, err := db.NewKVStoreFlusher(
sf.dao,
batch.NewCachedBatch(),
sf.flusherOptions(!g.IsEaster(height))...,
)
if err != nil {
return nil, err
}
store, err := newFactoryWorkingSetStoreAtHeight(sf.protocolView, flusher, height)
if err != nil {
return nil, err
}
return sf.createSfWorkingSet(ctx, height, store)
}

func (sf *factory) createSfWorkingSet(ctx context.Context, height uint64, store workingSetStore) (*workingSet, error) {
if err := store.Start(ctx); err != nil {
return nil, err
}
Expand All @@ -286,7 +309,6 @@ func (sf *factory) newWorkingSet(ctx context.Context, height uint64) (*workingSe
}
}
}

return newWorkingSet(height, store), nil
}

Expand Down Expand Up @@ -388,16 +410,20 @@ func (sf *factory) WorkingSet(ctx context.Context) (protocol.StateManager, error
}

func (sf *factory) WorkingSetAtHeight(ctx context.Context, height uint64) (protocol.StateManager, error) {
if !sf.saveHistory {
return nil, ErrNoArchiveData
}
sf.mutex.Lock()
defer sf.mutex.Unlock()
return sf.newWorkingSet(ctx, height+1)
if height > sf.currentChainHeight {
return nil, errors.Errorf("query height %d is higher than tip height %d", height, sf.currentChainHeight)
}
return sf.newWorkingSetAtHeight(ctx, height)
}

// PutBlock persists all changes in RunActions() into the DB
func (sf *factory) PutBlock(ctx context.Context, blk *block.Block) error {
sf.mutex.Lock()
timer := sf.timerFactory.NewTimer("Commit")
sf.mutex.Unlock()
defer timer.End()
producer := blk.PublicKey().Address()
if producer == nil {
Expand Down Expand Up @@ -456,52 +482,6 @@ func (sf *factory) DeleteTipBlock(_ context.Context, _ *block.Block) error {
return errors.Wrap(ErrNotSupported, "cannot delete tip block from factory")
}

// StateAtHeight returns a confirmed state at height -- archive mode
func (sf *factory) StateAtHeight(height uint64, s interface{}, opts ...protocol.StateOption) error {
sf.mutex.RLock()
defer sf.mutex.RUnlock()
cfg, err := processOptions(opts...)
if err != nil {
return err
}
if cfg.Keys != nil {
return errors.Wrap(ErrNotSupported, "Read state with keys option has not been implemented yet")
}
if height > sf.currentChainHeight {
return errors.Errorf("query height %d is higher than tip height %d", height, sf.currentChainHeight)
}
return sf.stateAtHeight(height, cfg.Namespace, cfg.Key, s)
}

// StatesAtHeight returns a set states in the state factory at height -- archive mode
func (sf *factory) StatesAtHeight(height uint64, opts ...protocol.StateOption) (state.Iterator, error) {
sf.mutex.RLock()
defer sf.mutex.RUnlock()
if height > sf.currentChainHeight {
return nil, errors.Errorf("query height %d is higher than tip height %d", height, sf.currentChainHeight)
}
cfg, err := processOptions(opts...)
if err != nil {
return nil, err
}
if cfg.Keys != nil {
return nil, errors.Wrap(ErrNotSupported, "Read states with keys option has not been implemented yet")
}
tlt, err := newTwoLayerTrie(ArchiveTrieNamespace, sf.dao, fmt.Sprintf("%s-%d", ArchiveTrieRootKey, height), false)
if err != nil {
return nil, errors.Wrapf(err, "failed to generate trie for %d", height)
}
if err := tlt.Start(context.Background()); err != nil {
return nil, err
}
defer tlt.Stop(context.Background())
keys, values, err := readStatesFromTLT(tlt, cfg.Namespace, cfg.Keys)
if err != nil {
return nil, err
}
return state.NewIterator(keys, values)
}

// State returns a confirmed state in the state factory
func (sf *factory) State(s interface{}, opts ...protocol.StateOption) (uint64, error) {
sf.mutex.RLock()
Expand Down Expand Up @@ -573,26 +553,6 @@ func legacyKeyLen() int {
return 20
}

func (sf *factory) stateAtHeight(height uint64, ns string, key []byte, s interface{}) error {
if !sf.saveHistory {
return ErrNoArchiveData
}
tlt, err := newTwoLayerTrie(ArchiveTrieNamespace, sf.dao, fmt.Sprintf("%s-%d", ArchiveTrieRootKey, height), false)
if err != nil {
return errors.Wrapf(err, "failed to generate trie for %d", height)
}
if err := tlt.Start(context.Background()); err != nil {
return err
}
defer tlt.Stop(context.Background())

value, err := readStateFromTLT(tlt, ns, key)
if err != nil {
return err
}
return state.Deserialize(s, value)
}

func (sf *factory) createGenesisStates(ctx context.Context) error {
ws, err := sf.newWorkingSet(ctx, 0)
if err != nil {
Expand Down
37 changes: 23 additions & 14 deletions state/factory/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,11 +359,11 @@ func TestState(t *testing.T) {

func TestHistoryState(t *testing.T) {
r := require.New(t)
var err error
// using factory and enable history
cfg := DefaultConfig
cfg.Chain.TrieDBPath, err = testutil.PathOfTempFile(_triePath)
file1, err := testutil.PathOfTempFile(_triePath)
r.NoError(err)
cfg.Chain.TrieDBPath = file1
cfg.Chain.EnableArchiveMode = true
db1, err := db.CreateKVStore(db.DefaultConfig, cfg.Chain.TrieDBPath)
r.NoError(err)
Expand All @@ -372,17 +372,19 @@ func TestHistoryState(t *testing.T) {
testHistoryState(sf, t, false, cfg.Chain.EnableArchiveMode)

// using stateDB and enable history
cfg.Chain.TrieDBPath, err = testutil.PathOfTempFile(_triePath)
file2, err := testutil.PathOfTempFile(_triePath)
r.NoError(err)
cfg.Chain.TrieDBPath = file2
db2, err := db.CreateKVStoreWithCache(db.DefaultConfig, cfg.Chain.TrieDBPath, cfg.Chain.StateDBCacheSize)
r.NoError(err)
sf, err = NewStateDB(cfg, db2, SkipBlockValidationStateDBOption())
r.NoError(err)
testHistoryState(sf, t, true, cfg.Chain.EnableArchiveMode)

// using factory and disable history
cfg.Chain.TrieDBPath, err = testutil.PathOfTempFile(_triePath)
file3, err := testutil.PathOfTempFile(_triePath)
r.NoError(err)
cfg.Chain.TrieDBPath = file3
cfg.Chain.EnableArchiveMode = false
db1, err = db.CreateKVStore(db.DefaultConfig, cfg.Chain.TrieDBPath)
r.NoError(err)
Expand All @@ -391,15 +393,19 @@ func TestHistoryState(t *testing.T) {
testHistoryState(sf, t, false, cfg.Chain.EnableArchiveMode)

// using stateDB and disable history
cfg.Chain.TrieDBPath, err = testutil.PathOfTempFile(_triePath)
file4, err := testutil.PathOfTempFile(_triePath)
r.NoError(err)
cfg.Chain.TrieDBPath = file4
db2, err = db.CreateKVStoreWithCache(db.DefaultConfig, cfg.Chain.TrieDBPath, cfg.Chain.StateDBCacheSize)
r.NoError(err)
sf, err = NewStateDB(cfg, db2, SkipBlockValidationStateDBOption())
r.NoError(err)
testHistoryState(sf, t, true, cfg.Chain.EnableArchiveMode)
defer func() {
testutil.CleanupPath(cfg.Chain.TrieDBPath)
testutil.CleanupPath(file1)
testutil.CleanupPath(file2)
testutil.CleanupPath(file3)
testutil.CleanupPath(file4)
}()
}

Expand Down Expand Up @@ -567,21 +573,24 @@ func testHistoryState(sf Factory, t *testing.T, statetx, archive bool) {

// check archive data
if statetx {
// statetx not support archive mode
_, err = accountutil.AccountState(ctx, NewHistoryStateReader(sf, 0), a)
require.Equal(t, ErrNotSupported, errors.Cause(err))
_, err = accountutil.AccountState(ctx, NewHistoryStateReader(sf, 0), b)
// statetx not support archive mode yet
_, err = sf.WorkingSetAtHeight(ctx, 0)
require.Equal(t, ErrNotSupported, errors.Cause(err))
} else {
_, err = sf.WorkingSetAtHeight(ctx, 10)
if !archive {
_, err = accountutil.AccountState(ctx, NewHistoryStateReader(sf, 0), a)
require.Equal(t, ErrNoArchiveData, errors.Cause(err))
_, err = accountutil.AccountState(ctx, NewHistoryStateReader(sf, 0), b)
} else {
require.Contains(t, err.Error(), "query height 10 is higher than tip height 1")
}
sr, err := sf.WorkingSetAtHeight(ctx, 0)
if !archive {
require.Equal(t, ErrNoArchiveData, errors.Cause(err))
} else {
accountA, err = accountutil.AccountState(ctx, NewHistoryStateReader(sf, 0), a)
require.NoError(t, err)
accountB, err = accountutil.AccountState(ctx, NewHistoryStateReader(sf, 0), b)
accountA, err = accountutil.AccountState(ctx, sr, a)
require.NoError(t, err)
accountB, err = accountutil.AccountState(ctx, sr, b)
require.NoError(t, err)
require.Equal(t, big.NewInt(100), accountA.Balance)
require.Equal(t, big.NewInt(0), accountB.Balance)
Expand Down
49 changes: 0 additions & 49 deletions state/factory/historyfactory.go

This file was deleted.

3 changes: 2 additions & 1 deletion state/factory/statedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,8 @@ func (sdb *stateDB) WorkingSet(ctx context.Context) (protocol.StateManager, erro
}

func (sdb *stateDB) WorkingSetAtHeight(ctx context.Context, height uint64) (protocol.StateManager, error) {
return sdb.newWorkingSet(ctx, height+1)
// TODO: implement archive mode
return nil, ErrNotSupported
}

// PutBlock persists all changes in RunActions() into the DB
Expand Down
3 changes: 3 additions & 0 deletions state/factory/workingset.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,9 @@ func (ws *workingSet) State(s interface{}, opts ...protocol.StateOption) (uint64
if err != nil {
return ws.height, err
}
if cfg.Keys != nil {
return 0, errors.Wrap(ErrNotSupported, "Read state with keys option has not been implemented yet")
}
value, err := ws.store.Get(cfg.Namespace, cfg.Key)
if err != nil {
return ws.height, err
Expand Down
4 changes: 3 additions & 1 deletion state/factory/workingset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ func newFactoryWorkingSet(t testing.TB) *workingSet {
genesis.Default,
)
r.NoError(sf.Start(ctx))
// defer r.NoError(sf.Stop(ctx))
defer func() {
r.NoError(sf.Stop(ctx))
}()

ws, err := sf.(workingSetCreator).newWorkingSet(ctx, 1)
r.NoError(err)
Expand Down
Loading

0 comments on commit be618b8

Please sign in to comment.