Skip to content

Commit

Permalink
change AddVersionedNamespace() to DB init option
Browse files Browse the repository at this point in the history
  • Loading branch information
dustinxie committed Jan 15, 2025
1 parent 5a31c11 commit 492e836
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 38 deletions.
63 changes: 41 additions & 22 deletions db/db_versioned.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,6 @@ type (
// Filter returns <k, v> pair in a bucket that meet the condition
Filter(uint64, string, Condition, []byte, []byte) ([][]byte, [][]byte, error)

// AddVersionedNamespace adds a versioned namespace
AddVersionedNamespace(string, uint32) error

// Version returns the key's most recent version
Version(string, []byte) (uint64, error)
}
Expand All @@ -56,47 +53,69 @@ type (
db *BoltDB
vns map[string]int // map of versioned namespace
}

// Namespace specifies the name and key length of the versioned namespace
Namespace struct {
ns string
keyLen uint32
}
)

// BoltDBVersionedOption sets option for BoltDBVersioned
type BoltDBVersionedOption func(*BoltDBVersioned)

func VnsOption(ns ...Namespace) BoltDBVersionedOption {
return func(k *BoltDBVersioned) {
for _, v := range ns {
k.vns[v.ns] = int(v.keyLen)
}
}
}

// NewBoltDBVersioned instantiates an BoltDB which implements VersionedDB
func NewBoltDBVersioned(cfg Config) *BoltDBVersioned {
func NewBoltDBVersioned(cfg Config, opts ...BoltDBVersionedOption) *BoltDBVersioned {
b := BoltDBVersioned{
db: NewBoltDB(cfg),
vns: make(map[string]int),
}
for _, opt := range opts {
opt(&b)
}
return &b
}

// Start starts the DB
func (b *BoltDBVersioned) Start(ctx context.Context) error {
return b.db.Start(ctx)
if err := b.db.Start(ctx); err != nil {
return err
}
return b.addVersionedNamespace()
}

// Stop stops the DB
func (b *BoltDBVersioned) Stop(ctx context.Context) error {
return b.db.Stop(ctx)
}

// AddVersionedNamespace adds a versioned namespace
func (b *BoltDBVersioned) AddVersionedNamespace(ns string, keyLen uint32) error {
vn, err := b.checkNamespace(ns)
if cause := errors.Cause(err); cause == ErrNotExist || cause == ErrBucketNotExist {
// create metadata for namespace
if err = b.db.Put(ns, _minKey, (&versionedNamespace{
keyLen: keyLen,
}).serialize()); err != nil {
func (b *BoltDBVersioned) addVersionedNamespace() error {
for ns, keyLen := range b.vns {
vn, err := b.checkNamespace(ns)
if cause := errors.Cause(err); cause == ErrNotExist || cause == ErrBucketNotExist {
// create metadata for namespace
if err = b.db.Put(ns, _minKey, (&versionedNamespace{
keyLen: uint32(keyLen),
}).serialize()); err != nil {
return err
}
continue
}
if err != nil {
return err
}
b.vns[ns] = int(keyLen)
return nil
}
if err != nil {
return err
}
if vn.keyLen != keyLen {
return errors.Wrapf(ErrInvalid, "namespace %s already exists with key length = %d, got %d", ns, vn.keyLen, keyLen)
if vn.keyLen != uint32(keyLen) {
return errors.Wrapf(ErrInvalid, "namespace %s already exists with key length = %d, got %d", ns, vn.keyLen, keyLen)
}
}
b.vns[ns] = int(keyLen)
return nil
}

Expand Down
23 changes: 7 additions & 16 deletions db/db_versioned_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,20 +42,17 @@ func TestVersionedDB(t *testing.T) {

cfg := DefaultConfig
cfg.DbPath = testPath
db := NewBoltDBVersioned(cfg)
db := NewBoltDBVersioned(cfg, VnsOption(Namespace{_bucket1, uint32(len(_k2))}))
ctx := context.Background()
r.NoError(db.Start(ctx))
defer func() {
db.Stop(ctx)
}()

// namespace and key does not exist
// namespace created
vn, err := db.checkNamespace(_bucket1)
r.Nil(vn)
r.ErrorContains(err, ErrNotExist.Error())
// create namespace
r.NoError(db.AddVersionedNamespace(_bucket1, uint32(len(_k2))))
r.ErrorContains(db.AddVersionedNamespace(_bucket1, 8), "namespace test_ns1 already exists with key length = 5, got 8")
r.NoError(err)
r.Equal(uint32(len(_k2)), vn.keyLen)
// write first key
r.NoError(db.Put(0, _bucket1, _k2, _v2))
vn, err = db.checkNamespace(_bucket1)
Expand Down Expand Up @@ -290,12 +287,10 @@ func TestMultipleWriteDelete(t *testing.T) {

cfg := DefaultConfig
cfg.DbPath = testPath
db := NewBoltDBVersioned(cfg)
db := NewBoltDBVersioned(cfg, VnsOption(Namespace{_bucket1, uint32(len(_k2))}))
ctx := context.Background()
r.NoError(db.Start(ctx))

// create namespace
r.NoError(db.AddVersionedNamespace(_bucket1, uint32(len(_k2))))
if i == 0 {
// multiple writes and deletes
r.NoError(db.Put(1, _bucket1, _k2, _v1))
Expand Down Expand Up @@ -457,7 +452,8 @@ func TestCommitToDB(t *testing.T) {

cfg := DefaultConfig
cfg.DbPath = testPath
db := NewBoltDBVersioned(cfg)
db := NewBoltDBVersioned(cfg, VnsOption(
Namespace{_bucket1, uint32(len(_k1))}, Namespace{_bucket2, uint32(len(_v1))}))
ctx := context.Background()
r.NoError(db.Start(ctx))
defer func() {
Expand All @@ -474,11 +470,6 @@ func TestCommitToDB(t *testing.T) {
} {
b.Put(e.ns, e.k, e.v, "test")
}
r.PanicsWithValue("BoltDBVersioned.commitToDB(), vns = test_ns2 does not exist", func() { db.CommitToDB(1, b) })

// create namespace
r.NoError(db.AddVersionedNamespace(_bucket1, uint32(len(_k1))))
r.NoError(db.AddVersionedNamespace(_bucket2, uint32(len(_v1))))
r.NoError(db.CommitToDB(1, b))
b.Clear()
for _, e := range []versionTest{
Expand Down

0 comments on commit 492e836

Please sign in to comment.