Skip to content

Commit

Permalink
chore: sync missing features from v1.2.x to the default branch (cosmo…
Browse files Browse the repository at this point in the history
…s#969)

Co-authored-by: Marko <[email protected]>
  • Loading branch information
cool-develope and tac0turtle authored Jul 26, 2024
1 parent 558a18c commit d561baf
Show file tree
Hide file tree
Showing 7 changed files with 250 additions and 53 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

- [#952](https://github.com/cosmos/iavl/pull/952) Add `DeleteVersionsFrom(int64)` API.
- [#961](https://github.com/cosmos/iavl/pull/961) Add new `GetLatestVersion` API to get the latest version.
- [#965](https://github.com/cosmos/iavl/pull/965) Use an expected interface for the IAVL `Logger`.

## v1.2.0 May 13, 2024

Expand Down
13 changes: 0 additions & 13 deletions migrate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"os/exec"
"path"
"testing"
"time"

"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -254,18 +253,6 @@ func TestPruning(t *testing.T) {
}
}

// Wait for pruning to finish
for i := 0; i < 100; i++ {
_, _, err := tree.SaveVersion()
require.NoError(t, err)
isLeacy, err := tree.ndb.hasLegacyVersion(int64(legacyVersion))
require.NoError(t, err)
if !isLeacy {
break
}
// Simulate the consensus state update
time.Sleep(500 * time.Millisecond)
}
// Reload the tree
tree = NewMutableTree(db, 0, false, NewNopLogger())
versions := tree.AvailableVersions()
Expand Down
16 changes: 12 additions & 4 deletions mutable_tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -693,6 +693,17 @@ func (tree *MutableTree) GetVersioned(key []byte, version int64) ([]byte, error)
return nil, nil
}

// SetCommitting sets a flag to indicate that the tree is in the process of being saved.
// This is used to prevent parallel writing from async pruning.
//
// It only delegates to the underlying nodeDB's SetCommitting. While the flag
// is set, the nodeDB pruning helpers (deleteFromPruning, saveNodeFromPruning)
// wait for the signal that UnsetCommitting sends before touching the batch.
func (tree *MutableTree) SetCommitting() {
tree.ndb.SetCommitting()
}

// UnsetCommitting unsets the flag to indicate that the tree is no longer in the process of being saved.
//
// It only delegates to the underlying nodeDB's UnsetCommitting, which clears
// the flag and signals on the nodeDB's chCommitting channel.
func (tree *MutableTree) UnsetCommitting() {
tree.ndb.UnsetCommitting()
}

// SaveVersion saves a new tree version to disk, based on the current state of
// the tree. Returns the hash and new version number.
func (tree *MutableTree) SaveVersion() ([]byte, int64, error) {
Expand Down Expand Up @@ -1025,10 +1036,7 @@ func (tree *MutableTree) saveNewNodes(version int64) error {
var recursiveAssignKey func(*Node) ([]byte, error)
recursiveAssignKey = func(node *Node) ([]byte, error) {
if node.nodeKey != nil {
if node.nodeKey.nonce != 0 {
return node.nodeKey.GetKey(), nil
}
return node.hash, nil
return node.GetKey(), nil
}
nonce++
node.nodeKey = &NodeKey{
Expand Down
160 changes: 124 additions & 36 deletions nodedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,6 @@ const (
defaultStorageVersionValue = "1.0.0"
fastStorageVersionValue = "1.1.0"
fastNodeCacheSize = 100000

// This is used to avoid the case which pruning blocks the main process.
deleteBatchCount = 1000
deletePauseDuration = 100 * time.Millisecond
)

var (
Expand Down Expand Up @@ -86,9 +82,12 @@ type nodeDB struct {
storageVersion string // Storage version
firstVersion int64 // First version of nodeDB.
latestVersion int64 // Latest version of nodeDB.
pruneVersion int64 // Version to prune up to.
legacyLatestVersion int64 // Latest version of nodeDB in legacy format.
nodeCache cache.Cache // Cache for nodes in the regular tree that consists of key-value pairs at any version.
fastNodeCache cache.Cache // Cache for nodes in the fast index that represents only key-value pairs at the latest version.
isCommitting bool // Flag to indicate that the nodeDB is committing.
chCommitting chan struct{} // Channel to signal that the committing is done.
}

func newNodeDB(db dbm.DB, cacheSize int, opts Options, lg Logger) *nodeDB {
Expand All @@ -98,19 +97,27 @@ func newNodeDB(db dbm.DB, cacheSize int, opts Options, lg Logger) *nodeDB {
storeVersion = []byte(defaultStorageVersionValue)
}

return &nodeDB{
ndb := &nodeDB{
logger: lg,
db: db,
batch: NewBatchWithFlusher(db, opts.FlushThreshold),
opts: opts,
firstVersion: 0,
latestVersion: 0, // initially invalid
legacyLatestVersion: 0,
pruneVersion: 0,
nodeCache: cache.New(cacheSize),
fastNodeCache: cache.New(fastNodeCacheSize),
versionReaders: make(map[int64]uint32, 8),
storageVersion: string(storeVersion),
chCommitting: make(chan struct{}, 1),
}

if opts.AsyncPruning {
go ndb.startPruning()
}

return ndb
}

// GetNode gets a node from memory or disk. If it is an inner node, it does not
Expand Down Expand Up @@ -243,6 +250,33 @@ func (ndb *nodeDB) SaveFastNodeNoCache(node *fastnode.Node) error {
return ndb.saveFastNodeUnlocked(node, false)
}

// SetCommitting sets the committing flag to true.
// This is used to let the pruning process know that the nodeDB is committing.
//
// Any completion signal still buffered in chCommitting is discarded first, so
// a pruning call that starts waiting during this commit is not released by a
// signal left over from an earlier UnsetCommitting.
func (ndb *nodeDB) SetCommitting() {
	// Drain with a non-blocking receive. Checking len() and then receiving is
	// racy: if the pruning goroutine consumes the buffered signal between the
	// check and the receive, a plain receive would block the committer forever.
drain:
	for {
		select {
		case <-ndb.chCommitting:
		default:
			break drain
		}
	}

	ndb.mtx.Lock()
	defer ndb.mtx.Unlock()
	ndb.isCommitting = true
}

// UnsetCommitting sets the committing flag to false.
// This is used to let the pruning process know that the nodeDB is done committing.
//
// After clearing the flag it signals on chCommitting so that a pruning call
// blocked in deleteFromPruning or saveNodeFromPruning can continue.
func (ndb *nodeDB) UnsetCommitting() {
	ndb.mtx.Lock()
	ndb.isCommitting = false
	ndb.mtx.Unlock()

	// Signal without blocking. If a signal is already pending in the channel,
	// another one adds nothing, and a blocking send would stall the committer
	// until some pruning call happened to receive it.
	select {
	case ndb.chCommitting <- struct{}{}:
	default:
	}
}

// IsCommitting reports whether the committing flag is currently set.
// The flag is read while holding ndb.mtx.
func (ndb *nodeDB) IsCommitting() bool {
	ndb.mtx.Lock()
	committing := ndb.isCommitting
	ndb.mtx.Unlock()

	return committing
}

// SetFastStorageVersionToBatch sets storage version to fast where the version is
// 1.1.0-<version of the current live state>. Returns error if storage version is incorrect or on
// db error, nil otherwise. Requires changes to be committed after to be persisted.
Expand Down Expand Up @@ -330,6 +364,37 @@ func (ndb *nodeDB) Has(nk []byte) (bool, error) {
return ndb.db.Has(ndb.nodeKey(nk))
}

// deleteFromPruning queues the deletion of key in the nodeDB batch on behalf
// of the pruning process and returns the batch's error, if any.
func (ndb *nodeDB) deleteFromPruning(key []byte) error {
	// When a commit is in progress, block until UnsetCommitting signals on
	// chCommitting; only then is the delete added to the batch.
	if committing := ndb.IsCommitting(); committing {
		<-ndb.chCommitting
	}

	ndb.mtx.Lock()
	defer ndb.mtx.Unlock()

	return ndb.batch.Delete(key)
}

// saveNodeFromPruning serializes node and queues it in the nodeDB batch under
// ndb.nodeKey(node.GetKey()) on behalf of the pruning process.
// It returns the serialization error or the batch's error, if any.
func (ndb *nodeDB) saveNodeFromPruning(node *Node) error {
	// When a commit is in progress, block until UnsetCommitting signals on
	// chCommitting; only then is the write added to the batch.
	if committing := ndb.IsCommitting(); committing {
		<-ndb.chCommitting
	}

	ndb.mtx.Lock()
	defer ndb.mtx.Unlock()

	// Encode the node into a buffer pre-sized to its encoded length.
	encoded := new(bytes.Buffer)
	encoded.Grow(node.encodedSize())
	err := node.writeBytes(encoded)
	if err != nil {
		return err
	}

	dbKey := ndb.nodeKey(node.GetKey())
	return ndb.batch.Set(dbKey, encoded.Bytes())
}

// deleteVersion deletes a tree version from disk.
// deletes orphans
func (ndb *nodeDB) deleteVersion(version int64) error {
Expand All @@ -342,7 +407,7 @@ func (ndb *nodeDB) deleteVersion(version int64) error {
if orphan.nodeKey.nonce == 0 && !orphan.isLegacy {
// if the orphan is a reformatted root, it can be a legacy root
// so it should be removed from the pruning process.
if err := ndb.batch.Delete(ndb.legacyNodeKey(orphan.hash)); err != nil {
if err := ndb.deleteFromPruning(ndb.legacyNodeKey(orphan.hash)); err != nil {
return err
}
}
Expand All @@ -354,9 +419,9 @@ func (ndb *nodeDB) deleteVersion(version int64) error {
}
nk := orphan.GetKey()
if orphan.isLegacy {
return ndb.batch.Delete(ndb.legacyNodeKey(nk))
return ndb.deleteFromPruning(ndb.legacyNodeKey(nk))
}
return ndb.batch.Delete(ndb.nodeKey(nk))
return ndb.deleteFromPruning(ndb.nodeKey(nk))
}); err != nil {
return err
}
Expand All @@ -365,7 +430,7 @@ func (ndb *nodeDB) deleteVersion(version int64) error {
if rootKey == nil || !bytes.Equal(rootKey, literalRootKey) {
// if the root key is not matched with the literal root key, it means the given root
// is a reference root to the previous version.
if err := ndb.batch.Delete(ndb.nodeKey(literalRootKey)); err != nil {
if err := ndb.deleteFromPruning(ndb.nodeKey(literalRootKey)); err != nil {
return err
}
}
Expand All @@ -381,12 +446,12 @@ func (ndb *nodeDB) deleteVersion(version int64) error {
return err
}
// ensure that the given version is not included in the root search
if err := ndb.batch.Delete(ndb.nodeKey(literalRootKey)); err != nil {
if err := ndb.deleteFromPruning(ndb.nodeKey(literalRootKey)); err != nil {
return err
}
// instead, the root should be reformatted to (version, 0)
root.nodeKey.nonce = 0
if err := ndb.SaveNode(root); err != nil {
if err := ndb.saveNodeFromPruning(root); err != nil {
return err
}
}
Expand Down Expand Up @@ -420,36 +485,30 @@ func (ndb *nodeDB) deleteLegacyNodes(version int64, nk []byte) error {

// deleteLegacyVersions deletes all legacy versions from disk.
func (ndb *nodeDB) deleteLegacyVersions(legacyLatestVersion int64) error {
count := 0

checkDeletePause := func() {
count++
if count%deleteBatchCount == 0 {
time.Sleep(deletePauseDuration)
count = 0
}
// Delete the last version for the legacyLastVersion
if err := ndb.traverseOrphans(legacyLatestVersion, legacyLatestVersion+1, func(orphan *Node) error {
return ndb.deleteFromPruning(ndb.legacyNodeKey(orphan.hash))
}); err != nil {
return err
}

// Delete orphans for all legacy versions
if err := ndb.traversePrefix(legacyOrphanKeyFormat.Key(), func(key, value []byte) error {
checkDeletePause()
if err := ndb.batch.Delete(key); err != nil {
if err := ndb.deleteFromPruning(key); err != nil {
return err
}
var fromVersion, toVersion int64
legacyOrphanKeyFormat.Scan(key, &toVersion, &fromVersion)
if (fromVersion <= legacyLatestVersion && toVersion < legacyLatestVersion) || fromVersion > legacyLatestVersion {
checkDeletePause()
return ndb.batch.Delete(ndb.legacyNodeKey(value))
return ndb.deleteFromPruning(ndb.legacyNodeKey(value))
}
return nil
}); err != nil {
return err
}
// Delete all legacy roots
if err := ndb.traversePrefix(legacyRootKeyFormat.Key(), func(key, _ []byte) error {
checkDeletePause()
return ndb.batch.Delete(key)
return ndb.deleteFromPruning(key)
}); err != nil {
return err
}
Expand Down Expand Up @@ -515,8 +574,45 @@ func (ndb *nodeDB) DeleteVersionsFrom(fromVersion int64) error {
return nil
}

// startPruning runs the background pruning loop. newNodeDB starts it in its
// own goroutine when opts.AsyncPruning is set.
//
// Each iteration reads the requested target from pruneVersion (set by
// DeleteVersionsTo) and, when it is non-zero, prunes up to it via
// deleteVersionsTo.
//
// NOTE(review): the loop has no exit condition, and a failing prune is logged
// and retried every second for as long as pruneVersion stays unchanged.
func (ndb *nodeDB) startPruning() {
	const (
		idleWait  = 100 * time.Millisecond
		retryWait = 1 * time.Second
	)

	for {
		ndb.mtx.Lock()
		target := ndb.pruneVersion
		ndb.mtx.Unlock()

		// Nothing has been requested yet; poll again shortly.
		if target == 0 {
			time.Sleep(idleWait)
			continue
		}

		err := ndb.deleteVersionsTo(target)
		if err != nil {
			ndb.logger.Error("Error while pruning", "err", err)
			time.Sleep(retryWait)
			continue
		}

		// Clear the request unless a higher target was recorded while this
		// round was running.
		ndb.mtx.Lock()
		if target >= ndb.pruneVersion {
			ndb.pruneVersion = 0
		}
		ndb.mtx.Unlock()
	}
}

// DeleteVersionsTo deletes the oldest versions up to the given version from disk.
//
// With opts.AsyncPruning enabled, it only records toVersion in pruneVersion
// (under ndb.mtx) and returns nil; the startPruning loop reads that field and
// performs the deletion, logging any error instead of returning it.
// Otherwise the deletion runs synchronously through deleteVersionsTo and its
// error is returned.
func (ndb *nodeDB) DeleteVersionsTo(toVersion int64) error {
	if ndb.opts.AsyncPruning {
		ndb.mtx.Lock()
		ndb.pruneVersion = toVersion
		ndb.mtx.Unlock()
		return nil
	}

	return ndb.deleteVersionsTo(toVersion)
}

func (ndb *nodeDB) deleteVersionsTo(toVersion int64) error {
legacyLatestVersion, err := ndb.getLegacyLatestVersion()
if err != nil {
return err
Expand Down Expand Up @@ -553,20 +649,12 @@ func (ndb *nodeDB) DeleteVersionsTo(toVersion int64) error {

// Delete the legacy versions
if legacyLatestVersion >= first {
// Delete the last version for the legacyLastVersion
if err := ndb.traverseOrphans(legacyLatestVersion, legacyLatestVersion+1, func(orphan *Node) error {
return ndb.batch.Delete(ndb.legacyNodeKey(orphan.hash))
}); err != nil {
return err
if err := ndb.deleteLegacyVersions(legacyLatestVersion); err != nil {
ndb.logger.Error("Error deleting legacy versions", "err", err)
}
first = legacyLatestVersion + 1
// reset the legacy latest version forcibly to avoid multiple calls
ndb.resetLegacyLatestVersion(-1)
go func() {
if err := ndb.deleteLegacyVersions(legacyLatestVersion); err != nil {
ndb.logger.Error("Error deleting legacy versions", "err", err)
}
}()
first = legacyLatestVersion + 1
}

for version := first; version <= toVersion; version++ {
Expand Down
10 changes: 10 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ type Options struct {

// Ethereum has found that commit of 100KB is optimal, ref ethereum/go-ethereum#15115
FlushThreshold int

// AsyncPruning is a flag to enable async pruning
AsyncPruning bool
}

// DefaultOptions returns the default options for IAVL.
Expand Down Expand Up @@ -118,3 +121,10 @@ func FlushThresholdOption(ft int) Option {
opts.FlushThreshold = ft
}
}

// AsyncPruningOption returns an Option that sets Options.AsyncPruning to the
// given value when applied.
func AsyncPruningOption(asyncPruning bool) Option {
	apply := func(o *Options) {
		o.AsyncPruning = asyncPruning
	}
	return apply
}
Loading

0 comments on commit d561baf

Please sign in to comment.