Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[db] KvWithVersion to handle both versioned and non-versioned namespace #4518

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
241 changes: 241 additions & 0 deletions db/db_versioned.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,15 @@ import (
"context"
"fmt"
"math"
"syscall"

"github.com/pkg/errors"
bolt "go.etcd.io/bbolt"
"go.uber.org/zap"

"github.com/iotexproject/iotex-core/v2/db/batch"
"github.com/iotexproject/iotex-core/v2/pkg/lifecycle"
"github.com/iotexproject/iotex-core/v2/pkg/log"
"github.com/iotexproject/iotex-core/v2/pkg/util/byteutil"
)

Expand All @@ -43,6 +46,9 @@ type (

// Version returns the key's most recent version
Version(string, []byte) (uint64, error)

// CommitToDB writes a batch to the underlying DB
CommitToDB(uint64, map[string]bool, batch.KVStoreBatch) error
}

// BoltDBVersioned is KvVersioned implementation based on bolt DB
Expand Down Expand Up @@ -196,6 +202,241 @@ func (b *BoltDBVersioned) Version(ns string, key []byte) (uint64, error) {
return last, err
}

// CommitToDB writes a batch to the DB. The batch may mix keys that belong to
// versioned namespaces with keys that belong to non-versioned ones: dedup
// splits the entries according to vns, and commitToDB then writes both groups
// using the given version for the versioned part.
func (b *BoltDBVersioned) CommitToDB(version uint64, vns map[string]bool, kvsb batch.KVStoreBatch) error {
	keyLens, versioned, plain, err := dedup(vns, kvsb)
	if err == nil {
		return b.commitToDB(version, keyLens, versioned, plain)
	}
	return errors.Wrapf(err, "BoltDBVersioned failed to write batch")
}

