Skip to content

Commit

Permalink
address comment
Browse files Browse the repository at this point in the history
  • Loading branch information
dustinxie committed May 27, 2024
1 parent 628bb2d commit 9f88d88
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 36 deletions.
44 changes: 30 additions & 14 deletions db/db_versioned.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,8 @@ func (b *BoltDBVersioned) Put(version uint64, ns string, key, value []byte) erro
}
km, exit := km.updateWrite(version, value)
if exit {
return nil
// not a valid write request
return ErrInvalid
}
buf.Put(ns, append(key, 0), km.serialize(), fmt.Sprintf("failed to put key %x's metadata", key))
buf.Put(ns, versionedKey(key, version), value, fmt.Sprintf("failed to put key %x", key))
Expand All @@ -145,21 +146,29 @@ func (b *BoltDBVersioned) Get(version uint64, ns string, key []byte) ([]byte, er
if err != nil {
return nil, err
}
hitLast, err := km.updateRead(version)
hitLast, err := km.checkRead(version)
if err != nil {
return nil, errors.Wrapf(err, "key = %x", key)
}
if hitLast {
return km.lastWrite, nil
}
return b.get(version, ns, key)
v, err := b.get(version, ns, key)
if len(v) == 0 {
// the key is deleted on this version
return nil, ErrDeleted
}
return v, err
}

