Skip to content

Commit

Permalink
[db] implement Get/Put/Version() for BoltDBVersioned
Browse files Browse the repository at this point in the history
  • Loading branch information
dustinxie committed Apr 26, 2024
1 parent 763969a commit 4af3dc0
Show file tree
Hide file tree
Showing 3 changed files with 396 additions and 9 deletions.
186 changes: 177 additions & 9 deletions db/db_versioned.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,23 @@
package db

import (
"bytes"
"context"
"fmt"

"github.com/pkg/errors"
bolt "go.etcd.io/bbolt"

"github.com/iotexproject/go-pkgs/hash"
"github.com/iotexproject/iotex-core/db/batch"
"github.com/iotexproject/iotex-core/pkg/lifecycle"
"github.com/iotexproject/iotex-core/pkg/util/byteutil"
)

var (
ErrInvalidKeyLength = errors.New("invalid key length")
ErrDeleted = errors.New("deleted in DB")
_minKey = []byte{0} // the minimum key, used to store namespace's metadata
)

type (
Expand Down Expand Up @@ -52,12 +66,25 @@ type (

// BoltDBVersioned is KvVersioned implementation based on bolt DB
BoltDBVersioned struct {
db *BoltDB
db *BoltDB
hasher Hasher
}
)

// Option sets an option
type Option func(b *BoltDBVersioned)
type (
// Option sets an option
Option func(b *BoltDBVersioned)

// Hasher is a function to compute hash
Hasher func([]byte) []byte
)

// HasherOption sets the hasher
func HasherOption(h Hasher) Option {
return func(b *BoltDBVersioned) {
b.hasher = h
}
}

// NewBoltDBVersioned instantiates an BoltDB which implements KvVersioned
func NewBoltDBVersioned(cfg Config, opts ...Option) *BoltDBVersioned {
Expand All @@ -67,6 +94,12 @@ func NewBoltDBVersioned(cfg Config, opts ...Option) *BoltDBVersioned {
for _, opt := range opts {
opt(b)
}
if b.hasher == nil {
b.hasher = func(in []byte) []byte {
h := hash.Hash160b(in)
return h[:]
}
}
return b
}

Expand All @@ -85,17 +118,97 @@ func (b *BoltDBVersioned) Put(ns string, version uint64, key, value []byte) erro
if !b.db.IsReady() {
return ErrDBNotStarted
}
// TODO: implement Put
return nil
// check namespace
vn, err := b.checkNamespace(ns)
if err != nil {
return err
}
buf := batch.NewBatch()
if vn == nil {
// namespace not yet created
buf.Put(ns, _minKey, (&versionedNamespace{
name: ns,
keyLen: uint32(len(key)),
}).serialize(), "failed to create metadata")
} else {
if len(key) != int(vn.keyLen) {
return errors.Wrapf(ErrInvalidKeyLength, "expecting %d, got %d", vn.keyLen, len(key))
}
}
// check key's metadata
km, err := b.checkKey(ns, key)
if err != nil {
return err
}
km, exit := km.update(version, b.hasher(value))
if exit {
return nil
}
buf.Put(ns, append(key, 0), km.serialize(), fmt.Sprintf("failed to put key %x's metadata", key))
buf.Put(ns, versionedKey(key, version), value, fmt.Sprintf("failed to put key %x", key))
return b.db.WriteBatch(buf)
}

// Get retrieves the most recent version
func (b *BoltDBVersioned) Get(ns string, version uint64, key []byte) ([]byte, error) {
if !b.db.IsReady() {
return nil, ErrDBNotStarted
}
// TODO: implement Get
return nil, nil
// check namespace
vn, err := b.checkNamespace(ns)
if err != nil {
return nil, err
}
if vn == nil {
return nil, errors.Wrapf(ErrNotExist, "key = %x doesn't exist", key)
}
if len(key) != int(vn.keyLen) {
return nil, errors.Wrapf(ErrInvalidKeyLength, "expecting %d, got %d", vn.keyLen, len(key))
}
// check key's metadata
km, err := b.checkKey(ns, key)
if err != nil {
return nil, err
}
if km == nil || version < km.firstVersion {
return nil, errors.Wrapf(ErrNotExist, "key = %x doesn't exist", key)
}
if version >= km.deleteVersion {
return nil, errors.Wrapf(ErrDeleted, "key = %x already deleted", key)
}
return b.get(ns, version, key)
}

func (b *BoltDBVersioned) get(ns string, version uint64, key []byte) ([]byte, error) {
// construct the actual key = key + version (represented in 8-bytes)
// and read from DB
key = versionedKey(key, version)
var value []byte
err := b.db.db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(ns))
if bucket == nil {
return errors.Wrapf(ErrBucketNotExist, "bucket = %x doesn't exist", []byte(ns))
}
c := bucket.Cursor()
k, v := c.Seek(key)
if k == nil || bytes.Compare(k, key) == 1 {
k, v = c.Prev()
if k == nil || bytes.Compare(k, append(key[:len(key)-8], 0)) <= 0 {
// cursor is at the beginning/end of the bucket or smaller than minimum key
return errors.Wrapf(ErrNotExist, "key = %x doesn't exist", key[:len(key)-8])
}
}
value = make([]byte, len(v))
copy(value, v)
return nil
})
if err == nil {
return value, nil
}
if cause := errors.Cause(err); cause == ErrNotExist || cause == ErrBucketNotExist {
return nil, err
}
return nil, errors.Wrap(ErrIO, err.Error())
}

