Skip to content

Commit f5d275b

Browse files
committed
Minor bolt compact revisions
1 parent 52d0f5e commit f5d275b

File tree

4 files changed

+196
-113
lines changed

4 files changed

+196
-113
lines changed

bucket.go

+22
Original file line number | Diff line number | Diff line change
@@ -329,6 +329,28 @@ func (b *Bucket) Delete(key []byte) error {
329329
return nil
330330
}
331331

332+
// Sequence returns the current integer for the bucket without incrementing it.
333+
func (b *Bucket) Sequence() uint64 { return b.bucket.sequence }
334+
335+
// SetSequence updates the sequence number for the bucket.
336+
func (b *Bucket) SetSequence(v uint64) error {
337+
if b.tx.db == nil {
338+
return ErrTxClosed
339+
} else if !b.Writable() {
340+
return ErrTxNotWritable
341+
}
342+
343+
// Materialize the root node if it hasn't been already so that the
344+
// bucket will be saved during commit.
345+
if b.rootNode == nil {
346+
_ = b.node(b.root, nil)
347+
}
348+
349+
// Increment and return the sequence.
350+
b.bucket.sequence = v
351+
return nil
352+
}
353+
332354
// NextSequence returns an autoincrementing integer for the bucket.
333355
func (b *Bucket) NextSequence() (uint64, error) {
334356
if b.tx.db == nil {

bucket_test.go

+42
Original file line number | Diff line number | Diff line change
@@ -782,6 +782,48 @@ func TestBucket_DeleteBucket_IncompatibleValue(t *testing.T) {
782782
}
783783
}
784784

785+
// Ensure bucket can set and update its sequence number.
786+
func TestBucket_Sequence(t *testing.T) {
787+
db := MustOpenDB()
788+
defer db.MustClose()
789+
790+
if err := db.Update(func(tx *bolt.Tx) error {
791+
bkt, err := tx.CreateBucket([]byte("0"))
792+
if err != nil {
793+
t.Fatal(err)
794+
}
795+
796+
// Retrieve sequence.
797+
if v := bkt.Sequence(); v != 0 {
798+
t.Fatalf("unexpected sequence: %d", v)
799+
}
800+
801+
// Update sequence.
802+
if err := bkt.SetSequence(1000); err != nil {
803+
t.Fatal(err)
804+
}
805+
806+
// Read sequence again.
807+
if v := bkt.Sequence(); v != 1000 {
808+
t.Fatalf("unexpected sequence: %d", v)
809+
}
810+
811+
return nil
812+
}); err != nil {
813+
t.Fatal(err)
814+
}
815+
816+
// Verify sequence in separate transaction.
817+
if err := db.View(func(tx *bolt.Tx) error {
818+
if v := tx.Bucket([]byte("0")).Sequence(); v != 1000 {
819+
t.Fatalf("unexpected sequence: %d", v)
820+
}
821+
return nil
822+
}); err != nil {
823+
t.Fatal(err)
824+
}
825+
}
826+
785827
// Ensure that a bucket can return an autoincrementing sequence.
786828
func TestBucket_NextSequence(t *testing.T) {
787829
db := MustOpenDB()

cmd/bolt/main.go

+126-98
Original file line number | Diff line number | Diff line change
@@ -1539,6 +1539,10 @@ type CompactCommand struct {
15391539
Stdin io.Reader
15401540
Stdout io.Writer
15411541
Stderr io.Writer
1542+
1543+
SrcPath string
1544+
DstPath string
1545+
TxMaxSize int64
15421546
}
15431547

15441548
// newCompactCommand returns a CompactCommand.
@@ -1550,163 +1554,187 @@ func newCompactCommand(m *Main) *CompactCommand {
15501554
}
15511555
}
15521556

1553-
// BucketWalkFunc is the type of the function called for keys (buckets and "normal" values)
1554-
// discovered by Walk.
1555-
// keys is the list of keys to descend to the bucket owning the discovered key/value pair k/v.
1556-
type BucketWalkFunc func(keys [][]byte, k []byte, v []byte) error
1557-
1558-
// Walk walks recursively the bolt database db, calling walkFn for each key it finds.
1559-
func (cmd *CompactCommand) Walk(db *bolt.DB, walkFn BucketWalkFunc) error {
1560-
return db.View(func(tx *bolt.Tx) error {
1561-
return tx.ForEach(func(name []byte, b *bolt.Bucket) error {
1562-
return cmd.walkBucket(b, nil, name, nil, walkFn)
1563-
})
1564-
})
1565-
}
1566-
1567-
func (cmd *CompactCommand) walkBucket(b *bolt.Bucket, keys [][]byte, k []byte, v []byte, walkFn BucketWalkFunc) error {
1568-
if err := walkFn(keys, k, v); err != nil {
1569-
return err
1570-
}
1571-
// not a bucket, exit.
1572-
if v != nil {
1573-
return nil
1574-
}
1575-
1576-
keys2 := append(keys, k)
1577-
return b.ForEach(func(k, v []byte) error {
1578-
if v == nil {
1579-
return cmd.walkBucket(b.Bucket(k), keys2, k, nil, walkFn)
1580-
}
1581-
return cmd.walkBucket(b, keys2, k, v, walkFn)
1582-
})
1583-
}
1584-
15851557
// Run executes the command.
15861558
func (cmd *CompactCommand) Run(args ...string) (err error) {
15871559
// Parse flags.
15881560
fs := flag.NewFlagSet("", flag.ContinueOnError)
1589-
fs.SetOutput(cmd.Stderr)
1590-
var txMaxSize int64
1591-
fs.Int64Var(&txMaxSize, "tx-max-size", 0, "commit tx when key/value size sum exceed this value. If 0, only one transaction is used. If you are compacting a large database, set this to a value appropriate for the available memory.")
1592-
help := fs.Bool("h", false, "print this help")
1593-
if err := fs.Parse(args); err != nil {
1594-
return err
1595-
} else if *help {
1561+
fs.SetOutput(ioutil.Discard)
1562+
fs.StringVar(&cmd.DstPath, "o", "", "")
1563+
fs.Int64Var(&cmd.TxMaxSize, "tx-max-size", 65536, "")
1564+
if err := fs.Parse(args); err == flag.ErrHelp {
15961565
fmt.Fprintln(cmd.Stderr, cmd.Usage())
1597-
fs.PrintDefaults()
15981566
return ErrUsage
1567+
} else if err != nil {
1568+
return err
1569+
} else if cmd.DstPath == "" {
1570+
return fmt.Errorf("output file required")
15991571
}
16001572

1601-
// Require database path.
1602-
path := fs.Arg(0)
1603-
if path == "" {
1573+
// Require database paths.
1574+
cmd.SrcPath = fs.Arg(0)
1575+
if cmd.SrcPath == "" {
16041576
return ErrPathRequired
1605-
} else if _, err := os.Stat(path); os.IsNotExist(err) {
1606-
return ErrFileNotFound
16071577
}
1608-
fi, err := os.Stat(path)
1609-
if err != nil {
1578+
1579+
// Ensure source file exists.
1580+
fi, err := os.Stat(cmd.SrcPath)
1581+
if os.IsNotExist(err) {
1582+
return ErrFileNotFound
1583+
} else if err != nil {
16101584
return err
16111585
}
16121586
initialSize := fi.Size()
16131587

1614-
// Open database.
1615-
db, err := bolt.Open(path, 0444, nil)
1588+
// Open source database.
1589+
src, err := bolt.Open(cmd.SrcPath, 0444, nil)
16161590
if err != nil {
16171591
return err
16181592
}
1619-
defer db.Close()
1593+
defer src.Close()
16201594

1621-
var dstPath string
1622-
if fs.NArg() < 2 {
1623-
f, err := ioutil.TempFile("", "bolt-compact-")
1624-
if err != nil {
1625-
return fmt.Errorf("temp file: %v", err)
1626-
}
1627-
_ = f.Close()
1628-
_ = os.Remove(f.Name())
1629-
dstPath = f.Name()
1630-
fmt.Fprintf(cmd.Stdout, "compacting db to %s\n", dstPath)
1631-
} else {
1632-
dstPath = fs.Arg(1)
1595+
// Open destination database.
1596+
dst, err := bolt.Open(cmd.DstPath, fi.Mode(), nil)
1597+
if err != nil {
1598+
return err
16331599
}
1600+
defer dst.Close()
16341601

1635-
defer func() {
1636-
fi, err := os.Stat(dstPath)
1637-
if err != nil {
1638-
fmt.Fprintln(cmd.Stderr, err)
1639-
}
1640-
newSize := fi.Size()
1641-
if newSize == 0 {
1642-
fmt.Fprintln(cmd.Stderr, "db size is 0")
1643-
}
1644-
fmt.Fprintf(cmd.Stdout, "%d -> %d bytes (gain=%.2fx)\n", initialSize, newSize, float64(initialSize)/float64(newSize))
1645-
}()
1602+
// Run compaction.
1603+
if err := cmd.compact(dst, src); err != nil {
1604+
return err
1605+
}
16461606

1647-
dstdb, err := bolt.Open(dstPath, 0666, nil)
1607+
// Report stats on new size.
1608+
fi, err = os.Stat(cmd.DstPath)
16481609
if err != nil {
16491610
return err
1611+
} else if fi.Size() == 0 {
1612+
return fmt.Errorf("zero db size")
16501613
}
1651-
defer dstdb.Close()
1614+
fmt.Fprintf(cmd.Stdout, "%d -> %d bytes (gain=%.2fx)\n", initialSize, fi.Size(), float64(initialSize)/float64(fi.Size()))
16521615

1616+
return nil
1617+
}
1618+
1619+
func (cmd *CompactCommand) compact(dst, src *bolt.DB) error {
16531620
// commit regularly, or we'll run out of memory for large datasets if using one transaction.
16541621
var size int64
1655-
tx, err := dstdb.Begin(true)
1622+
tx, err := dst.Begin(true)
16561623
if err != nil {
16571624
return err
16581625
}
1659-
defer func() {
1660-
if err != nil {
1661-
_ = tx.Rollback()
1662-
} else {
1663-
err = tx.Commit()
1664-
}
1665-
}()
1666-
return cmd.Walk(db, func(keys [][]byte, k []byte, v []byte) error {
1667-
s := int64(len(k) + len(v))
1668-
if size+s > txMaxSize && txMaxSize != 0 {
1626+
defer tx.Rollback()
1627+
1628+
if err := cmd.walk(src, func(keys [][]byte, k, v []byte, seq uint64) error {
1629+
// On each key/value, check if we have exceeded tx size.
1630+
sz := int64(len(k) + len(v))
1631+
if size+sz > cmd.TxMaxSize && cmd.TxMaxSize != 0 {
1632+
// Commit previous transaction.
16691633
if err := tx.Commit(); err != nil {
16701634
return err
16711635
}
1672-
tx, err = dstdb.Begin(true)
1636+
1637+
// Start new transaction.
1638+
tx, err = dst.Begin(true)
16731639
if err != nil {
16741640
return err
16751641
}
16761642
size = 0
16771643
}
1678-
size += s
1644+
size += sz
1645+
1646+
// Create bucket on the root transaction if this is the first level.
16791647
nk := len(keys)
16801648
if nk == 0 {
1681-
_, err := tx.CreateBucket(k)
1682-
return err
1649+
bkt, err := tx.CreateBucket(k)
1650+
if err != nil {
1651+
return err
1652+
}
1653+
if err := bkt.SetSequence(seq); err != nil {
1654+
return err
1655+
}
1656+
return nil
16831657
}
16841658

1659+
// Create buckets on subsequent levels, if necessary.
16851660
b := tx.Bucket(keys[0])
16861661
if nk > 1 {
16871662
for _, k := range keys[1:] {
16881663
b = b.Bucket(k)
16891664
}
16901665
}
1666+
1667+
// If there is no value then this is a bucket call.
16911668
if v == nil {
1692-
_, err := b.CreateBucket(k)
1693-
return err
1669+
bkt, err := b.CreateBucket(k)
1670+
if err != nil {
1671+
return err
1672+
}
1673+
if err := bkt.SetSequence(seq); err != nil {
1674+
return err
1675+
}
1676+
return nil
16941677
}
1678+
1679+
// Otherwise treat it as a key/value pair.
16951680
return b.Put(k, v)
1681+
}); err != nil {
1682+
return err
1683+
}
1684+
1685+
return tx.Commit()
1686+
}
1687+
1688+
// walkFunc is the type of the function called for keys (buckets and "normal"
1689+
// values) discovered by walk. keys is the list of keys to descend to the bucket
1690+
// owning the discovered key/value pair k/v. seq is the sequence number of the bucket named k, or of the owning bucket when v is non-nil.
1691+
type walkFunc func(keys [][]byte, k, v []byte, seq uint64) error
1692+
1693+
// walk walks recursively the bolt database db, calling walkFn for each key it finds.
1694+
func (cmd *CompactCommand) walk(db *bolt.DB, walkFn walkFunc) error {
1695+
return db.View(func(tx *bolt.Tx) error {
1696+
return tx.ForEach(func(name []byte, b *bolt.Bucket) error {
1697+
return cmd.walkBucket(b, nil, name, nil, b.Sequence(), walkFn)
1698+
})
1699+
})
1700+
}
1701+
1702+
func (cmd *CompactCommand) walkBucket(b *bolt.Bucket, keypath [][]byte, k, v []byte, seq uint64, fn walkFunc) error {
1703+
// Execute callback.
1704+
if err := fn(keypath, k, v, seq); err != nil {
1705+
return err
1706+
}
1707+
1708+
// If this is not a bucket then stop.
1709+
if v != nil {
1710+
return nil
1711+
}
1712+
1713+
// Iterate over each child key/value.
1714+
keypath = append(keypath, k)
1715+
return b.ForEach(func(k, v []byte) error {
1716+
if v == nil {
1717+
bkt := b.Bucket(k)
1718+
return cmd.walkBucket(bkt, keypath, k, nil, bkt.Sequence(), fn)
1719+
}
1720+
return cmd.walkBucket(b, keypath, k, v, b.Sequence(), fn)
16961721
})
16971722
}
16981723

16991724
// Usage returns the help message.
17001725
func (cmd *CompactCommand) Usage() string {
17011726
return strings.TrimLeft(`
1702-
usage: bolt compact PATH [DST_PATH]
1727+
usage: bolt compact [options] -o DST SRC
1728+
1729+
Compact opens a database at SRC path and walks it recursively, copying keys
1730+
as they are found from all buckets, to a newly created database at DST path.
17031731
1704-
Compact opens a database at PATH and walks it recursively entirely,
1705-
copying keys as they are found from all buckets, to a newly created db.
1732+
The original database is left untouched.
17061733
1707-
If DST_PATH is non-empty, the new db is created at DST_PATH, else it will be
1708-
in a temporary location.
1734+
Additional options include:
17091735
1710-
The original db is left untouched.
1736+
-tx-max-size NUM
1737+
Specifies the maximum size of individual transactions.
1738+
Defaults to 64KB.
17111739
`, "\n")
17121740
}

0 commit comments

Comments
 (0)