func (b *BoltDBVersioned) get(version uint64, ns string, key []byte) ([]byte, error) {
// construct the actual key = key + version (represented in 8-bytes)
// and read from DB
var (
meta = append(key, 0)
value []byte
)
key = versionedKey(key, version)
var value []byte
err := b.db.db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(ns))
if bucket == nil {
Expand All @@ -169,8 +178,8 @@ func (b *BoltDBVersioned) get(version uint64, ns string, key []byte) ([]byte, er
k, v := c.Seek(key)
if k == nil || bytes.Compare(k, key) == 1 {
k, v = c.Prev()
if k == nil || bytes.Compare(k, append(key[:len(key)-8], 0)) <= 0 {
// cursor is at the beginning/end of the bucket or smaller than minimum key
if k == nil || bytes.Compare(k, meta) <= 0 {
// cursor is at the beginning/end of the bucket or smaller than key's meta
panic(fmt.Sprintf("BoltDBVersioned.get(), invalid key = %x", key))
}
}
Expand All @@ -195,15 +204,20 @@ func (b *BoltDBVersioned) Delete(version uint64, ns string, key []byte) error {
// check key's metadata
km, err := b.checkNamespaceAndKey(ns, key)
if err != nil {
if cause := errors.Cause(err); cause != ErrNotExist && cause != ErrInvalid {
return err
}
return err
}
if km == nil || version < km.lastVersion || version <= km.deleteVersion {
return nil
if km == nil {
return errors.Wrapf(ErrNotExist, "key = %x doesn't exist", key)
}
if version < km.lastVersion || version <= km.deleteVersion {
// not allowed to delete an earlier version
return ErrInvalid

Check warning on line 214 in db/db_versioned.go

View check run for this annotation

Codecov / codecov/patch

db/db_versioned.go#L214

Added line #L214 was not covered by tests
}
km.deleteVersion = version
return b.db.Put(ns, append(key, 0), km.serialize())
buf := batch.NewBatch()
buf.Put(ns, append(key, 0), km.serialize(), fmt.Sprintf("failed to put key %x's metadata", key))
buf.Put(ns, versionedKey(key, version), nil, fmt.Sprintf("failed to mark delete version %d", version))
return b.db.WriteBatch(buf)
}

// Version returns the key's most recent version
Expand Down Expand Up @@ -235,7 +249,9 @@ func (b *BoltDBVersioned) SetVersion(v uint64) KVStore {
}

func versionedKey(key []byte, v uint64) []byte {
return append(key, byteutil.Uint64ToBytesBigEndian(v)...)
k := make([]byte, len(key), len(key)+8)
copy(k, key)
return append(k, byteutil.Uint64ToBytesBigEndian(v)...)
}

func (b *BoltDBVersioned) checkNamespace(ns string) (*versionedNamespace, error) {
Expand Down Expand Up @@ -276,7 +292,7 @@ func (b *BoltDBVersioned) checkNamespaceAndKey(ns string, key []byte) (*keyMeta,
return nil, err
}
if vn == nil {
return nil, errors.Wrapf(ErrNotExist, "key = %x doesn't exist", key)
return nil, errors.Wrapf(ErrBucketNotExist, "namespace = %x doesn't exist", ns)
}
if len(key) != int(vn.keyLen) {
return nil, errors.Wrapf(ErrInvalid, "invalid key length, expecting %d, got %d", vn.keyLen, len(key))
Expand Down
48 changes: 28 additions & 20 deletions db/db_versioned_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func TestVersionedDB(t *testing.T) {
r.NoError(db.Put(4, _bucket1, _k4, _v1))
r.NoError(db.Put(7, _bucket1, _k4, _v3))
for _, e := range []versionTest{
{_bucket2, _k1, nil, 0, ErrNotExist}, // bucket not exist
{_bucket2, _k1, nil, 0, ErrBucketNotExist}, // bucket not exist
{_bucket1, _k1, nil, 0, ErrNotExist},
{_bucket1, _k2, _v2, 0, nil},
{_bucket1, _k2, _v1, 1, nil},
Expand All @@ -99,14 +99,14 @@ func TestVersionedDB(t *testing.T) {
// overwrite the same height again
r.NoError(db.Put(6, _bucket1, _k2, _v4))
r.NoError(db.Put(7, _bucket1, _k4, _v4))
// write to earlier version again does nothing
r.NoError(db.Put(3, _bucket1, _k2, _v4))
r.NoError(db.Put(4, _bucket1, _k4, _v4))
// write to earlier version again is invalid
r.Equal(ErrInvalid, db.Put(3, _bucket1, _k2, _v4))
r.Equal(ErrInvalid, db.Put(4, _bucket1, _k4, _v4))
// write with same value
r.NoError(db.Put(9, _bucket1, _k2, _v4))
r.NoError(db.Put(10, _bucket1, _k4, _v4))
for _, e := range []versionTest{
{_bucket2, _k1, nil, 0, ErrNotExist}, // bucket not exist
{_bucket2, _k1, nil, 0, ErrBucketNotExist}, // bucket not exist
{_bucket1, _k1, nil, 0, ErrNotExist},
{_bucket1, _k2, _v2, 0, nil},
{_bucket1, _k2, _v1, 1, nil},
Expand Down Expand Up @@ -148,13 +148,17 @@ func TestVersionedDB(t *testing.T) {
r.Equal(e.height, value)
}
// test delete
r.NoError(db.Delete(10, _bucket2, _k1))
for _, k := range [][]byte{_k1, _k2, _k3, _k4, _k5, _k10} {
r.Equal(ErrBucketNotExist, errors.Cause(db.Delete(10, _bucket2, _k1)))
for _, k := range [][]byte{_k2, _k4} {
r.NoError(db.Delete(10, _bucket1, k))
}
for _, k := range [][]byte{_k1, _k3, _k5} {
r.Equal(ErrNotExist, errors.Cause(db.Delete(10, _bucket1, k)))
}
r.Equal(ErrInvalid, errors.Cause(db.Delete(10, _bucket1, _k10)))
// key still can be read before delete version
for _, e := range []versionTest{
{_bucket2, _k1, nil, 0, ErrNotExist}, // bucket not exist
{_bucket2, _k1, nil, 0, ErrBucketNotExist}, // bucket not exist
{_bucket1, _k1, nil, 0, ErrNotExist},
{_bucket1, _k2, _v2, 0, nil},
{_bucket1, _k2, _v1, 1, nil},
Expand All @@ -181,9 +185,9 @@ func TestVersionedDB(t *testing.T) {
r.Equal(e.err, errors.Cause(err))
r.Equal(e.v, value)
}
// write before delete version does nothing
r.NoError(db.Put(9, _bucket1, _k2, _k2))
r.NoError(db.Put(9, _bucket1, _k4, _k4))
// write before delete version is invalid
r.Equal(ErrInvalid, db.Put(9, _bucket1, _k2, _k2))
r.Equal(ErrInvalid, db.Put(9, _bucket1, _k4, _k4))
for _, e := range []versionTest{
{_bucket1, _k2, _v4, 9, nil}, // before delete version
{_bucket1, _k2, nil, 10, ErrDeleted}, // after delete version
Expand All @@ -195,10 +199,10 @@ func TestVersionedDB(t *testing.T) {
r.Equal(e.v, value)
}
// write after delete version
r.NoError(db.Put(10, _bucket1, _k2, _k2))
r.NoError(db.Put(10, _bucket1, _k4, _k4))
r.NoError(db.Put(12, _bucket1, _k2, _k2))
r.NoError(db.Put(12, _bucket1, _k4, _k4))
for _, e := range []versionTest{
{_bucket2, _k1, nil, 0, ErrNotExist}, // bucket not exist
{_bucket2, _k1, nil, 0, ErrBucketNotExist}, // bucket not exist
{_bucket1, _k1, nil, 0, ErrNotExist},
{_bucket1, _k2, _v2, 0, nil},
{_bucket1, _k2, _v1, 1, nil},
Expand All @@ -207,17 +211,21 @@ func TestVersionedDB(t *testing.T) {
{_bucket1, _k2, _v3, 5, nil},
{_bucket1, _k2, _v4, 6, nil},
{_bucket1, _k2, _v4, 8, nil},
{_bucket1, _k2, _v4, 9, nil}, // before delete version
{_bucket1, _k2, _k2, 10, nil}, // after delete version
{_bucket1, _k2, _v4, 9, nil}, // before delete version
{_bucket1, _k2, nil, 10, ErrDeleted}, // after delete version
{_bucket1, _k2, nil, 11, ErrDeleted}, // after delete version
{_bucket1, _k2, _k2, 12, nil}, // after next write version
{_bucket1, _k3, nil, 0, ErrNotExist},
{_bucket1, _k4, nil, 1, ErrNotExist}, // before first write version
{_bucket1, _k4, _v2, 2, nil},
{_bucket1, _k4, _v2, 3, nil},
{_bucket1, _k4, _v1, 4, nil},
{_bucket1, _k4, _v1, 6, nil},
{_bucket1, _k4, _v4, 7, nil},
{_bucket1, _k4, _v4, 9, nil}, // before delete version
{_bucket1, _k4, _k4, 10, nil}, // after delete version
{_bucket1, _k4, _v4, 9, nil}, // before delete version
{_bucket1, _k4, nil, 10, ErrDeleted}, // after delete version
{_bucket1, _k4, nil, 11, ErrDeleted}, // after delete version
{_bucket1, _k4, _k4, 12, nil}, // after next write version
{_bucket1, _k5, nil, 0, ErrNotExist},
{_bucket1, _k10, nil, 0, ErrInvalid},
} {
Expand All @@ -228,9 +236,9 @@ func TestVersionedDB(t *testing.T) {
// check version after delete
for _, e := range []versionTest{
{_bucket1, _k1, nil, 0, ErrNotExist},
{_bucket1, _k2, nil, 10, nil},
{_bucket1, _k2, nil, 12, nil},
{_bucket1, _k3, nil, 0, ErrNotExist},
{_bucket1, _k4, nil, 10, nil},
{_bucket1, _k4, nil, 12, nil},
{_bucket1, _k5, nil, 0, ErrNotExist},
{_bucket1, _k10, nil, 0, ErrInvalid},
} {
Expand Down
2 changes: 1 addition & 1 deletion db/db_versioned_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func deserializeKeyMeta(buf []byte) (*keyMeta, error) {
return fromProtoKM(&km), nil
}

func (km *keyMeta) updateRead(version uint64) (bool, error) {
func (km *keyMeta) checkRead(version uint64) (bool, error) {
if km == nil || version < km.firstVersion {
return false, ErrNotExist
}
Expand Down
2 changes: 1 addition & 1 deletion db/db_versioned_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func TestKmUpdate(t *testing.T) {
km.firstVersion = e.first
km.lastVersion = e.last
km.deleteVersion = e.delete
hitLast, err := km.updateRead(e.version)
hitLast, err := km.checkRead(e.version)
r.Equal(e.hitOrExit, hitLast)
r.Equal(e.err, errors.Cause(err))
}
Expand Down

0 comments on commit 9f88d88

Please sign in to comment.