Skip to content

Commit

Permalink
fix DeleteVersions stray orphans, and add DeleteVersionsRange (cosmos…
Browse files Browse the repository at this point in the history
  • Loading branch information
Klimov Sergey authored and erikgrinaker committed Nov 12, 2020
1 parent 2bc72d8 commit 078f48a
Show file tree
Hide file tree
Showing 6 changed files with 319 additions and 26 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# Changelog

## Unreleased

### Bug Fixes

- [\#324](https://github.com/cosmos/iavl/pull/324) Fix `DeleteVersions` not properly removing
orphans, and add `DeleteVersionsRange` to delete a range.

## 0.15.0-rc4 (October 14, 2020)

### Breaking Changes
Expand Down
38 changes: 33 additions & 5 deletions mutable_tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -540,23 +540,51 @@ func (tree *MutableTree) SetInitialVersion(version uint64) {
tree.ndb.opts.InitialVersion = version
}

// DeleteVersions deletes a series of versions from the MutableTree. An error
// is returned if any single version is invalid or the delete fails. All writes
// happen in a single batch with a single commit.
// DeleteVersions deletes a series of versions from the MutableTree.
// Deprecated: please use DeleteVersionsRange instead.
func (tree *MutableTree) DeleteVersions(versions ...int64) error {
debug("DELETING VERSIONS: %v\n", versions)

if len(versions) == 0 {
return nil
}

sort.Slice(versions, func(i, j int) bool {
return versions[i] < versions[j]
})

// Find ordered data and delete by interval
intervals := map[int64]int64{}
var fromVersion int64
for _, version := range versions {
if err := tree.deleteVersion(version); err != nil {
if version-fromVersion != intervals[fromVersion] {
fromVersion = version
}
intervals[fromVersion]++
}

for fromVersion, sortedBatchSize := range intervals {
if err := tree.DeleteVersionsRange(fromVersion, fromVersion+sortedBatchSize); err != nil {
return err
}
}

return nil
}

// DeleteVersionsRange removes versions from an interval from the MutableTree (not inclusive).
// An error is returned if any single version has active readers.
// All writes happen in a single batch with a single commit.
func (tree *MutableTree) DeleteVersionsRange(fromVersion, toVersion int64) error {
if err := tree.ndb.DeleteVersionsRange(fromVersion, toVersion); err != nil {
return err
}

if err := tree.ndb.Commit(); err != nil {
return err
}

for _, version := range versions {
for version := fromVersion; version < toVersion; version++ {
delete(tree.versions, version)
}

Expand Down
78 changes: 78 additions & 0 deletions mutable_tree_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"fmt"
"runtime"
"strconv"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -140,6 +141,83 @@ func TestMutableTree_LazyLoadVersion_Empty(t *testing.T) {
require.Error(t, err)
}

func TestMutableTree_DeleteVersionsRange(t *testing.T) {
require := require.New(t)

mdb := db.NewMemDB()
tree, err := NewMutableTree(mdb, 0)
require.NoError(err)

const maxLength = 100
const fromLength = 10

versions := make([]int64, 0, maxLength)
for count := 1; count <= maxLength; count++ {
versions = append(versions, int64(count))
countStr := strconv.Itoa(count)
// Set kv pair and save version
tree.Set([]byte("aaa"), []byte("bbb"))
tree.Set([]byte("key"+countStr), []byte("value"+countStr))
_, _, err = tree.SaveVersion()
require.NoError(err, "SaveVersion should not fail")
}

tree, err = NewMutableTree(mdb, 0)
require.NoError(err)
targetVersion, err := tree.LoadVersion(int64(maxLength))
require.NoError(err)
require.Equal(targetVersion, int64(maxLength), "targetVersion shouldn't larger than the actual tree latest version")

err = tree.DeleteVersionsRange(fromLength, int64(maxLength/2))
require.NoError(err, "DeleteVersionsTo should not fail")

for _, version := range versions[:fromLength-1] {
require.True(tree.versions[version], "versions %d no more than 10 should exist", version)

v, err := tree.LazyLoadVersion(version)
require.NoError(err, version)
require.Equal(v, version)

_, value := tree.Get([]byte("aaa"))
require.Equal(string(value), "bbb")

for _, count := range versions[:version] {
countStr := strconv.Itoa(int(count))
_, value := tree.Get([]byte("key" + countStr))
require.Equal(string(value), "value"+countStr)
}
}

for _, version := range versions[fromLength : int64(maxLength/2)-1] {
require.False(tree.versions[version], "versions %d more 10 and no more than 50 should have been deleted", version)

_, err := tree.LazyLoadVersion(version)
require.Error(err)
}

for _, version := range versions[int64(maxLength/2)-1:] {
require.True(tree.versions[version], "versions %d more than 50 should exist", version)

v, err := tree.LazyLoadVersion(version)
require.NoError(err)
require.Equal(v, version)

_, value := tree.Get([]byte("aaa"))
require.Equal(string(value), "bbb")

for _, count := range versions[:fromLength] {
countStr := strconv.Itoa(int(count))
_, value := tree.Get([]byte("key" + countStr))
require.Equal(string(value), "value"+countStr)
}
for _, count := range versions[int64(maxLength/2)-1 : version] {
countStr := strconv.Itoa(int(count))
_, value := tree.Get([]byte("key" + countStr))
require.Equal(string(value), "value"+countStr)
}
}
}

func TestMutableTree_InitialVersion(t *testing.T) {
memDB := db.NewMemDB()
tree, err := NewMutableTreeWithOpts(memDB, 0, &Options{InitialVersion: 9})
Expand Down
55 changes: 55 additions & 0 deletions nodedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,61 @@ func (ndb *nodeDB) DeleteVersionsFrom(version int64) error {
return nil
}

// DeleteVersionsRange deletes versions from an interval (not inclusive).
func (ndb *nodeDB) DeleteVersionsRange(fromVersion, toVersion int64) error {
if fromVersion >= toVersion {
return errors.New("toVersion must be greater than fromVersion")
}
if toVersion == 0 {
return errors.New("toVersion must be greater than 0")
}

ndb.mtx.Lock()
defer ndb.mtx.Unlock()

latest := ndb.getLatestVersion()
if latest < toVersion {
return errors.Errorf("cannot delete latest saved version (%d)", latest)
}

predecessor := ndb.getPreviousVersion(fromVersion)

for v, r := range ndb.versionReaders {
if v < toVersion && v > predecessor && r != 0 {
return errors.Errorf("unable to delete version %v with %v active readers", v, r)
}
}

// If the predecessor is earlier than the beginning of the lifetime, we can delete the orphan.
// Otherwise, we shorten its lifetime, by moving its endpoint to the predecessor version.
for version := fromVersion; version < toVersion; version++ {
ndb.traverseOrphansVersion(version, func(key, hash []byte) {
var from, to int64
orphanKeyFormat.Scan(key, &to, &from)
if err := ndb.batch.Delete(key); err != nil {
panic(err)
}
if from > predecessor {
if err := ndb.batch.Delete(ndb.nodeKey(hash)); err != nil {
panic(err)
}
ndb.uncacheNode(hash)
} else {
ndb.saveOrphan(hash, from, predecessor)
}
})
}

// Delete the version root entries
ndb.traverseRange(rootKeyFormat.Key(fromVersion), rootKeyFormat.Key(toVersion), func(k, v []byte) {
if err := ndb.batch.Delete(k); err != nil {
panic(err)
}
})

return nil
}

// deleteNodesFrom deletes the given node and any descendants that have versions after the given
// (inclusive). It is mainly used via LoadVersionForOverwriting, to delete the current version.
func (ndb *nodeDB) deleteNodesFrom(version int64, hash []byte) error {
Expand Down
71 changes: 50 additions & 21 deletions tree_random_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,15 @@ func testRandomOperations(t *testing.T, randSeed int64) {
keySize = 16 // before base64-encoding
valueSize = 16 // before base64-encoding

versions = 32 // number of final versions to generate
reloadChance = 0.1 // chance of tree reload after save
deleteChance = 0.2 // chance of random version deletion after save
revertChance = 0.05 // chance to revert tree to random version with LoadVersionForOverwriting
syncChance = 0.2 // chance of enabling sync writes on tree load
cacheChance = 0.4 // chance of enabling caching
cacheSizeMax = 256 // maximum size of cache (will be random from 1)
versions = 32 // number of final versions to generate
reloadChance = 0.1 // chance of tree reload after save
deleteChance = 0.2 // chance of random version deletion after save
revertChance = 0.05 // chance to revert tree to random version with LoadVersionForOverwriting
syncChance = 0.2 // chance of enabling sync writes on tree load
cacheChance = 0.4 // chance of enabling caching
cacheSizeMax = 256 // maximum size of cache (will be random from 1)
deleteRangeChance = 0.5 // chance deletion versions in range
deleteRangeMaxBatch = 5 // small range to delete

versionOps = 64 // number of operations (create/update/delete) per version
updateRatio = 0.4 // ratio of updates out of all operations
Expand Down Expand Up @@ -148,12 +150,30 @@ func testRandomOperations(t *testing.T, randSeed int64) {
if r.Float64() < deleteChance {
versions := getMirrorVersions(diskMirrors, memMirrors)
if len(versions) > 2 {
deleteVersion := int64(versions[r.Intn(len(versions)-1)])
t.Logf("Deleting version %v", deleteVersion)
err = tree.DeleteVersion(deleteVersion)
require.NoError(t, err)
delete(diskMirrors, deleteVersion)
delete(memMirrors, deleteVersion)
if r.Float64() < deleteRangeChance {
indexFrom := r.Intn(len(versions) - 1)
from := versions[indexFrom]
batch := r.Intn(deleteRangeMaxBatch)
if batch > len(versions[indexFrom:])-2 {
batch = len(versions[indexFrom:]) - 2
}
to := versions[indexFrom+batch] + 1
t.Logf("Deleting versions %v-%v", from, to-1)
err = tree.DeleteVersionsRange(int64(from), int64(to))
require.NoError(t, err)
for version := from; version < to; version++ {
delete(diskMirrors, int64(version))
delete(memMirrors, int64(version))
}
} else {
i := r.Intn(len(versions) - 1)
deleteVersion := int64(versions[i])
t.Logf("Deleting version %v", deleteVersion)
err = tree.DeleteVersion(deleteVersion)
require.NoError(t, err)
delete(diskMirrors, deleteVersion)
delete(memMirrors, deleteVersion)
}
}
}

Expand Down Expand Up @@ -211,14 +231,23 @@ func testRandomOperations(t *testing.T, randSeed int64) {
// Once we're done, delete all prior versions in random order, make sure all orphans have been
// removed, and check that the latest versions matches the mirror.
remaining := tree.AvailableVersions()
remaining = remaining[:len(remaining)-1]
for len(remaining) > 0 {
i := r.Intn(len(remaining))
deleteVersion := int64(remaining[i])
remaining = append(remaining[:i], remaining[i+1:]...)
t.Logf("Deleting version %v", deleteVersion)
err = tree.DeleteVersion(deleteVersion)
require.NoError(t, err)

if r.Float64() < deleteRangeChance {
if len(remaining) > 0 {
t.Logf("Deleting versions %v-%v", remaining[0], remaining[len(remaining)-2])
err = tree.DeleteVersionsRange(int64(remaining[0]), int64(remaining[len(remaining)-1]))
require.NoError(t, err)
}
} else {
remaining = remaining[:len(remaining)-1]
for len(remaining) > 0 {
i := r.Intn(len(remaining))
deleteVersion := int64(remaining[i])
remaining = append(remaining[:i], remaining[i+1:]...)
t.Logf("Deleting version %v", deleteVersion)
err = tree.DeleteVersion(deleteVersion)
require.NoError(t, err)
}
}
require.EqualValues(t, []int{int(version)}, tree.AvailableVersions())

Expand Down
Loading

0 comments on commit 078f48a

Please sign in to comment.