// commitToDB applies an already-split batch inside one bolt Update
// transaction, making up to b.db.config.NumRetries attempts.
//
// Parameters:
//   - version: the version applied to every write in ve
//   - vnsize:  for each versioned namespace, the key length seen in the batch
//   - ve:      writes that target versioned namespaces
//   - nve:     writes that target non-versioned namespaces
//
// Both ve and nve are walked from the last element to the first; dedup builds
// them in reverse batch order, so this replays the writes in batch order.
//
// Errors: a validation failure (undecodable namespace metadata, key length
// mismatch, write on an earlier version) stops the retry loop at once and is
// returned as is. Any other failure is retried and, if the last attempt also
// fails, returned wrapped in ErrIO; syscall.ENOSPC is logged with Fatal
// instead. The caller's ve slice is never modified.
func (b *BoltDBVersioned) commitToDB(version uint64, vnsize map[string]int, ve, nve []*batch.WriteInfo) error {
	var (
		err      error
		nonDBErr bool
	)
	for c := uint8(0); c < b.db.config.NumRetries; c++ {
		var (
			// bucket handles are only cached for the current attempt
			buckets = make(map[string]*bolt.Bucket)
			// Every attempt starts again from the caller's writes. Capping the
			// capacity forces the append below to copy rather than grow (or
			// write into) ve, so a retry that again finds namespace metadata
			// missing does not queue the same metadata write a second time.
			writes = ve[:len(ve):len(ve)]
		)
		if err = b.db.db.Update(func(tx *bolt.Tx) error {
			// create/check metadata of all namespaces
			for ns, size := range vnsize {
				bucket, ok := buckets[ns]
				if !ok {
					bucket, err = tx.CreateBucketIfNotExists([]byte(ns))
					if err != nil {
						return errors.Wrapf(err, "failed to create bucket %s", ns)
					}
					buckets[ns] = bucket
				}
				var vn *versionedNamespace
				if val := bucket.Get(_minKey); val == nil {
					// namespace not created yet: queue a Put of its metadata.
					// It sits at the tail of writes, so the reverse walk below
					// handles it before any data write.
					vn = &versionedNamespace{
						keyLen: uint32(size),
					}
					writes = append(writes, batch.NewWriteInfo(
						batch.Put, ns, _minKey, vn.serialize(),
						fmt.Sprintf("failed to create metadata for namespace %s", ns),
					))
				} else {
					if vn, err = deserializeVersionedNamespace(val); err != nil {
						nonDBErr = true
						return errors.Wrapf(err, "failed to get metadata of bucket %s", ns)
					}
					// the batch must use the key length stored for the namespace
					if vn.keyLen != uint32(size) {
						nonDBErr = true
						return errors.Wrapf(ErrInvalid, "invalid key length, expecting %d, got %d", vn.keyLen, size)
					}
				}
			}
			// keep order of the writes same as the original batch
			for i := len(writes) - 1; i >= 0; i-- {
				var (
					write = writes[i]
					ns    = write.Namespace()
					key   = write.Key()
					val   = write.Value()
				)
				// get bucket
				bucket, ok := buckets[ns]
				if !ok {
					bucket, err = tx.CreateBucketIfNotExists([]byte(ns))
					if err != nil {
						return errors.Wrapf(err, "failed to create bucket %s", ns)
					}
					buckets[ns] = bucket
				}
				// check key's last version
				var (
					last               uint64
					notexist, isDelete bool
					actualKey          = keyForWrite(key, version)
				)
				// Seek to the write key for this version; unless the cursor
				// lands exactly on it, step back one entry to reach the
				// closest entry that sorts before it.
				cur := bucket.Cursor()
				k, _ := cur.Seek(actualKey)
				if k == nil || bytes.Compare(k, actualKey) == 1 {
					k, _ = cur.Prev()
					if k == nil || bytes.Compare(k, keyForDelete(key, 0)) <= 0 {
						// cursor is at the beginning/end of the bucket or smaller than minimum key
						notexist = true
					}
				}
				if !notexist {
					// NOTE(review): parseKey presumably decodes whether the
					// entry is a delete marker and which version it carries;
					// confirm against its definition.
					isDelete, last = parseKey(k)
				}
				switch write.WriteType() {
				case batch.Put:
					if bytes.Equal(key, _minKey) {
						// create namespace
						if err = bucket.Put(key, val); err != nil {
							return errors.Wrap(err, write.Error())
						}
					} else {
						// wrong-size key should be caught in dedup(), but check anyway
						if vnsize[ns] != len(key) {
							panic(fmt.Sprintf("BoltDBVersioned.commitToDB(), expect vnsize[%s] = %d, got %d", ns, vnsize[ns], len(key)))
						}
						if isDelete && version <= last {
							// not allowed to perform write on an earlier version
							nonDBErr = true
							return ErrInvalid
						}
						if err = bucket.Put(keyForWrite(key, version), val); err != nil {
							return errors.Wrap(err, write.Error())
						}
					}
				case batch.Delete:
					if notexist {
						// nothing stored for this key, so there is nothing to delete
						continue
					}
					// wrong-size key should be caught in dedup(), but check anyway
					if vnsize[ns] != len(key) {
						panic(fmt.Sprintf("BoltDBVersioned.commitToDB(), expect vnsize[%s] = %d, got %d", ns, vnsize[ns], len(key)))
					}
					if version < last {
						// not allowed to perform delete on an earlier version
						nonDBErr = true
						return ErrInvalid
					}
					// record the delete under the delete key for this version
					// and remove the write key of the same version
					if err = bucket.Put(keyForDelete(key, version), nil); err != nil {
						return errors.Wrap(err, write.Error())
					}
					if err = bucket.Delete(keyForWrite(key, version)); err != nil {
						return errors.Wrap(err, write.Error())
					}
				}
			}
			// write non-versioned keys
			for i := len(nve) - 1; i >= 0; i-- {
				var (
					write = nve[i]
					ns    = write.Namespace()
				)
				switch write.WriteType() {
				case batch.Put:
					// get bucket
					bucket, ok := buckets[ns]
					if !ok {
						bucket, err = tx.CreateBucketIfNotExists([]byte(ns))
						if err != nil {
							return errors.Wrapf(err, "failed to create bucket %s", ns)
						}
						buckets[ns] = bucket
					}
					if err = bucket.Put(write.Key(), write.Value()); err != nil {
						return errors.Wrap(err, write.Error())
					}
				case batch.Delete:
					// a delete never creates the bucket; a missing bucket is skipped
					bucket := tx.Bucket([]byte(ns))
					if bucket == nil {
						continue
					}
					if err = bucket.Delete(write.Key()); err != nil {
						return errors.Wrap(err, write.Error())
					}
				}
			}
			return nil
		}); err == nil || nonDBErr {
			// success, or a validation failure that retrying cannot fix
			break
		}
	}
	if nonDBErr {
		return err
	}
	if err != nil {
		if errors.Is(err, syscall.ENOSPC) {
			log.L().Fatal("BoltDBVersioned failed to write batch", zap.Error(err))
		}
		return errors.Wrap(ErrIO, err.Error())
	}
	return nil
}

