From 68806d7c0b9abd25dd7b526e400e06be697bb0b9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20HEITZMANN?= <2le@2le.net> Date: Fri, 13 Dec 2024 17:22:50 +0100 Subject: [PATCH] RecordIO options for reader (#39) --- recordio/file_reader.go | 80 ++++++++++++++++++++++++++------ recordio/io_factory.go | 10 ++++ recordio/recordio_test.go | 2 +- simpledb/compaction.go | 2 +- simpledb/db.go | 2 +- simpledb/recovery.go | 5 +- simpledb/rw_memstore_test.go | 5 +- simpledb/sstable_manager.go | 4 +- simpledb/sstable_manager_test.go | 19 ++++---- 9 files changed, 97 insertions(+), 32 deletions(-) create mode 100644 recordio/io_factory.go diff --git a/recordio/file_reader.go b/recordio/file_reader.go index 872f0ca..2e7a95a 100644 --- a/recordio/file_reader.go +++ b/recordio/file_reader.go @@ -350,28 +350,61 @@ func readNextV2(r *FileReader) ([]byte, error) { return returnSlice, nil } -// TODO(thomas): we have to add an option pattern here as well +// options +type FileReaderOptions struct { + path string + file *os.File + bufferSizeBytes int + factory IOFactory +} -// NewFileReaderWithPath creates a new recordio file reader that can read RecordIO files at the given path. -func NewFileReaderWithPath(path string) (ReaderI, error) { - return newFileReaderWithFactory(path, BufferedIOFactory{}) +type FileReaderOption func(*FileReaderOptions) + +// ReaderPath defines the file path where to read the recordio file +// Either this or File must be supplied. +func ReaderPath(p string) FileReaderOption { + return func(args *FileReaderOptions) { + args.path = p + } } -// NewFileReaderWithFile creates a new recordio file reader that can read RecordIO files with the given file. -// The file will be managed from here on out (ie closing). -func NewFileReaderWithFile(file *os.File) (ReaderI, error) { - // we're closing the existing file, as it's being recreated by the factory below - err := file.Close() - if err != nil { - return nil, fmt.Errorf("error while closing existing file handle at '%s': %w", file.Name(), err) +// ReaderFile uses the given os.File as the sink to write into. The code manages the given file lifecycle (ie closing). +// Either this or Path must be supplied +func ReaderFile(p *os.File) FileReaderOption { + return func(args *FileReaderOptions) { + args.file = p } +} - return newFileReaderWithFactory(file.Name(), BufferedIOFactory{}) +// BufferSizeBytes sets the IoFactory, by default it uses BufferedIOFactory. +func ReaderIoFactory(factory IOFactory) FileReaderOption { + return func(args *FileReaderOptions) { + args.factory = factory + } } -// NewFileReaderWithFactory creates a new recordio file reader under a path and a given ReaderWriterCloserFactory. -func newFileReaderWithFactory(path string, factory ReaderWriterCloserFactory) (ReaderI, error) { - f, r, err := factory.CreateNewReader(path, DefaultBufferSize) +// set Factory , by default it uses DefaultBufferSize. +// This is the internal memory buffer before it's written to disk. +func ReaderBufferSizeBytes(p int) FileReaderOption { + return func(args *FileReaderOptions) { + args.bufferSizeBytes = p + } +} + +// NewFileReader creates a new reader with the given options, either Path or File must be supplied, compression is optional. +func NewFileReader(readerOptions ...FileReaderOption) (ReaderI, error) { + opts := &FileReaderOptions{ + path: "", + file: nil, + bufferSizeBytes: DefaultBufferSize, + factory: BufferedIOFactory{}, + } + + for _, readOption := range readerOptions { + readOption(opts) + } + + f, r, err := opts.factory.CreateNewReader(opts.path, opts.bufferSizeBytes) if err != nil { return nil, err } @@ -384,3 +417,20 @@ func newFileReaderWithFactory(path string, factory ReaderWriterCloserFactory) (R currentOffset: 0, }, nil } + +// NewFileReaderWithPath creates a new recordio file reader that can read RecordIO files at the given path. +func NewFileReaderWithPath(path string) (ReaderI, error) { + return NewFileReader(ReaderPath(path)) +} + +// NewFileReaderWithFile creates a new recordio file reader that can read RecordIO files with the given file. +// The file will be managed from here on out (ie closing). +func NewFileReaderWithFile(file *os.File) (ReaderI, error) { + // we're closing the existing file, as it's being recreated by the factory below + err := file.Close() + if err != nil { + return nil, fmt.Errorf("error while closing existing file handle at '%s': %w", file.Name(), err) + } + + return NewFileReader(ReaderPath(file.Name())) +} diff --git a/recordio/io_factory.go b/recordio/io_factory.go new file mode 100644 index 0000000..b58b577 --- /dev/null +++ b/recordio/io_factory.go @@ -0,0 +1,10 @@ +package recordio + +import ( + "os" +) + +type IOFactory interface { + CreateNewReader(filePath string, bufSize int) (*os.File, ByteReaderResetCount, error) + CreateNewWriter(filePath string, bufSize int) (*os.File, WriteCloserFlusher, error) +} diff --git a/recordio/recordio_test.go b/recordio/recordio_test.go index 8cf3422..6ff2df9 100644 --- a/recordio/recordio_test.go +++ b/recordio/recordio_test.go @@ -67,7 +67,7 @@ func TestReadWriteEndToEndDirectIO(t *testing.T) { require.NoError(t, err) reader := func() ReaderI { - reader, err := newFileReaderWithFactory(tmpFile.Name(), DirectIOFactory{}) + reader, err := NewFileReader(ReaderPath(tmpFile.Name()), ReaderIoFactory(DirectIOFactory{})) require.NoError(t, err) require.NoError(t, reader.Open()) return reader diff --git a/simpledb/compaction.go b/simpledb/compaction.go index 437b2cf..2b345f0 100644 --- a/simpledb/compaction.go +++ b/simpledb/compaction.go @@ -29,7 +29,7 @@ func backgroundCompaction(db *DB) { select { case <-db.compactionTickerStopChannel: return nil - case _ = <-db.compactionTicker.C: + case <-db.compactionTicker.C: metadata, err := executeCompaction(db) if err != nil { return err diff --git a/simpledb/db.go b/simpledb/db.go index 87c5d15..89e291f 100644 --- a/simpledb/db.go +++ b/simpledb/db.go @@ -427,7 +427,7 @@ func CompactionFileThreshold(n int) ExtraOption { } // CompactionMaxSizeBytes tells whether an SSTable is considered for compaction. -// SSTables over the given threshold will not be compacted any further. Default is 5GB in DefaultCompactionMaxSizeBytes. +// SSTables over the given threshold will not be compacted any further. Default is 5GB in DefaultCompactionMaxSizeBytes func CompactionMaxSizeBytes(n uint64) ExtraOption { return func(args *ExtraOptions) { args.compactionMaxSizeBytes = n diff --git a/simpledb/recovery.go b/simpledb/recovery.go index 03c70f7..3360f2c 100644 --- a/simpledb/recovery.go +++ b/simpledb/recovery.go @@ -240,7 +240,10 @@ func (db *DB) replayAndSetupWriteAheadLog() error { err = executeFlush(db, memStoreFlushAction{ memStore: swapMemstore(db), }) - + if err != nil { + log.Printf("Error Flush memstore %v", err) + return err + } elapsedDuration := time.Since(start) log.Printf("done replaying WAL in %v with %d records\n", elapsedDuration, numRecords) } diff --git a/simpledb/rw_memstore_test.go b/simpledb/rw_memstore_test.go index 1e5f68e..8448601 100644 --- a/simpledb/rw_memstore_test.go +++ b/simpledb/rw_memstore_test.go @@ -1,10 +1,11 @@ package simpledb import ( - "github.com/stretchr/testify/assert" - "github.com/thomasjungblut/go-sstables/memstore" "strconv" "testing" + + "github.com/stretchr/testify/assert" + "github.com/thomasjungblut/go-sstables/memstore" ) func TestRWMemstoreAddShouldGoToWriteStore(t *testing.T) { diff --git a/simpledb/sstable_manager.go b/simpledb/sstable_manager.go index 0baf8e5..6bc980c 100644 --- a/simpledb/sstable_manager.go +++ b/simpledb/sstable_manager.go @@ -123,10 +123,10 @@ func (s *SSTableManager) candidateTablesForCompaction(compactionMaxSizeBytes uin for i := 0; i < len(s.allSSTableReaders); i++ { reader := s.allSSTableReaders[i] // avoid the EmptySStableReader (or empty files) and only include small enough SSTables - if reader.MetaData().NumRecords > 0 && reader.MetaData().TotalBytes < compactionMaxSizeBytes { + if reader.MetaData().TotalBytes < compactionMaxSizeBytes { paths = append(paths, reader.BasePath()) numRecords += reader.MetaData().NumRecords - if i == 0 { + if numRecords == 0 { canRemoveTombstone = true } } diff --git a/simpledb/sstable_manager_test.go b/simpledb/sstable_manager_test.go index 076e4c6..fb2431d 100644 --- a/simpledb/sstable_manager_test.go +++ b/simpledb/sstable_manager_test.go @@ -1,16 +1,17 @@ package simpledb import ( - "github.com/stretchr/testify/assert" - sdbProto "github.com/thomasjungblut/go-sstables/simpledb/proto" - "github.com/thomasjungblut/go-sstables/skiplist" - "github.com/thomasjungblut/go-sstables/sstables" - "github.com/thomasjungblut/go-sstables/sstables/proto" "os" "path/filepath" "reflect" "sync" "testing" + + "github.com/stretchr/testify/assert" + sdbProto "github.com/thomasjungblut/go-sstables/simpledb/proto" + "github.com/thomasjungblut/go-sstables/skiplist" + "github.com/thomasjungblut/go-sstables/sstables" + "github.com/thomasjungblut/go-sstables/sstables/proto" ) func TestSSTableManagerAdditionHappyPath(t *testing.T) { @@ -67,10 +68,10 @@ func TestSSTableManagerSelectCompactionCandidates(t *testing.T) { path: "4", }) - assertCompactionAction(t, 0, []string(nil), manager.candidateTablesForCompaction(25)) - assertCompactionAction(t, 5, []string{"2"}, manager.candidateTablesForCompaction(51)) - assertCompactionAction(t, 15, []string{"1", "2"}, manager.candidateTablesForCompaction(101)) - assertCompactionAction(t, 115, []string{"1", "2", "3"}, manager.candidateTablesForCompaction(1500)) + assertCompactionAction(t, 0, []string{"4"}, manager.candidateTablesForCompaction(25)) + assertCompactionAction(t, 5, []string{"2", "4"}, manager.candidateTablesForCompaction(51)) + assertCompactionAction(t, 15, []string{"1", "2", "4"}, manager.candidateTablesForCompaction(101)) + assertCompactionAction(t, 115, []string{"1", "2", "3", "4"}, manager.candidateTablesForCompaction(1500)) } func TestSSTableCompactionReflectionHappyPath(t *testing.T) {