Skip to content

Commit

Permalink
fix DeleteVersions stray orphans, and add DeleteVersionsRange (cosmos…
Browse files Browse the repository at this point in the history
  • Loading branch information
Klimov Sergey authored Nov 12, 2020
1 parent fbcc511 commit 9af2c83
Show file tree
Hide file tree
Showing 7 changed files with 315 additions and 26 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# Changelog

## Unreleased

### Bug Fixes

- [\#324](https://github.com/cosmos/iavl/pull/324) Fix `DeleteVersions` not properly removing
orphans, and add `DeleteVersionsRange` to delete a range (@klim0v)

## 0.14.2 (October 12, 2020)

### Bug Fixes
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/ChainSafe/go-schnorrkel v0.0.0-20200405005733-88cbf1b4c40d/go.mod h1:URdX5+vg25ts3aCh8H5IFZybJYKWhJHYMTnf+ULtoC4=
github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0=
github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo=
github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI=
Expand Down Expand Up @@ -41,6 +42,7 @@ github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46f
github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ=
github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/clbanning/x2j v0.0.0-20191024224557-825249438eec/go.mod h1:jMjuTZXRI4dUb/I5gc9Hdhagfvm9+RyrPryS/auMzxE=
Expand Down
38 changes: 33 additions & 5 deletions mutable_tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -530,23 +530,51 @@ func (tree *MutableTree) SetInitialVersion(version uint64) {
tree.ndb.opts.InitialVersion = version
}

// DeleteVersions deletes a series of versions from the MutableTree. An error
// is returned if any single version is invalid or the delete fails. All writes
// happen in a single batch with a single commit.
// DeleteVersions deletes a series of versions from the MutableTree.
// Deprecated: please use DeleteVersionsRange instead.
func (tree *MutableTree) DeleteVersions(versions ...int64) error {
debug("DELETING VERSIONS: %v\n", versions)

if len(versions) == 0 {
return nil
}

sort.Slice(versions, func(i, j int) bool {
return versions[i] < versions[j]
})

// Find ordered data and delete by interval
intervals := map[int64]int64{}
var fromVersion int64
for _, version := range versions {
if err := tree.deleteVersion(version); err != nil {
if version-fromVersion != intervals[fromVersion] {
fromVersion = version
}
intervals[fromVersion]++
}

for fromVersion, sortedBatchSize := range intervals {
if err := tree.DeleteVersionsRange(fromVersion, fromVersion+sortedBatchSize); err != nil {
return err
}
}

return nil
}

// DeleteVersionsRange removes versions from an interval from the MutableTree (not inclusive).
// An error is returned if any single version has active readers.
// All writes happen in a single batch with a single commit.
func (tree *MutableTree) DeleteVersionsRange(fromVersion, toVersion int64) error {
if err := tree.ndb.DeleteVersionsRange(fromVersion, toVersion); err != nil {
return err
}

if err := tree.ndb.Commit(); err != nil {
return err
}

for _, version := range versions {
for version := fromVersion; version < toVersion; version++ {
delete(tree.versions, version)
}

Expand Down
78 changes: 78 additions & 0 deletions mutable_tree_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"fmt"
"runtime"
"strconv"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -106,6 +107,83 @@ func TestMutableTree_DeleteVersions(t *testing.T) {
}
}

func TestMutableTree_DeleteVersionsRange(t *testing.T) {
require := require.New(t)

mdb := db.NewMemDB()
tree, err := NewMutableTree(mdb, 0)
require.NoError(err)

const maxLength = 100
const fromLength = 10

versions := make([]int64, 0, maxLength)
for count := 1; count <= maxLength; count++ {
versions = append(versions, int64(count))
countStr := strconv.Itoa(count)
// Set kv pair and save version
tree.Set([]byte("aaa"), []byte("bbb"))
tree.Set([]byte("key"+countStr), []byte("value"+countStr))
_, _, err = tree.SaveVersion()
require.NoError(err, "SaveVersion should not fail")
}

tree, err = NewMutableTree(mdb, 0)
require.NoError(err)
targetVersion, err := tree.LoadVersion(int64(maxLength))
require.NoError(err)
require.Equal(targetVersion, int64(maxLength), "targetVersion shouldn't larger than the actual tree latest version")

err = tree.DeleteVersionsRange(fromLength, int64(maxLength/2))
require.NoError(err, "DeleteVersionsTo should not fail")

for _, version := range versions[:fromLength-1] {
require.True(tree.versions[version], "versions %d no more than 10 should exist", version)

v, err := tree.LazyLoadVersion(version)
require.NoError(err, version)
require.Equal(v, version)

_, value := tree.Get([]byte("aaa"))
require.Equal(string(value), "bbb")

for _, count := range versions[:version] {
countStr := strconv.Itoa(int(count))
_, value := tree.Get([]byte("key" + countStr))
require.Equal(string(value), "value"+countStr)
}
}

for _, version := range versions[fromLength : int64(maxLength/2)-1] {
require.False(tree.versions[version], "versions %d more 10 and no more than 50 should have been deleted", version)

_, err := tree.LazyLoadVersion(version)
require.Error(err)
}

for _, version := range versions[int64(maxLength/2)-1:] {
require.True(tree.versions[version], "versions %d more than 50 should exist", version)

v, err := tree.LazyLoadVersion(version)
require.NoError(err)
require.Equal(v, version)

_, value := tree.Get([]byte("aaa"))
require.Equal(string(value), "bbb")

for _, count := range versions[:fromLength] {
countStr := strconv.Itoa(int(count))
_, value := tree.Get([]byte("key" + countStr))
require.Equal(string(value), "value"+countStr)
}
for _, count := range versions[int64(maxLength/2)-1 : version] {
countStr := strconv.Itoa(int(count))
_, value := tree.Get([]byte("key" + countStr))
require.Equal(string(value), "value"+countStr)
}
}
}

func TestMutableTree_InitialVersion(t *testing.T) {
memDB := db.NewMemDB()
tree, err := NewMutableTreeWithOpts(memDB, 0, &Options{InitialVersion: 9})
Expand Down
49 changes: 49 additions & 0 deletions nodedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,55 @@ func (ndb *nodeDB) DeleteVersionsFrom(version int64) error {
return nil
}

// DeleteVersionsRange deletes versions from an interval (not inclusive).
func (ndb *nodeDB) DeleteVersionsRange(fromVersion, toVersion int64) error {
if fromVersion >= toVersion {
return errors.New("toVersion must be greater than fromVersion")
}
if toVersion == 0 {
return errors.New("toVersion must be greater than 0")
}

ndb.mtx.Lock()
defer ndb.mtx.Unlock()

latest := ndb.getLatestVersion()
if latest < toVersion {
return errors.Errorf("cannot delete latest saved version (%d)", latest)
}

predecessor := ndb.getPreviousVersion(fromVersion)

for v, r := range ndb.versionReaders {
if v < toVersion && v > predecessor && r != 0 {
return errors.Errorf("unable to delete version %v with %v active readers", v, r)
}
}

// If the predecessor is earlier than the beginning of the lifetime, we can delete the orphan.
// Otherwise, we shorten its lifetime, by moving its endpoint to the predecessor version.
for version := fromVersion; version < toVersion; version++ {
ndb.traverseOrphansVersion(version, func(key, hash []byte) {
var from, to int64
orphanKeyFormat.Scan(key, &to, &from)
ndb.batch.Delete(key)
if from > predecessor {
ndb.batch.Delete(ndb.nodeKey(hash))
ndb.uncacheNode(hash)
} else {
ndb.saveOrphan(hash, from, predecessor)
}
})
}

// Delete the version root entries
ndb.traverseRange(rootKeyFormat.Key(fromVersion), rootKeyFormat.Key(toVersion), func(k, v []byte) {
ndb.batch.Delete(k)
})

return nil
}

// deleteNodesFrom deletes the given node and any descendants that have versions after the given
// (inclusive). It is mainly used via LoadVersionForOverwriting, to delete the current version.
func (ndb *nodeDB) deleteNodesFrom(version int64, hash []byte) error {
Expand Down
71 changes: 50 additions & 21 deletions tree_random_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,15 @@ func testRandomOperations(t *testing.T, randSeed int64) {
keySize = 16 // before base64-encoding
valueSize = 16 // before base64-encoding

versions = 32 // number of final versions to generate
reloadChance = 0.1 // chance of tree reload after save
deleteChance = 0.2 // chance of random version deletion after save
revertChance = 0.05 // chance to revert tree to random version with LoadVersionForOverwriting
syncChance = 0.2 // chance of enabling sync writes on tree load
cacheChance = 0.4 // chance of enabling caching
cacheSizeMax = 256 // maximum size of cache (will be random from 1)
versions = 32 // number of final versions to generate
reloadChance = 0.1 // chance of tree reload after save
deleteChance = 0.2 // chance of random version deletion after save
revertChance = 0.05 // chance to revert tree to random version with LoadVersionForOverwriting
syncChance = 0.2 // chance of enabling sync writes on tree load
cacheChance = 0.4 // chance of enabling caching
cacheSizeMax = 256 // maximum size of cache (will be random from 1)
deleteRangeChance = 0.5 // chance deletion versions in range
deleteRangeMaxBatch = 5 // small range to delete

versionOps = 64 // number of operations (create/update/delete) per version
updateRatio = 0.4 // ratio of updates out of all operations
Expand Down Expand Up @@ -148,12 +150,30 @@ func testRandomOperations(t *testing.T, randSeed int64) {
if r.Float64() < deleteChance {
versions := getMirrorVersions(diskMirrors, memMirrors)
if len(versions) > 2 {
deleteVersion := int64(versions[r.Intn(len(versions)-1)])
t.Logf("Deleting version %v", deleteVersion)
err = tree.DeleteVersion(deleteVersion)
require.NoError(t, err)
delete(diskMirrors, deleteVersion)
delete(memMirrors, deleteVersion)
if r.Float64() < deleteRangeChance {
indexFrom := r.Intn(len(versions) - 1)
from := versions[indexFrom]
batch := r.Intn(deleteRangeMaxBatch)
if batch > len(versions[indexFrom:])-2 {
batch = len(versions[indexFrom:]) - 2
}
to := versions[indexFrom+batch] + 1
t.Logf("Deleting versions %v-%v", from, to-1)
err = tree.DeleteVersionsRange(int64(from), int64(to))
require.NoError(t, err)
for version := from; version < to; version++ {
delete(diskMirrors, int64(version))
delete(memMirrors, int64(version))
}
} else {
i := r.Intn(len(versions) - 1)
deleteVersion := int64(versions[i])
t.Logf("Deleting version %v", deleteVersion)
err = tree.DeleteVersion(deleteVersion)
require.NoError(t, err)
delete(diskMirrors, deleteVersion)
delete(memMirrors, deleteVersion)
}
}
}

Expand Down Expand Up @@ -211,14 +231,23 @@ func testRandomOperations(t *testing.T, randSeed int64) {
// Once we're done, delete all prior versions in random order, make sure all orphans have been
// removed, and check that the latest versions matches the mirror.
remaining := tree.AvailableVersions()
remaining = remaining[:len(remaining)-1]
for len(remaining) > 0 {
i := r.Intn(len(remaining))
deleteVersion := int64(remaining[i])
remaining = append(remaining[:i], remaining[i+1:]...)
t.Logf("Deleting version %v", deleteVersion)
err = tree.DeleteVersion(deleteVersion)
require.NoError(t, err)

if r.Float64() < deleteRangeChance {
if len(remaining) > 0 {
t.Logf("Deleting versions %v-%v", remaining[0], remaining[len(remaining)-2])
err = tree.DeleteVersionsRange(int64(remaining[0]), int64(remaining[len(remaining)-1]))
require.NoError(t, err)
}
} else {
remaining = remaining[:len(remaining)-1]
for len(remaining) > 0 {
i := r.Intn(len(remaining))
deleteVersion := int64(remaining[i])
remaining = append(remaining[:i], remaining[i+1:]...)
t.Logf("Deleting version %v", deleteVersion)
err = tree.DeleteVersion(deleteVersion)
require.NoError(t, err)
}
}
require.EqualValues(t, []int{int(version)}, tree.AvailableVersions())

Expand Down
Loading

0 comments on commit 9af2c83

Please sign in to comment.