Skip to content

Commit 1f1730b

Browse files
committed
merge old files
1 parent 729e63e commit 1f1730b

File tree

2 files changed

+147
-49
lines changed

2 files changed

+147
-49
lines changed

bitcask.go

+1
Original file line number | Diff line number | Diff line change
@@ -37,6 +37,7 @@ func New(dir string) (*Bitcask, error) {
3737
mu: &sync.RWMutex{},
3838
}
3939
bitcask.loadIndex()
40+
bitcask.merge()
4041
return bitcask, nil
4142
}
4243

db_file.go

+146-49
Original file line number | Diff line number | Diff line change
@@ -5,6 +5,7 @@ import (
55
"errors"
66
"fmt"
77
"io"
8+
"log"
89
"os"
910
"path/filepath"
1011
"strconv"
@@ -30,21 +31,6 @@ func newBitFile(dir string) (*BitFile, error) {
3031
return bf, nil
3132
}
3233

33-
// toBitFile wraps an already-open file in a BitFile carrying the given file id.
// The BitFile's offset is initialised to the file's current size as reported by
// fp.Stat; if Stat fails its error is returned unchanged and no BitFile is built.
func toBitFile(fid uint32, fp *os.File) (*BitFile, error) {
34-
stat, err := fp.Stat()
35-
if err != nil {
36-
return nil, err
37-
}
38-
39-
bf := &BitFile{
40-
fp: fp,
41-
fid: fid,
42-
// Start the offset at the end of the bytes already in the file.
offset: uint64(stat.Size()),
43-
}
44-
45-
return bf, nil
46-
}
47-
4834
func (bf *BitFile) populateFilesMap(dir string) (uint32, error) {
4935
files, err := scanOldFiles(dir)
5036
if err != nil {
@@ -69,31 +55,6 @@ func (bf *BitFile) populateFilesMap(dir string) (uint32, error) {
6955
return maxFid, nil
7056
}
7157

72-
// getFid parses the numeric file id out of a file name by dropping the last
// five bytes of name and reading the remainder as a base-10 unsigned 32-bit
// integer. Any parse failure is reported as "Unable to parse file id.".
func getFid(name string) (uint32, error) {
73-
fsz := len(name)
74-
// Strip the trailing five bytes before parsing; a name shorter than five
// bytes makes this slice expression panic.
fid, err := strconv.ParseUint(name[:fsz-5], 10, 32)
75-
if err != nil {
76-
return 0, errors.New("Unable to parse file id.")
77-
}
78-
79-
return uint32(fid), nil
80-
}
81-
82-
// scanOldFiles lists dir and returns the directory entries whose name ends
// with ext. If the directory cannot be read it returns a generic
// "Unable to open dir." error and discards the underlying cause.
func scanOldFiles(dir string) ([]os.DirEntry, error) {
83-
files, err := os.ReadDir(dir)
84-
if err != nil {
85-
return nil, errors.New("Unable to open dir.")
86-
}
87-
var entry []os.DirEntry
88-
for _, file := range files {
89-
// Keep only names carrying the ext suffix.
if !strings.HasSuffix(file.Name(), ext) {
90-
continue
91-
}
92-
entry = append(entry, file)
93-
}
94-
// err is always nil here because the failure case returned above.
return entry, err
95-
}
96-
9758
func (bf *BitFile) write(key, value []byte) (*entry, error) {
9859
ts := uint32(time.Now().Unix())
9960

@@ -119,14 +80,6 @@ func (bf *BitFile) read(offset uint64, size uint32) ([]byte, error) {
11980
return read(bf.fp, int64(offset), size)
12081
}
12182

122-
// read returns size bytes of fp starting at byte offset. It relies on
// fp.ReadAt, so any error it reports (including a short read) yields a nil
// slice and that error.
func read(fp *os.File, offset int64, size uint32) ([]byte, error) {
123-
buf := make([]byte, size)
124-
if _, err := fp.ReadAt(buf, offset); err != nil {
125-
return nil, err
126-
}
127-
return buf, nil
128-
}
129-
13083
func (bf *BitFile) del(key []byte) error {
13184
ts := uint32(time.Now().Unix())
13285
keySize := uint32(len(key))
@@ -181,6 +134,54 @@ func (bf *BitFile) newFid() string {
181134
return fmt.Sprintf("%06d", bf.fid)
182135
}
183136

137+
// read fetches exactly size bytes from fp, beginning at the absolute byte
// position offset. The file's own seek position is not involved because the
// read goes through ReadAt. When ReadAt reports an error (a short read
// included) no data is returned, only that error.
func read(fp *os.File, offset int64, size uint32) ([]byte, error) {
	data := make([]byte, size)
	_, readErr := fp.ReadAt(data, offset)
	if readErr != nil {
		return nil, readErr
	}
	return data, nil
}
144+
145+
func toBitFile(fid uint32, fp *os.File) (*BitFile, error) {
146+
stat, err := fp.Stat()
147+
if err != nil {
148+
return nil, err
149+
}
150+
151+
bf := &BitFile{
152+
fp: fp,
153+
fid: fid,
154+
offset: uint64(stat.Size()),
155+
}
156+
157+
return bf, nil
158+
}
159+
160+
// getFid extracts the numeric file id from a data file name such as
// "000042.data".
//
// The last five bytes of name (the length of the ".data" suffix) are dropped
// and the remainder is parsed as a base-10 unsigned 32-bit integer. Names that
// are too short to carry the suffix, or whose prefix is not a valid number,
// yield the error "Unable to parse file id." instead of panicking.
func getFid(name string) (uint32, error) {
	const suffixLen = 5 // len(".data")

	// Guard the slice below: a shorter name would make the upper bound negative.
	if len(name) < suffixLen {
		return 0, errors.New("Unable to parse file id.")
	}

	fid, err := strconv.ParseUint(name[:len(name)-suffixLen], 10, 32)
	if err != nil {
		return 0, errors.New("Unable to parse file id.")
	}

	return uint32(fid), nil
}
169+
170+
func scanOldFiles(dir string) ([]os.DirEntry, error) {
171+
files, err := os.ReadDir(dir)
172+
if err != nil {
173+
return nil, errors.New("Unable to open dir.")
174+
}
175+
var entry []os.DirEntry
176+
for _, file := range files {
177+
if !strings.HasSuffix(file.Name(), ext) {
178+
continue
179+
}
180+
entry = append(entry, file)
181+
}
182+
return entry, err
183+
}
184+
184185
// ext is the file-name suffix scanOldFiles uses to pick data files out of a
// directory listing.
const ext = ".data"
185186

186187
func newFilePath(dir, fid string) string {
@@ -206,7 +207,10 @@ func (bf *BitFiles) add(fid uint32, fp *BitFile) {
206207
bf.files[fid] = fp
207208
}
208209

209-
// lockFileName is the name of the file lock creates inside the data directory.
const lockFileName = "bitcask.lock"
210+
const (
211+
// lockFileName is the name of the file lock creates inside the data directory.
lockFileName = "bitcask.lock"
212+
// mergeFileExt is the suffix newMergeFileName appends to a file id to name
// the temporary merge file.
mergeFileExt = ".merge"
213+
)
210214

211215
func lock(dir string) (*os.File, error) {
212216
return os.OpenFile(filepath.Join(dir, lockFileName), os.O_EXCL|os.O_CREATE|os.O_RDWR, os.ModePerm)
@@ -228,3 +232,96 @@ func newEntryFromBuf(fp *os.File, fid uint32, offset int64) (*entry, uint32, uin
228232
entry := newEntry(fid, valueSize, uint64(offset)+uint64(HeaderSize+keySize), uint64(ts))
229233
return entry, keySize, entrySize
230234
}
235+
236+
func newMergeFileName(dir string, fid uint32) string {
237+
return fmt.Sprintf("%s%s%d%s", dir, string(os.PathSeparator), fid, mergeFileExt)
238+
}
239+
240+
// newMergeFile opens the merge file at path mf for reading and writing,
// creating it with mode 0644 if it does not exist.
//
// An existing file is truncated to zero length. Without that, a merge file
// left behind by an interrupted run could keep bytes beyond the end of the
// newly written records, and those bytes would end up in the data file once
// the merge file is renamed over it.
func newMergeFile(mf string) (*os.File, error) {
	return os.OpenFile(mf, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0644)
}
248+
249+
func (b *Bitcask) merge() {
250+
log.Println("Start merge old files.")
251+
files, err := scanOldFiles(b.option.Dir)
252+
if err != nil {
253+
return
254+
}
255+
for _, file := range files {
256+
log.Println("File name:", file.Name())
257+
if filepath.Base(b.actFile.fp.Name()) == file.Name() { //skip active file
258+
continue
259+
}
260+
261+
oldFilePath := filepath.Join(b.option.Dir, file.Name())
262+
fp, err := os.Open(oldFilePath)
263+
if err != nil {
264+
continue
265+
}
266+
267+
info, err := fp.Stat()
268+
if err != nil {
269+
log.Println("Err check file size:", err)
270+
continue
271+
}
272+
if info.Size() == 0 {
273+
log.Printf("Removing old empy file:%s\n", file.Name())
274+
os.Remove(oldFilePath)
275+
continue
276+
}
277+
278+
fid, _ := getFid(file.Name())
279+
mergeFileName := newMergeFileName(b.option.Dir, fid)
280+
mergeFp, err := newMergeFile(mergeFileName)
281+
282+
var (
283+
offset int64 = 0
284+
mergeOffset int64 = 0
285+
)
286+
for {
287+
entry, keySize, entrySize := newEntryFromBuf(fp, fid, offset)
288+
if entry == nil {
289+
break
290+
}
291+
292+
readOffset := offset + HeaderSize
293+
offset += int64(entrySize)
294+
keyByte, err := read(fp, readOffset, keySize)
295+
if err != nil {
296+
continue
297+
}
298+
299+
//check if the key was deleted
300+
e, err := b.index.get(keyByte)
301+
if e == nil || entry.valueSize == 0 {
302+
b.index.del(string(keyByte))
303+
continue
304+
}
305+
valByte, err := read(fp, readOffset+int64(keySize), entry.valueSize)
306+
if err != nil {
307+
continue
308+
}
309+
310+
buf, _ := encode(keyByte, valByte, keySize, entry.valueSize, uint32(entry.timestamp), entrySize)
311+
_, err = mergeFp.WriteAt(buf, mergeOffset)
312+
mergeOffset += int64(entrySize)
313+
if err != nil {
314+
continue
315+
}
316+
317+
b.index.put(string(keyByte), entry)
318+
}
319+
320+
b.mu.Lock()
321+
fp.Close()
322+
mergeFp.Close()
323+
log.Printf("Replace old file:'%s' to new merge file: '%s'", oldFilePath, mergeFileName)
324+
os.Rename(mergeFileName, oldFilePath)
325+
b.mu.Unlock()
326+
}
327+
}

0 commit comments

Comments
 (0)