From 080828f778450b00519ab131d045ad6defc55ed0 Mon Sep 17 00:00:00 2001 From: 0xbundler <124862913+0xbundler@users.noreply.github.com> Date: Thu, 7 Dec 2023 22:25:26 +0800 Subject: [PATCH] freezer: support reset ancient head+tail; --- consensus/parlia/parlia.go | 8 +-- consensus/parlia/snapshot.go | 2 - core/blockchain_reader.go | 4 ++ core/genesis.go | 3 +- core/rawdb/database.go | 4 ++ core/rawdb/freezer.go | 29 ++++++++++ core/rawdb/freezer_resettable.go | 7 +++ core/rawdb/freezer_table.go | 51 +++++++++++++++++ core/rawdb/freezer_table_test.go | 97 ++++++++++++++++++++++++++++++++ core/rawdb/prunedfreezer.go | 4 ++ core/rawdb/table.go | 4 ++ eth/backend.go | 5 +- eth/downloader/downloader.go | 16 +++++- ethdb/database.go | 3 + ethdb/remotedb/remotedb.go | 4 ++ params/history_segment.go | 2 +- params/history_segment_test.go | 10 ++-- 17 files changed, 230 insertions(+), 23 deletions(-) diff --git a/consensus/parlia/parlia.go b/consensus/parlia/parlia.go index eb8e3adb9a..4f72ece81d 100644 --- a/consensus/parlia/parlia.go +++ b/consensus/parlia/parlia.go @@ -11,7 +11,6 @@ import ( "math" "math/big" "math/rand" - "runtime/debug" "sort" "strings" "sync" @@ -417,7 +416,6 @@ func (p *Parlia) getParent(chain consensus.ChainHeaderReader, header *types.Head } if parent == nil || parent.Number.Uint64() != number-1 || parent.Hash() != header.ParentHash { - log.Info("cannot find ancestor, FindAncientHeader", "number", number, "stack", string(debug.Stack())) return nil, consensus.ErrUnknownAncestor } return parent, nil @@ -540,7 +538,6 @@ func (p *Parlia) verifyHeader(chain consensus.ChainHeaderReader, header *types.H // Ensure that the extra-data contains a signer list on checkpoint, but none otherwise signersBytes := getValidatorBytesFromHeader(header, p.chainConfig, p.config) if !isEpoch && len(signersBytes) != 0 { - log.Error("validate header err", "number", header.Number, "hash", header.Hash(), "chainconfig", p.chainConfig, "config", p.config, "bytes", 
len(signersBytes)) return errExtraValidators } if isEpoch && len(signersBytes) == 0 { @@ -726,7 +723,6 @@ func (p *Parlia) snapshot(chain consensus.ChainHeaderReader, number uint64, hash // If we have explicit parents, pick from there (enforced) header = parents[len(parents)-1] if header.Hash() != hash || header.Number.Uint64() != number { - log.Info("cannot find ancestor, FindAncientHeader", "number", number, "stack", string(debug.Stack())) return nil, consensus.ErrUnknownAncestor } parents = parents[:len(parents)-1] @@ -734,7 +730,6 @@ func (p *Parlia) snapshot(chain consensus.ChainHeaderReader, number uint64, hash // No explicit parents (or no more left), reach out to the database header = chain.GetHeader(hash, number) if header == nil { - log.Info("cannot find ancestor, FindAncientHeader", "number", number, "stack", string(debug.Stack())) return nil, consensus.ErrUnknownAncestor } } @@ -794,7 +789,7 @@ func (p *Parlia) findSnapFromHistorySegment(hash common.Hash, number uint64) (*S tmp.config = p.config tmp.sigCache = p.signatures tmp.ethAPI = p.ethAPI - log.Info("found history segment snapshot", "number", number, "hash", hash, "segment", segment) + log.Debug("found history segment snapshot", "number", number, "hash", hash, "segment", segment) return &tmp, true } @@ -999,7 +994,6 @@ func (p *Parlia) Prepare(chain consensus.ChainHeaderReader, header *types.Header // Ensure the timestamp has the correct delay parent := chain.GetHeader(header.ParentHash, number-1) if parent == nil { - log.Info("cannot find ancestor, FindAncientHeader", "number", number, "stack", string(debug.Stack())) return consensus.ErrUnknownAncestor } header.Time = p.blockTimeForRamanujanFork(snap, header, parent) diff --git a/consensus/parlia/snapshot.go b/consensus/parlia/snapshot.go index 0957b9eb55..ddfb1811fc 100644 --- a/consensus/parlia/snapshot.go +++ b/consensus/parlia/snapshot.go @@ -22,7 +22,6 @@ import ( "encoding/json" "errors" "fmt" - "runtime/debug" "sort" lru 
"github.com/hashicorp/golang-lru" @@ -272,7 +271,6 @@ func (s *Snapshot) apply(headers []*types.Header, chain consensus.ChainHeaderRea if number > 0 && number%s.config.Epoch == uint64(len(snap.Validators)/2) { checkpointHeader := FindAncientHeader(header, uint64(len(snap.Validators)/2), chain, parents) if checkpointHeader == nil { - log.Info("cannot find ancestor, FindAncientHeader", "number", number, "stack", string(debug.Stack())) return nil, consensus.ErrUnknownAncestor } diff --git a/core/blockchain_reader.go b/core/blockchain_reader.go index b3363a8982..5b9a16ef8e 100644 --- a/core/blockchain_reader.go +++ b/core/blockchain_reader.go @@ -470,3 +470,7 @@ func (bc *BlockChain) WriteCanonicalHeaders(headers []*types.Header, tds []uint6 } return nil } + +func (bc *BlockChain) FreezerDBReset(tail, head uint64) error { + return bc.db.AncientReset(tail, head) +} diff --git a/core/genesis.go b/core/genesis.go index ff6ca1820d..1929805da6 100644 --- a/core/genesis.go +++ b/core/genesis.go @@ -321,7 +321,7 @@ func SetupGenesisBlockWithOverride(db ethdb.Database, triedb *trie.Database, gen log.Info("Writing default main-net genesis block") genesis = DefaultGenesisBlock() } else { - log.Info("Writing custom genesis block", "config", genesis.Config) + log.Info("Writing custom genesis block") } block, err := genesis.Commit(db, triedb) if err != nil { @@ -408,7 +408,6 @@ func LoadChainConfig(db ethdb.Database, genesis *Genesis) (*params.ChainConfig, if stored != (common.Hash{}) { storedcfg := rawdb.ReadChainConfig(db, stored) if storedcfg != nil { - log.Info("found chain config", "hash", stored, "cfg", storedcfg) return storedcfg, stored, nil } } diff --git a/core/rawdb/database.go b/core/rawdb/database.go index 6012341ca0..e0998ef753 100644 --- a/core/rawdb/database.go +++ b/core/rawdb/database.go @@ -200,6 +200,10 @@ func (db *nofreezedb) AncientDatadir() (string, error) { return "", errNotSupported } +func (db *nofreezedb) AncientReset(tail, head uint64) error { + return 
errNotSupported +} + // NewDatabase creates a high level database on top of a given key-value data // store without a freezer moving immutable chain segments into cold storage. func NewDatabase(db ethdb.KeyValueStore) ethdb.Database { diff --git a/core/rawdb/freezer.go b/core/rawdb/freezer.go index 0a7bbe436a..a55e00d536 100644 --- a/core/rawdb/freezer.go +++ b/core/rawdb/freezer.go @@ -598,3 +598,32 @@ func (f *Freezer) MigrateTable(kind string, convert convertLegacyFn) error { } return nil } + +func (f *Freezer) AncientReset(tail, head uint64) error { + if f.readonly { + return errReadOnly + } + f.writeLock.Lock() + defer f.writeLock.Unlock() + + for i := range f.tables { + nt, err := f.tables[i].resetItems(tail, head) + if err != nil { + return err + } + f.tables[i] = nt + } + + if err := f.repair(); err != nil { + for _, table := range f.tables { + table.Close() + } + return err + } + + f.frozen.Add(f.offset) + f.tail.Add(f.offset) + f.writeBatch = newFreezerBatch(f) + log.Info("Ancient database reset", "tail", f.tail.Load(), "frozen", f.frozen.Load()) + return nil +} diff --git a/core/rawdb/freezer_resettable.go b/core/rawdb/freezer_resettable.go index 5e569f93d6..ac393bb07f 100644 --- a/core/rawdb/freezer_resettable.go +++ b/core/rawdb/freezer_resettable.go @@ -203,6 +203,13 @@ func (f *ResettableFreezer) TruncateTail(tail uint64) (uint64, error) { return f.freezer.TruncateTail(tail) } +func (f *ResettableFreezer) AncientReset(tail, head uint64) error { + f.lock.RLock() + defer f.lock.RUnlock() + + return f.freezer.AncientReset(tail, head) +} + // Sync flushes all data tables to disk. 
func (f *ResettableFreezer) Sync() error { f.lock.RLock() diff --git a/core/rawdb/freezer_table.go b/core/rawdb/freezer_table.go index 87b33240b8..4cea2357df 100644 --- a/core/rawdb/freezer_table.go +++ b/core/rawdb/freezer_table.go @@ -564,6 +564,57 @@ func (t *freezerTable) truncateTail(items uint64) error { return nil } + +// resetItems resets the freezer table head & tail +func (t *freezerTable) resetItems(tail, head uint64) (*freezerTable, error) { + if t.readonly { + return nil, errors.New("resetItems in readonly mode") + } + itemHidden := t.itemHidden.Load() + items := t.items.Load() + if tail != head && (itemHidden > tail || items < head) { + return nil, errors.New("cannot reset to non-exist range") + } + + var err error + if tail != head { + if err = t.truncateHead(head); err != nil { + return nil, err + } + if err = t.truncateTail(tail); err != nil { + return nil, err + } + return t, nil + } + + // if tail == head, the table is reset to 0 items + t.releaseFilesAfter(t.tailId-1, true) + t.head.Close() + os.Remove(t.head.Name()) + t.index.Close() + os.Remove(t.index.Name()) + t.meta.Close() + os.Remove(t.meta.Name()) + + var idxName string + if t.noCompression { + idxName = fmt.Sprintf("%s.ridx", t.name) // raw index file + } else { + idxName = fmt.Sprintf("%s.cidx", t.name) // compressed index file + } + index, err := openFreezerFileForAppend(filepath.Join(t.path, idxName)) + if err != nil { + return nil, err + } + tailIndex := indexEntry{ + offset: uint32(tail), + } + if _, err = index.Write(tailIndex.append(nil)); err != nil { + return nil, err + } + + return newFreezerTable(t.path, t.name, t.noCompression, t.readonly) +} + // Close closes all opened files. 
func (t *freezerTable) Close() error { t.lock.Lock() diff --git a/core/rawdb/freezer_table_test.go b/core/rawdb/freezer_table_test.go index f2d3773dd9..a6fd1861ab 100644 --- a/core/rawdb/freezer_table_test.go +++ b/core/rawdb/freezer_table_test.go @@ -27,6 +27,8 @@ import ( "testing" "testing/quick" + "github.com/stretchr/testify/assert" + "github.com/davecgh/go-spew/spew" "github.com/ethereum/go-ethereum/metrics" "github.com/stretchr/testify/require" @@ -1337,3 +1339,98 @@ func TestRandom(t *testing.T) { t.Fatal(err) } } + +func TestResetItems(t *testing.T) { + t.Parallel() + rm, wm, sg := metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge() + fname := fmt.Sprintf("truncate-tail-%d", rand.Uint64()) + + // Fill table + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 40, true, false) + if err != nil { + t.Fatal(err) + } + + // Write 7 x 20 bytes, splitting out into four files + batch := f.newBatch(0) + require.NoError(t, batch.AppendRaw(0, getChunk(20, 0xFF))) + require.NoError(t, batch.AppendRaw(1, getChunk(20, 0xEE))) + require.NoError(t, batch.AppendRaw(2, getChunk(20, 0xdd))) + require.NoError(t, batch.AppendRaw(3, getChunk(20, 0xcc))) + require.NoError(t, batch.AppendRaw(4, getChunk(20, 0xbb))) + require.NoError(t, batch.AppendRaw(5, getChunk(20, 0xaa))) + require.NoError(t, batch.AppendRaw(6, getChunk(20, 0x11))) + require.NoError(t, batch.commit()) + + // nothing to do, all the items should still be there. 
+ f, err = f.resetItems(0, 7) + assert.NoError(t, err) + fmt.Println(f.dumpIndexString(0, 1000)) + checkRetrieve(t, f, map[uint64][]byte{ + 0: getChunk(20, 0xFF), + 1: getChunk(20, 0xEE), + 2: getChunk(20, 0xdd), + 3: getChunk(20, 0xcc), + 4: getChunk(20, 0xbb), + 5: getChunk(20, 0xaa), + 6: getChunk(20, 0x11), + }) + + f, err = f.resetItems(1, 5) + assert.NoError(t, err) + _, err = f.resetItems(0, 5) + assert.Error(t, err) + _, err = f.resetItems(1, 6) + assert.Error(t, err) + + fmt.Println(f.dumpIndexString(0, 1000)) + checkRetrieveError(t, f, map[uint64]error{ + 0: errOutOfBounds, + }) + checkRetrieve(t, f, map[uint64][]byte{ + 1: getChunk(20, 0xEE), + 2: getChunk(20, 0xdd), + 3: getChunk(20, 0xcc), + 4: getChunk(20, 0xbb), + }) + + f, err = f.resetItems(4, 4) + assert.NoError(t, err) + checkRetrieveError(t, f, map[uint64]error{ + 4: errOutOfBounds, + }) + + batch = f.newBatch(0) + require.Error(t, batch.AppendRaw(0, getChunk(20, 0xa0))) + require.NoError(t, batch.AppendRaw(4, getChunk(20, 0xa4))) + require.NoError(t, batch.AppendRaw(5, getChunk(20, 0xa5))) + require.NoError(t, batch.commit()) + fmt.Println(f.dumpIndexString(0, 1000)) + + // Reopen the table, the deletion information should be persisted as well + f.Close() + f, err = newTable(os.TempDir(), fname, rm, wm, sg, 40, true, false) + if err != nil { + t.Fatal(err) + } + fmt.Println(f.dumpIndexString(0, 1000)) + checkRetrieveError(t, f, map[uint64]error{ + 0: errOutOfBounds, + }) + checkRetrieve(t, f, map[uint64][]byte{ + 4: getChunk(20, 0xa4), + 5: getChunk(20, 0xa5), + }) + + // truncate all, the entire freezer should be deleted + f.truncateTail(6) + checkRetrieveError(t, f, map[uint64]error{ + 0: errOutOfBounds, + 1: errOutOfBounds, + 2: errOutOfBounds, + 3: errOutOfBounds, + 4: errOutOfBounds, + 5: errOutOfBounds, + 6: errOutOfBounds, + }) +} diff --git a/core/rawdb/prunedfreezer.go b/core/rawdb/prunedfreezer.go index ffc6647816..9f3541bc6f 100644 --- a/core/rawdb/prunedfreezer.go +++ 
b/core/rawdb/prunedfreezer.go @@ -188,6 +188,10 @@ func (f *prunedfreezer) TruncateTail(tail uint64) (uint64, error) { return 0, errNotSupported } +func (f *prunedfreezer) AncientReset(tail, head uint64) error { + return errNotSupported +} + // Sync flushes meta data tables to disk. func (f *prunedfreezer) Sync() error { WriteFrozenOfAncientFreezer(f.db, atomic.LoadUint64(&f.frozen)) diff --git a/core/rawdb/table.go b/core/rawdb/table.go index 23730aeb6a..79ab96b728 100644 --- a/core/rawdb/table.go +++ b/core/rawdb/table.go @@ -117,6 +117,10 @@ func (t *table) TruncateTail(items uint64) (uint64, error) { return t.db.TruncateTail(items) } +func (t *table) AncientReset(tail, head uint64) error { + return t.db.AncientReset(tail, head) +} + // Sync is a noop passthrough that just forwards the request to the underlying // database. func (t *table) Sync() error { diff --git a/eth/backend.go b/eth/backend.go index fbaa92e14c..11c5666d1e 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -202,7 +202,6 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { return nil, err } if p, ok := eth.engine.(consensus.PoSA); ok { - log.Info("setup consensus engine history segment", "lastSegment", lastSegment) p.SetupHistorySegment(hsm) } } @@ -278,7 +277,6 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { return nil, err } bc.SetupHistorySegment(hsm) - log.Info("setup blockchain history segment", "lastSegment", lastSegment) } return bc, nil }) @@ -418,7 +416,8 @@ func GetHistorySegmentAndLastSegment(db ethdb.Database, genesisHash common.Hash, // check segment if match hard code if err = rawdb.AvailableHistorySegment(db, lastSegment); err != nil { - return nil, nil, err + log.Warn("there is no available history to prune", "head", latestHeader.Number) + return hsm, nil, nil } return hsm, lastSegment, nil } diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index cf81369419..370441eb46 100644 --- 
a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -214,8 +214,11 @@ type BlockChain interface { // LastHistorySegment get last history segment LastHistorySegment(num uint64) *params.HistorySegment - // WriteHeaders just write header into db, it an unsafe interface, just for history segment + // WriteCanonicalHeaders just writes headers into the db; it is an unsafe interface, intended only for history segments WriteCanonicalHeaders([]*types.Header, []uint64) error + + // FreezerDBReset resets the freezer db to the target tail & head + FreezerDBReset(tail, head uint64) error } type DownloadOption func(downloader *Downloader) *Downloader @@ -516,7 +519,7 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd * if err != nil { return err } - log.Info("sync from peer", "local", localHeight, "remote", remoteHeight, "origin", origin, "peer", p.peer) + log.Debug("sync from peer", "local", localHeight, "remote", remoteHeight, "origin", origin, "peer", p.peer) if localHeight >= remoteHeight { // if remoteHeader does not exist in local chain, will move on to insert it as a side chain.
@@ -592,6 +595,13 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd * return err } } + + // if the history segment is enabled, force-reset the freezer tail + if d.lastSegment != nil && localHeight == 0 { + if err := d.blockchain.FreezerDBReset(origin, origin); err != nil { + return err + } + } } // Initiate the sync using a concurrent header and content retrieval algorithm d.queue.Prepare(origin+1, mode) @@ -1778,7 +1788,7 @@ func (d *Downloader) findAncestorFromHistorySegment(p *peerConnection, remoteHei if err = d.blockchain.WriteCanonicalHeaders(headers, []uint64{d.lastSegment.TD}); err != nil { return 0, err } - log.Info("sync history segment header to local", "number", n, "hash", h, "segment", d.lastSegment) + log.Debug("sync history segment header to local", "number", n, "hash", h, "segment", d.lastSegment) } return n, nil } diff --git a/ethdb/database.go b/ethdb/database.go index 5af19e3478..83c4ae905e 100644 --- a/ethdb/database.go +++ b/ethdb/database.go @@ -136,6 +136,9 @@ type AncientWriter interface { // The second argument is a function that takes a raw entry and returns it // in the newest format. MigrateTable(string, func([]byte) ([]byte, error)) error + + // AncientReset resets the ancient store to the given tail and head + AncientReset(tail, head uint64) error } // AncientWriteOp is given to the function argument of ModifyAncients. 
diff --git a/ethdb/remotedb/remotedb.go b/ethdb/remotedb/remotedb.go index babb625d88..a13a0bc901 100644 --- a/ethdb/remotedb/remotedb.go +++ b/ethdb/remotedb/remotedb.go @@ -122,6 +122,10 @@ func (db *Database) TruncateTail(n uint64) (uint64, error) { panic("not supported") } +func (db *Database) AncientReset(tail, head uint64) error { + panic("not supported") +} + func (db *Database) Sync() error { return nil } diff --git a/params/history_segment.go b/params/history_segment.go index a317826cf9..bc51998cbc 100644 --- a/params/history_segment.go +++ b/params/history_segment.go @@ -43,7 +43,7 @@ type HistorySegment struct { } func (s *HistorySegment) String() string { - return fmt.Sprintf("[Index: %v, ReGenesisNumber: %v, ReGenesisHash: %v, TD: %v]", s.Index, s.ReGenesisNumber, s.ReGenesisNumber, s.TD) + return fmt.Sprintf("{Index: %v, ReGenesisNumber: %v, ReGenesisHash: %v, TD: %v}", s.Index, s.ReGenesisNumber, s.ReGenesisHash, s.TD) } func (s *HistorySegment) MatchBlock(h common.Hash, n uint64) bool { diff --git a/params/history_segment_test.go b/params/history_segment_test.go index bddd16373c..d1be7d3904 100644 --- a/params/history_segment_test.go +++ b/params/history_segment_test.go @@ -165,18 +165,18 @@ func TestIndexSegment(t *testing.T) { assert.Equal(t, segments[2], hsm.CurSegment(BoundStartBlock+HistorySegmentLength*2)) var ( - prev HistorySegment + prev *HistorySegment ok bool ) - _, ok = hsm.LastSegment(segments[0]) + _, ok = hsm.LastSegment(&segments[0]) assert.Equal(t, false, ok) - prev, ok = hsm.LastSegment(segments[1]) + prev, ok = hsm.LastSegment(&segments[1]) assert.Equal(t, true, ok) assert.Equal(t, segments[0], prev) - prev, ok = hsm.LastSegment(segments[2]) + prev, ok = hsm.LastSegment(&segments[2]) assert.Equal(t, true, ok) assert.Equal(t, segments[1], prev) - _, ok = hsm.LastSegment(HistorySegment{ + _, ok = hsm.LastSegment(&HistorySegment{ Index: uint64(len(segments)), }) assert.Equal(t, false, ok)