Skip to content

Commit

Permalink
address comment
Browse files Browse the repository at this point in the history
  • Loading branch information
dustinxie committed Jun 5, 2024
1 parent 628bb2d commit a689cfa
Show file tree
Hide file tree
Showing 6 changed files with 288 additions and 70 deletions.
72 changes: 53 additions & 19 deletions db/db_versioned.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,8 @@ func (b *BoltDBVersioned) Put(version uint64, ns string, key, value []byte) erro
}
km, exit := km.updateWrite(version, value)
if exit {
return nil
// not a valid write request
return ErrInvalid
}
buf.Put(ns, append(key, 0), km.serialize(), fmt.Sprintf("failed to put key %x's metadata", key))
buf.Put(ns, versionedKey(key, version), value, fmt.Sprintf("failed to put key %x", key))
Expand All @@ -145,21 +146,37 @@ func (b *BoltDBVersioned) Get(version uint64, ns string, key []byte) ([]byte, er
if err != nil {
return nil, err
}
hitLast, err := km.updateRead(version)
hitLast, err := km.checkRead(version)
if err != nil {
return nil, errors.Wrapf(err, "key = %x", key)
}
if hitLast {
return km.lastWrite, nil
}
return b.get(version, ns, key)
last, v, err := b.get(version, ns, key)
if err != nil {
return nil, err
}
hitLast, err = km.deleteInBetween(last, version)
if err != nil {
return nil, err
}
if !hitLast && len(v) == 0 {
// this is a delete-after-write
return nil, ErrDeleted
}
return v, nil
}

