Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[db] implement Get/Put/Delete/Version() for BoltDBVersioned #4256

Merged
merged 2 commits into from
Dec 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
252 changes: 173 additions & 79 deletions db/db_versioned.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,47 +7,42 @@
package db

import (
"bytes"
"context"
"fmt"
"math"

"github.com/pkg/errors"
bolt "go.etcd.io/bbolt"

"github.com/iotexproject/iotex-core/v2/db/batch"
"github.com/iotexproject/iotex-core/v2/pkg/lifecycle"
"github.com/iotexproject/iotex-core/v2/pkg/util/byteutil"
)

var (
ErrDeleted = errors.New("deleted in DB")
_minKey = []byte{0} // the minimum key, used to store namespace's metadata
)

type (
// KvVersioned is a versioned key-value store, where each key has multiple
// versions of value (corresponding to different heights in a blockchain)
//
// Versioning is achieved by using (key + 8-byte version) as the actual
// storage key in the underlying DB. For each bucket, a metadata is stored
// at the special key = []byte{0}. The metadata specifies the bucket's name
// and the key length.
//
// For each versioned key, the special location = key + []byte{0} stores the
// key's metadata, which includes the following info:
// 1. the version when the key is first created
// 2. the version when the key is lastly written
// 3. the version when the key is deleted
// 4. hash of the key's last written value (to detect/avoid same write)
// If the location does not store a value, the key has never been written.
//
// How to use a versioned DB:
//
// db := NewBoltDBVersioned(cfg) // creates a versioned DB
// db.Start(ctx)
// defer func() { db.Stop(ctx) }()
//
// kv := db.SetVersion(5)
// value, err := kv.Get("ns", key) // read 'key' at version 5
// kv = db.SetVersion(8)
// err := kv.Put("ns", key, value) // write 'key' at version 8

KvVersioned interface {
VersionedDB interface {
lifecycle.StartStopper

// Put insert or update a record identified by (namespace, key)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why to modify the interface this way? I would recommend that the version should be for all keys rather than individual keys. Do we have a use case that requires this? I think the original interface design is easier for integration, or will there be a wrapper to encapsulate it?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, KvVersioned is the new wrapper to encapsulate it

Put(uint64, string, []byte, []byte) error

// Get gets a record by (namespace, key)
Get(uint64, string, []byte) ([]byte, error)

// Delete deletes a record by (namespace, key)
Delete(uint64, string, []byte) error

// Base returns the underlying KVStore
Base() KVStore

// Version returns the key's most recent version
Version(string, []byte) (uint64, error)

// SetVersion sets the version, and returns a KVStore to call Put()/Get()
SetVersion(uint64) KVStoreBasic
}

// BoltDBVersioned is KvVersioned implementation based on bolt DB
Expand All @@ -56,18 +51,12 @@ type (
}
)

// Option sets an option
type Option func(b *BoltDBVersioned)

// NewBoltDBVersioned instantiates an BoltDB which implements KvVersioned
func NewBoltDBVersioned(cfg Config, opts ...Option) *BoltDBVersioned {
b := &BoltDBVersioned{
// NewBoltDBVersioned instantiates an BoltDB which implements VersionedDB
func NewBoltDBVersioned(cfg Config) *BoltDBVersioned {
b := BoltDBVersioned{
db: NewBoltDB(cfg),
}
for _, opt := range opts {
opt(b)
}
return b
return &b
}

// Start starts the DB
Expand All @@ -80,77 +69,182 @@ func (b *BoltDBVersioned) Stop(ctx context.Context) error {
return b.db.Stop(ctx)
}

// Base returns the underlying KVStore
func (b *BoltDBVersioned) Base() KVStore {
return b.db
}

// Put writes a <key, value> record
func (b *BoltDBVersioned) Put(ns string, version uint64, key, value []byte) error {
func (b *BoltDBVersioned) Put(version uint64, ns string, key, value []byte) error {
Copy link
Member

@envestcc envestcc Jul 31, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need to implement these single functionPut/Delete? I prefer only keep WriteBatch, b/c version update will be atomic, and also can simplify the impl

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

discussed offline, it does not hurt to keep it

if !b.db.IsReady() {
return ErrDBNotStarted
}
// TODO: implement Put
return nil
// check namespace
vn, err := b.checkNamespace(ns)
if err != nil {
return err
}
buf := batch.NewBatch()
if vn == nil {
// namespace not yet created
buf.Put(ns, _minKey, (&versionedNamespace{
keyLen: uint32(len(key)),
}).serialize(), "failed to create metadata")
} else {
if len(key) != int(vn.keyLen) {
return errors.Wrapf(ErrInvalid, "invalid key length, expecting %d, got %d", vn.keyLen, len(key))
}
last, _, err := b.get(math.MaxUint64, ns, key)
if !isNotExist(err) && version < last {
// not allowed to perform write on an earlier version
return ErrInvalid
}
buf.Delete(ns, keyForDelete(key, version), fmt.Sprintf("failed to delete key %x", key))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why delete?

}
buf.Put(ns, keyForWrite(key, version), value, fmt.Sprintf("failed to put key %x", key))
return b.db.WriteBatch(buf)
}

// Get retrieves the most recent version
func (b *BoltDBVersioned) Get(ns string, version uint64, key []byte) ([]byte, error) {
func (b *BoltDBVersioned) Get(version uint64, ns string, key []byte) ([]byte, error) {
if !b.db.IsReady() {
return nil, ErrDBNotStarted
}
// TODO: implement Get
return nil, nil
// check key's metadata
if err := b.checkNamespaceAndKey(ns, key); err != nil {
return nil, err
}
_, v, err := b.get(version, ns, key)
return v, err
}

// Delete deletes a record,if key is nil,this will delete the whole bucket
func (b *BoltDBVersioned) Delete(ns string, key []byte) error {
func (b *BoltDBVersioned) get(version uint64, ns string, key []byte) (uint64, []byte, error) {
var (
last uint64
isDelete bool
value []byte
)
err := b.db.db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(ns))
if bucket == nil {
return ErrBucketNotExist
}
var (
c = bucket.Cursor()
min = keyForDelete(key, 0)
key = keyForWrite(key, version)
k, v = c.Seek(key)
)
if k == nil || bytes.Compare(k, key) == 1 {
k, v = c.Prev()
if k == nil || bytes.Compare(k, min) <= 0 {
// cursor is at the beginning/end of the bucket or smaller than minimum key
return ErrNotExist
}
}
isDelete, last = parseKey(k)
value = make([]byte, len(v))
copy(value, v)
return nil
})
if err != nil {
return last, nil, err
}
if isDelete {
return last, nil, ErrDeleted
}
return last, value, nil
}

// Delete deletes a record, if key does not exist, it returns nil
func (b *BoltDBVersioned) Delete(version uint64, ns string, key []byte) error {
if !b.db.IsReady() {
return ErrDBNotStarted
}
// TODO: implement Delete
return nil
// check key's metadata
if err := b.checkNamespaceAndKey(ns, key); err != nil {
return err
}
last, _, err := b.get(math.MaxUint64, ns, key)
if isNotExist(err) {
return err
}
if version < last {
// not allowed to perform delete on an earlier version
return ErrInvalid
}
buf := batch.NewBatch()
buf.Put(ns, keyForDelete(key, version), nil, fmt.Sprintf("failed to delete key %x", key))
buf.Delete(ns, keyForWrite(key, version), fmt.Sprintf("failed to delete key %x", key))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why delete write key?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

discussed offline

  1. it is to keep only the last operation on the key
  2. i recall in the original POC, we have cases where the same key is deleted, then written in the batch at around height 400k. So it is a valid request (to delete then write on the same height). We'll allow this behavior (and also write then delete on the same height)

return b.db.WriteBatch(buf)
}

// Version returns the key's most recent version
func (b *BoltDBVersioned) Version(ns string, key []byte) (uint64, error) {
if !b.db.IsReady() {
return 0, ErrDBNotStarted
}
// TODO: implement Version
return 0, nil
}

// SetVersion sets the version, and returns a KVStore to call Put()/Get()
func (b *BoltDBVersioned) SetVersion(v uint64) KVStoreBasic {
return &KvWithVersion{
db: b,
version: v,
// check key's metadata
if err := b.checkNamespaceAndKey(ns, key); err != nil {
return 0, err
}
last, _, err := b.get(math.MaxUint64, ns, key)
if isNotExist(err) {
// key not yet written
err = errors.Wrapf(ErrNotExist, "key = %x doesn't exist", key)
}
return last, err
}

// KvWithVersion wraps the BoltDBVersioned with a certain version
type KvWithVersion struct {
db *BoltDBVersioned
version uint64 // version for Get/Put()
func isNotExist(err error) bool {
return err == ErrNotExist || err == ErrBucketNotExist
}

// Start starts the DB
func (b *KvWithVersion) Start(context.Context) error {
panic("should call BoltDBVersioned's Start method")
func keyForWrite(key []byte, v uint64) []byte {
k := make([]byte, len(key), len(key)+9)
copy(k, key)
k = append(k, byteutil.Uint64ToBytesBigEndian(v)...)
return append(k, 1)
}

// Stop stops the DB
func (b *KvWithVersion) Stop(context.Context) error {
panic("should call BoltDBVersioned's Stop method")
func keyForDelete(key []byte, v uint64) []byte {
k := make([]byte, len(key), len(key)+9)
copy(k, key)
k = append(k, byteutil.Uint64ToBytesBigEndian(v)...)
return append(k, 0)
}

// Put writes a <key, value> record
func (b *KvWithVersion) Put(ns string, key, value []byte) error {
return b.db.Put(ns, b.version, key, value)
func parseKey(key []byte) (bool, uint64) {
size := len(key)
return (key[size-1] == 0), byteutil.BytesToUint64BigEndian(key[size-9 : size-1])
}

// Get retrieves a key's value
func (b *KvWithVersion) Get(ns string, key []byte) ([]byte, error) {
return b.db.Get(ns, b.version, key)
func (b *BoltDBVersioned) checkNamespace(ns string) (*versionedNamespace, error) {
data, err := b.db.Get(ns, _minKey)
switch errors.Cause(err) {
case nil:
vn, err := deserializeVersionedNamespace(data)
if err != nil {
return nil, err
}
return vn, nil
case ErrNotExist, ErrBucketNotExist:
return nil, nil
default:
return nil, err
}
}

// Delete deletes a key
func (b *KvWithVersion) Delete(ns string, key []byte) error {
return b.db.Delete(ns, key)
func (b *BoltDBVersioned) checkNamespaceAndKey(ns string, key []byte) error {
vn, err := b.checkNamespace(ns)
if err != nil {
return err
}
if vn == nil {
return errors.Wrapf(ErrNotExist, "namespace = %x doesn't exist", ns)
}
if len(key) != int(vn.keyLen) {
return errors.Wrapf(ErrInvalid, "invalid key length, expecting %d, got %d", vn.keyLen, len(key))
}
return nil
}
Loading
Loading