// dedup prepares a batch for commitToDB. While holding the batch lock it walks
// the entries from the last one to the first and:
//  1. skips any entry whose (namespace, key) pair was already written by a
//     later Put, so only the last Put of a key survives
//  2. splits the remaining entries into those whose namespace is selected by
//     vns (every namespace when vns is empty) and all the others
//  3. records the key length of each selected namespace, and fails with
//     ErrInvalid when two keys of the same namespace differ in length
//
// Entries that are neither Put nor Delete are ignored. A Delete does not mark
// its key as seen, so an earlier Put of the same key is still kept. Both
// returned slices are in reverse batch order.
func dedup(vns map[string]bool, kvsb batch.KVStoreBatch) (map[string]int, []*batch.WriteInfo, []*batch.WriteInfo, error) {
	kvsb.Lock()
	defer kvsb.Unlock()

	type nsKey struct {
		ns, key string
	}

	var (
		putSeen   = map[nsKey]struct{}{}
		keyLens   = map[string]int{}
		versioned = []*batch.WriteInfo{}
		plain     = []*batch.WriteInfo{}
		allNS     = len(vns) == 0
	)
	for idx := kvsb.Size() - 1; idx >= 0; idx-- {
		write, err := kvsb.Entry(idx)
		if err != nil {
			return nil, nil, nil, err
		}
		var (
			op  = write.WriteType()
			ns  = write.Namespace()
			key = write.Key()
		)
		// only Put and Delete are handled
		switch op {
		case batch.Put, batch.Delete:
		default:
			continue
		}
		id := nsKey{ns: ns, key: string(key)}
		if _, dup := putSeen[id]; dup {
			continue
		}
		if op == batch.Put {
			// only a Put shadows earlier entries: for a later DELETE we still
			// want to capture the earlier PUT, otherwise the DELETE might
			// return not-exist
			putSeen[id] = struct{}{}
		}
		if !allNS && !vns[ns] {
			plain = append(plain, write)
			continue
		}
		versioned = append(versioned, write)
		// all keys of a selected namespace must share one length
		want, known := keyLens[ns]
		switch {
		case !known:
			keyLens[ns] = len(key)
		case want != len(key):
			return nil, nil, nil, errors.Wrapf(ErrInvalid, "invalid key length, expecting %d, got %d", want, len(key))
		}
	}
	return keyLens, versioned, plain, nil
}

// isNotExist reports whether err is ErrNotExist or ErrBucketNotExist, either
// directly or wrapped somewhere in its error chain. It returns false for a nil
// error.
func isNotExist(err error) bool {
	// errors.Is also matches wrapped sentinels, which a plain == would miss
	return errors.Is(err, ErrNotExist) || errors.Is(err, ErrBucketNotExist)
}
Expand Down
Loading
Loading