Skip to content

Commit

Permalink
handle delete-after-write
Browse files Browse the repository at this point in the history
  • Loading branch information
dustinxie committed Jul 15, 2024
1 parent 302c0d4 commit cc03631
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 40 deletions.
24 changes: 3 additions & 21 deletions db/db_versioned.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,14 +157,9 @@ func (b *BoltDBVersioned) Get(version uint64, ns string, key []byte) ([]byte, er
if err != nil {
return nil, err
}
hitLast, err = km.hitLastWrite(last, version)
if err != nil {
if _, err = km.hitLastWrite(last, version); err != nil {
return nil, err
}
if !hitLast && len(v) == 0 {
// this is a delete-after-write
return nil, ErrDeleted
}
return v, nil
}

Expand Down Expand Up @@ -221,13 +216,6 @@ func (b *BoltDBVersioned) Delete(version uint64, ns string, key []byte) error {
if err = km.updateDelete(version); err != nil {
return err
}
if version == km.lastVersion {
// write <key, nil> to indicate this is a delete-after-write
buf := batch.NewBatch()
buf.Put(ns, append(key, 0), km.serialize(), fmt.Sprintf("failed to put key %x's metadata", key))
buf.Put(ns, versionedKey(key, version), nil, fmt.Sprintf("failed to put key %x", key))
return b.db.WriteBatch(buf)
}
return b.db.Put(ns, append(key, 0), km.serialize())
}

Expand All @@ -245,15 +233,9 @@ func (b *BoltDBVersioned) Version(ns string, key []byte) (uint64, error) {
// key not yet written
return 0, errors.Wrapf(ErrNotExist, "key = %x doesn't exist", key)
}
if lastDelete := km.lastDelete(); lastDelete > km.lastVersion {
if km.lastDelete() >= km.lastVersion {
// there's a delete after last write
err = errors.Wrapf(ErrDeleted, "key = %x already deleted", key)
} else if lastDelete == km.lastVersion {
var v []byte
_, v, err = b.get(km.lastVersion, ns, key)
if err == nil && len(v) == 0 {
// this is a delete-after-write
err = errors.Wrapf(ErrDeleted, "key = %x already deleted", key)
}
}
return km.lastVersion, err
}
Expand Down
10 changes: 10 additions & 0 deletions db/db_versioned_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,16 @@ func TestMultipleWriteDelete(t *testing.T) {
r.NoError(db.Delete(18, _bucket1, _k2)) // delete-after-write
_, err = db.Version(_bucket1, _k2)
r.Equal(ErrDeleted, errors.Cause(err))
r.NoError(db.Put(18, _bucket1, _k2, _v3)) // write again
value, err := db.Get(18, _bucket1, _k2)
r.NoError(err)
r.Equal(_v3, value)
v, err = db.Version(_bucket1, _k2)
r.NoError(err)
r.EqualValues(18, v)
r.NoError(db.Delete(18, _bucket1, _k2)) // delete-after-write
_, err = db.Version(_bucket1, _k2)
r.Equal(ErrDeleted, errors.Cause(err))
r.NoError(db.Put(21, _bucket1, _k2, _v4))
v, err = db.Version(_bucket1, _k2)
r.NoError(err)
Expand Down
36 changes: 22 additions & 14 deletions db/db_versioned_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,21 +105,29 @@ func (km *keyMeta) updateWrite(version uint64, value []byte) (*keyMeta, bool) {
lastVersion: version,
}, false
}
if version < km.lastVersion || version < km.lastDelete() {
lastDel := km.lastDelete()
if version < km.lastVersion || version < lastDel {
// writing to an earlier version complicates things, for now it is not allowed
return km, true
}
if version == lastDel {
// clear the last delete
km.clearLastDelete()
}
km.lastWrite = value
km.lastVersion = version
return km, false
}

func (km *keyMeta) updateDelete(version uint64) error {
if version < km.lastVersion || version <= km.lastDelete() {
lastDel := km.lastDelete()
if version < km.lastVersion || version < lastDel {
// not allowed to delete an earlier version
return ErrInvalid
}
km.deleteVersion = append(km.deleteVersion, version)
if version > lastDel {
km.deleteVersion = append(km.deleteVersion, version)
}
return nil
}

Expand All @@ -130,29 +138,29 @@ func (km *keyMeta) lastDelete() uint64 {
return 0
}

func (km *keyMeta) clearLastDelete() {
if size := len(km.deleteVersion); size > 0 {
km.deleteVersion = km.deleteVersion[:size-1]
}
}

func (km *keyMeta) hitLastWrite(write, read uint64) (bool, error) {
if write > read {
panic(fmt.Sprintf("last write %d > attempted read %d", write, read))
}
var (
nextDelete uint64
hasDelete bool
hasDelete bool
)
for _, v := range km.deleteVersion {
if v >= write {
nextDelete = v
hasDelete = (write <= nextDelete && nextDelete <= read)
hasDelete = (v <= read)
break
}
}
if !hasDelete {
return true, nil
}
if write < nextDelete {
if hasDelete {
// there's a delete after last write
return false, ErrDeleted

}
// delete and write fall on the same version, need to check further
// if it's write-after-delete or delete-after-write
return false, nil
return true, nil
}
17 changes: 12 additions & 5 deletions db/db_versioned_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ func TestKmUpdate(t *testing.T) {
{1, 1, 0, 2, true, nil},
// write at version 1, delete at version 1
{1, 1, 1, 0, false, ErrNotExist},
{1, 1, 1, 1, false, nil},
{1, 1, 1, 2, false, nil},
{1, 1, 1, 1, false, ErrDeleted},
{1, 1, 1, 2, false, ErrDeleted},
// write at version 1, delete at version 3
{1, 1, 3, 0, false, ErrNotExist},
{1, 1, 3, 1, true, nil},
Expand All @@ -73,8 +73,8 @@ func TestKmUpdate(t *testing.T) {
{1, 3, 3, 0, false, ErrNotExist},
{1, 3, 3, 1, false, nil},
{1, 3, 3, 2, false, nil},
{1, 3, 3, 3, false, nil},
{1, 3, 3, 4, false, nil},
{1, 3, 3, 3, false, ErrDeleted},
{1, 3, 3, 4, false, ErrDeleted},
// write at version 3, delete at version 5
{1, 3, 5, 0, false, ErrNotExist},
{1, 3, 5, 1, false, nil},
Expand Down Expand Up @@ -143,9 +143,16 @@ func TestKmUpdate(t *testing.T) {
}
km0, exit := km.updateWrite(e.version, _v1)
r.Equal(e.hitOrExit, exit)
if !exit {
if exit {
r.Nil(km0.lastWrite)
r.Equal(e.last, km0.lastVersion)
} else {
r.Equal(_v1, km0.lastWrite)
r.Equal(e.version, km0.lastVersion)
if e.version == e.last {
// delete-after-write is cleared
r.Zero(len(km0.deleteVersion))
}
}
}
})
Expand Down

0 comments on commit cc03631

Please sign in to comment.