// Delete deletes a record,if key is nil,this will delete the whole bucket
Expand All @@ -112,8 +225,27 @@ func (b *BoltDBVersioned) Version(ns string, key []byte) (uint64, error) {
if !b.db.IsReady() {
return 0, ErrDBNotStarted
}
// TODO: implement Version
return 0, nil
// check namespace
vn, err := b.checkNamespace(ns)
if err != nil {
return 0, err
}
if vn == nil {
return 0, errors.Wrapf(ErrNotExist, "key = %x doesn't exist", key)
}
if len(key) != int(vn.keyLen) {
return 0, errors.Wrapf(ErrInvalidKeyLength, "expecting %d, got %d", vn.keyLen, len(key))
}
// check key's metadata
km, err := b.checkKey(ns, key)
if err != nil {
return 0, err
}
if km == nil {
// key not yet written
return 0, errors.Wrapf(ErrNotExist, "key = %x doesn't exist", key)
}
return km.lastVersion, nil
}

// SetVersion sets the version, and returns a KVStore to call Put()/Get()
Expand All @@ -124,6 +256,42 @@ func (b *BoltDBVersioned) SetVersion(v uint64) KVStoreBasic {
}
}

func versionedKey(key []byte, v uint64) []byte {
return append(key, byteutil.Uint64ToBytesBigEndian(v)...)
}

func (b *BoltDBVersioned) checkNamespace(ns string) (*versionedNamespace, error) {
data, err := b.db.Get(ns, _minKey)
switch errors.Cause(err) {
case nil:
vn, err := deserializeVersionedNamespace(data)
if err != nil {
return nil, err
}
return vn, nil
case ErrNotExist, ErrBucketNotExist:
return nil, nil
default:
return nil, err
}
}

func (b *BoltDBVersioned) checkKey(ns string, key []byte) (*keyMeta, error) {
data, err := b.db.Get(ns, append(key, 0))
switch errors.Cause(err) {
case nil:
km, err := deserializeKeyMeta(data)
if err != nil {
return nil, err
}
return km, nil
case ErrNotExist, ErrBucketNotExist:
return nil, nil
default:
return nil, err
}
}

// KvWithVersion wraps the BoltDBVersioned with a certain version
type KvWithVersion struct {
db *BoltDBVersioned
Expand Down
Loading

0 comments on commit 4af3dc0

Please sign in to comment.