func (b *BoltDBVersioned) get(version uint64, ns string, key []byte) ([]byte, error) {
func (b *BoltDBVersioned) get(version uint64, ns string, key []byte) (uint64, []byte, error) {
// construct the actual key = key + version (represented in 8-bytes)
// and read from DB
var (
last uint64
meta = append(key, 0)
value []byte
)
key = versionedKey(key, version)
var value []byte
err := b.db.db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(ns))
if bucket == nil {
Expand All @@ -169,22 +186,23 @@ func (b *BoltDBVersioned) get(version uint64, ns string, key []byte) ([]byte, er
k, v := c.Seek(key)
if k == nil || bytes.Compare(k, key) == 1 {
k, v = c.Prev()
if k == nil || bytes.Compare(k, append(key[:len(key)-8], 0)) <= 0 {
// cursor is at the beginning/end of the bucket or smaller than minimum key
if k == nil || bytes.Compare(k, meta) <= 0 {
// cursor is at the beginning/end of the bucket or smaller than key's meta
panic(fmt.Sprintf("BoltDBVersioned.get(), invalid key = %x", key))
}
}
last = byteutil.BytesToUint64BigEndian(k[len(k)-8:])
value = make([]byte, len(v))
copy(value, v)
return nil
})
if err == nil {
return value, nil
return last, value, nil
}
if cause := errors.Cause(err); cause == ErrNotExist || cause == ErrBucketNotExist {
return nil, err
return 0, nil, err
}
return nil, errors.Wrap(ErrIO, err.Error())
return 0, nil, errors.Wrap(ErrIO, err.Error())
}

// Delete deletes a record, if key does not exist, it returns nil
Expand All @@ -195,14 +213,21 @@ func (b *BoltDBVersioned) Delete(version uint64, ns string, key []byte) error {
// check key's metadata
km, err := b.checkNamespaceAndKey(ns, key)
if err != nil {
if cause := errors.Cause(err); cause != ErrNotExist && cause != ErrInvalid {
return err
}
return err
}
if km == nil || version < km.lastVersion || version <= km.deleteVersion {
return nil
if km == nil {
return errors.Wrapf(ErrNotExist, "key = %x doesn't exist", key)
}
if err = km.updateDelete(version); err != nil {
return err
}
if version == km.lastVersion {
// write <key, nil> to indicate this is a delete-after-write
buf := batch.NewBatch()
buf.Put(ns, append(key, 0), km.serialize(), fmt.Sprintf("failed to put key %x's metadata", key))
buf.Put(ns, versionedKey(key, version), nil, fmt.Sprintf("failed to put key %x", key))
return b.db.WriteBatch(buf)
}
km.deleteVersion = version
return b.db.Put(ns, append(key, 0), km.serialize())
}

Expand All @@ -220,8 +245,15 @@ func (b *BoltDBVersioned) Version(ns string, key []byte) (uint64, error) {
// key not yet written
return 0, errors.Wrapf(ErrNotExist, "key = %x doesn't exist", key)
}
if km.deleteVersion != 0 {
if lastDelete := km.lastDelete(); lastDelete > km.lastVersion {
err = errors.Wrapf(ErrDeleted, "key = %x already deleted", key)
} else if lastDelete == km.lastVersion {
var v []byte
_, v, err = b.get(km.lastVersion, ns, key)
if err == nil && len(v) == 0 {
// this is a delete-after-write
err = errors.Wrapf(ErrDeleted, "key = %x already deleted", key)
}
}
return km.lastVersion, err
}
Expand All @@ -235,7 +267,9 @@ func (b *BoltDBVersioned) SetVersion(v uint64) KVStore {
}

func versionedKey(key []byte, v uint64) []byte {
return append(key, byteutil.Uint64ToBytesBigEndian(v)...)
k := make([]byte, len(key), len(key)+8)
copy(k, key)
return append(k, byteutil.Uint64ToBytesBigEndian(v)...)
}

func (b *BoltDBVersioned) checkNamespace(ns string) (*versionedNamespace, error) {
Expand Down Expand Up @@ -276,7 +310,7 @@ func (b *BoltDBVersioned) checkNamespaceAndKey(ns string, key []byte) (*keyMeta,
return nil, err
}
if vn == nil {
return nil, errors.Wrapf(ErrNotExist, "key = %x doesn't exist", key)
return nil, errors.Wrapf(ErrNotExist, "namespace = %x doesn't exist", ns)
}
if len(key) != int(vn.keyLen) {
return nil, errors.Wrapf(ErrInvalid, "invalid key length, expecting %d, got %d", vn.keyLen, len(key))
Expand Down
130 changes: 105 additions & 25 deletions db/db_versioned_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,9 @@ func TestVersionedDB(t *testing.T) {
// overwrite the same height again
r.NoError(db.Put(6, _bucket1, _k2, _v4))
r.NoError(db.Put(7, _bucket1, _k4, _v4))
// write to earlier version again does nothing
r.NoError(db.Put(3, _bucket1, _k2, _v4))
r.NoError(db.Put(4, _bucket1, _k4, _v4))
// write to earlier version again is invalid
r.Equal(ErrInvalid, db.Put(3, _bucket1, _k2, _v4))
r.Equal(ErrInvalid, db.Put(4, _bucket1, _k4, _v4))
// write with same value
r.NoError(db.Put(9, _bucket1, _k2, _v4))
r.NoError(db.Put(10, _bucket1, _k4, _v4))
Expand Down Expand Up @@ -148,10 +148,14 @@ func TestVersionedDB(t *testing.T) {
r.Equal(e.height, value)
}
// test delete
r.NoError(db.Delete(10, _bucket2, _k1))
for _, k := range [][]byte{_k1, _k2, _k3, _k4, _k5, _k10} {
r.NoError(db.Delete(10, _bucket1, k))
r.Equal(ErrNotExist, errors.Cause(db.Delete(10, _bucket2, _k1)))
for _, k := range [][]byte{_k2, _k4} {
r.NoError(db.Delete(11, _bucket1, k))
}
for _, k := range [][]byte{_k1, _k3, _k5} {
r.Equal(ErrNotExist, errors.Cause(db.Delete(10, _bucket1, k)))
}
r.Equal(ErrInvalid, errors.Cause(db.Delete(10, _bucket1, _k10)))
// key still can be read before delete version
for _, e := range []versionTest{
{_bucket2, _k1, nil, 0, ErrNotExist}, // bucket not exist
Expand All @@ -163,40 +167,41 @@ func TestVersionedDB(t *testing.T) {
{_bucket1, _k2, _v3, 5, nil},
{_bucket1, _k2, _v4, 6, nil},
{_bucket1, _k2, _v4, 8, nil},
{_bucket1, _k2, _v4, 9, nil}, // before delete version
{_bucket1, _k2, nil, 10, ErrDeleted}, // after delete version
{_bucket1, _k2, _v4, 9, nil},
{_bucket1, _k2, _v4, 10, nil}, // before delete version
{_bucket1, _k2, nil, 11, ErrDeleted}, // after delete version
{_bucket1, _k3, nil, 0, ErrNotExist},
{_bucket1, _k4, nil, 1, ErrNotExist}, // before first write version
{_bucket1, _k4, _v2, 2, nil},
{_bucket1, _k4, _v2, 3, nil},
{_bucket1, _k4, _v1, 4, nil},
{_bucket1, _k4, _v1, 6, nil},
{_bucket1, _k4, _v4, 7, nil},
{_bucket1, _k4, _v4, 9, nil}, // before delete version
{_bucket1, _k4, nil, 10, ErrDeleted}, // after delete version
{_bucket1, _k4, _v4, 10, nil}, // before delete version
{_bucket1, _k4, nil, 11, ErrDeleted}, // after delete version
{_bucket1, _k5, nil, 0, ErrNotExist},
{_bucket1, _k10, nil, 0, ErrInvalid},
} {
value, err := db.Get(e.height, e.ns, e.k)
r.Equal(e.err, errors.Cause(err))
r.Equal(e.v, value)
}
// write before delete version does nothing
r.NoError(db.Put(9, _bucket1, _k2, _k2))
r.NoError(db.Put(9, _bucket1, _k4, _k4))
// write before delete version is invalid
r.Equal(ErrInvalid, db.Put(9, _bucket1, _k2, _k2))
r.Equal(ErrInvalid, db.Put(9, _bucket1, _k4, _k4))
for _, e := range []versionTest{
{_bucket1, _k2, _v4, 9, nil}, // before delete version
{_bucket1, _k2, nil, 10, ErrDeleted}, // after delete version
{_bucket1, _k4, _v4, 9, nil}, // before delete version
{_bucket1, _k4, nil, 10, ErrDeleted}, // after delete version
{_bucket1, _k2, _v4, 10, nil}, // before delete version
{_bucket1, _k2, nil, 11, ErrDeleted}, // after delete version
{_bucket1, _k4, _v4, 10, nil}, // before delete version
{_bucket1, _k4, nil, 11, ErrDeleted}, // after delete version
} {
value, err := db.Get(e.height, e.ns, e.k)
r.Equal(e.err, errors.Cause(err))
r.Equal(e.v, value)
}
// write after delete version
r.NoError(db.Put(10, _bucket1, _k2, _k2))
r.NoError(db.Put(10, _bucket1, _k4, _k4))
r.NoError(db.Put(12, _bucket1, _k2, _k2))
r.NoError(db.Put(12, _bucket1, _k4, _k4))
for _, e := range []versionTest{
{_bucket2, _k1, nil, 0, ErrNotExist}, // bucket not exist
{_bucket1, _k1, nil, 0, ErrNotExist},
Expand All @@ -207,17 +212,19 @@ func TestVersionedDB(t *testing.T) {
{_bucket1, _k2, _v3, 5, nil},
{_bucket1, _k2, _v4, 6, nil},
{_bucket1, _k2, _v4, 8, nil},
{_bucket1, _k2, _v4, 9, nil}, // before delete version
{_bucket1, _k2, _k2, 10, nil}, // after delete version
{_bucket1, _k2, _v4, 10, nil}, // before delete version
{_bucket1, _k2, nil, 11, ErrDeleted}, // after delete version
{_bucket1, _k2, _k2, 12, nil}, // after next write version
{_bucket1, _k3, nil, 0, ErrNotExist},
{_bucket1, _k4, nil, 1, ErrNotExist}, // before first write version
{_bucket1, _k4, _v2, 2, nil},
{_bucket1, _k4, _v2, 3, nil},
{_bucket1, _k4, _v1, 4, nil},
{_bucket1, _k4, _v1, 6, nil},
{_bucket1, _k4, _v4, 7, nil},
{_bucket1, _k4, _v4, 9, nil}, // before delete version
{_bucket1, _k4, _k4, 10, nil}, // after delete version
{_bucket1, _k4, _v4, 10, nil}, // before delete version
{_bucket1, _k4, nil, 11, ErrDeleted}, // after delete version
{_bucket1, _k4, _k4, 12, nil}, // after next write version
{_bucket1, _k5, nil, 0, ErrNotExist},
{_bucket1, _k10, nil, 0, ErrInvalid},
} {
Expand All @@ -228,9 +235,9 @@ func TestVersionedDB(t *testing.T) {
// check version after delete
for _, e := range []versionTest{
{_bucket1, _k1, nil, 0, ErrNotExist},
{_bucket1, _k2, nil, 10, nil},
{_bucket1, _k2, nil, 12, nil},
{_bucket1, _k3, nil, 0, ErrNotExist},
{_bucket1, _k4, nil, 10, nil},
{_bucket1, _k4, nil, 12, nil},
{_bucket1, _k5, nil, 0, ErrNotExist},
{_bucket1, _k10, nil, 0, ErrInvalid},
} {
Expand All @@ -239,3 +246,76 @@ func TestVersionedDB(t *testing.T) {
r.Equal(e.height, value)
}
}

func TestMultipleWriteDelete(t *testing.T) {
r := require.New(t)
testPath, err := testutil.PathOfTempFile("test-version")
r.NoError(err)
defer func() {
testutil.CleanupPath(testPath)
}()

cfg := DefaultConfig
cfg.DbPath = testPath
db := NewBoltDBVersioned(cfg)
ctx := context.Background()
r.NoError(db.Start(ctx))
defer func() {
db.Stop(ctx)
}()

// multiple writes and deletes
r.NoError(db.Put(1, _bucket1, _k2, _v1))
r.NoError(db.Put(3, _bucket1, _k2, _v3))
v, err := db.Version(_bucket1, _k2)
r.NoError(err)
r.EqualValues(3, v)
r.NoError(db.Delete(7, _bucket1, _k2))
_, err = db.Version(_bucket1, _k2)
r.Equal(ErrDeleted, errors.Cause(err))
r.NoError(db.Put(10, _bucket1, _k2, _v2))
v, err = db.Version(_bucket1, _k2)
r.NoError(err)
r.EqualValues(10, v)
r.NoError(db.Delete(15, _bucket1, _k2))
_, err = db.Version(_bucket1, _k2)
r.Equal(ErrDeleted, errors.Cause(err))
r.NoError(db.Put(18, _bucket1, _k2, _v3))
r.NoError(db.Delete(18, _bucket1, _k2)) // delete-after-write
_, err = db.Version(_bucket1, _k2)
r.Equal(ErrDeleted, errors.Cause(err))
r.NoError(db.Put(21, _bucket1, _k2, _v4))
v, err = db.Version(_bucket1, _k2)
r.NoError(err)
r.EqualValues(21, v)
r.NoError(db.Delete(25, _bucket1, _k2))
r.NoError(db.Put(25, _bucket1, _k2, _k2)) // write-after-delete
v, err = db.Version(_bucket1, _k2)
r.NoError(err)
r.EqualValues(25, v)
for _, e := range []versionTest{
{_bucket1, _k2, nil, 0, ErrNotExist},
{_bucket1, _k2, _v1, 1, nil},
{_bucket1, _k2, _v1, 2, nil},
{_bucket1, _k2, _v3, 3, nil},
{_bucket1, _k2, _v3, 6, nil},
{_bucket1, _k2, nil, 7, ErrDeleted},
{_bucket1, _k2, nil, 9, ErrDeleted},
{_bucket1, _k2, _v2, 10, nil},
{_bucket1, _k2, _v2, 14, nil},
{_bucket1, _k2, nil, 15, ErrDeleted},
{_bucket1, _k2, nil, 17, ErrDeleted},
{_bucket1, _k2, nil, 18, ErrDeleted},
{_bucket1, _k2, nil, 20, ErrDeleted},
{_bucket1, _k2, _v4, 21, nil},
{_bucket1, _k2, _v4, 22, nil},
{_bucket1, _k2, _v4, 24, nil},
{_bucket1, _k2, _k2, 25, nil},
{_bucket1, _k2, _k2, 26, nil},
{_bucket1, _k2, _k2, 25000, nil},
} {
value, err := db.Get(e.height, e.ns, e.k)
r.Equal(e.err, errors.Cause(err))
r.Equal(e.v, value)
}
}
Loading

0 comments on commit a689cfa

Please sign in to comment.