Skip to content

Commit

Permalink
RecordIO options for reader (#39)
Browse files Browse the repository at this point in the history
  • Loading branch information
sebheitzmann authored Dec 13, 2024
1 parent c006f0b commit 68806d7
Show file tree
Hide file tree
Showing 9 changed files with 97 additions and 32 deletions.
80 changes: 65 additions & 15 deletions recordio/file_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,28 +350,61 @@ func readNextV2(r *FileReader) ([]byte, error) {
return returnSlice, nil
}

// TODO(thomas): we have to add an option pattern here as well
// options
type FileReaderOptions struct {
path string
file *os.File
bufferSizeBytes int
factory IOFactory
}

// NewFileReaderWithPath creates a new recordio file reader that can read RecordIO files at the given path.
func NewFileReaderWithPath(path string) (ReaderI, error) {
return newFileReaderWithFactory(path, BufferedIOFactory{})
type FileReaderOption func(*FileReaderOptions)

// ReaderPath defines the file path where to read the recordio file
// Either this or File must be supplied.
func ReaderPath(p string) FileReaderOption {
return func(args *FileReaderOptions) {
args.path = p
}
}

// NewFileReaderWithFile creates a new recordio file reader that can read RecordIO files with the given file.
// The file will be managed from here on out (ie closing).
func NewFileReaderWithFile(file *os.File) (ReaderI, error) {
// we're closing the existing file, as it's being recreated by the factory below
err := file.Close()
if err != nil {
return nil, fmt.Errorf("error while closing existing file handle at '%s': %w", file.Name(), err)
// ReaderFile uses the given os.File as the sink to write into. The code manages the given file lifecycle (ie closing).
// Either this or Path must be supplied
func ReaderFile(p *os.File) FileReaderOption {
return func(args *FileReaderOptions) {
args.file = p
}
}

return newFileReaderWithFactory(file.Name(), BufferedIOFactory{})
// BufferSizeBytes sets the IoFactory, by default it uses BufferedIOFactory.
func ReaderIoFactory(factory IOFactory) FileReaderOption {
return func(args *FileReaderOptions) {
args.factory = factory
}
}

// NewFileReaderWithFactory creates a new recordio file reader under a path and a given ReaderWriterCloserFactory.
func newFileReaderWithFactory(path string, factory ReaderWriterCloserFactory) (ReaderI, error) {
f, r, err := factory.CreateNewReader(path, DefaultBufferSize)
// set Factory , by default it uses DefaultBufferSize.
// This is the internal memory buffer before it's written to disk.
func ReaderBufferSizeBytes(p int) FileReaderOption {
return func(args *FileReaderOptions) {
args.bufferSizeBytes = p
}
}

// NewFileReader creates a new reader with the given options, either Path or File must be supplied, compression is optional.
func NewFileReader(readerOptions ...FileReaderOption) (ReaderI, error) {
opts := &FileReaderOptions{
path: "",
file: nil,
bufferSizeBytes: DefaultBufferSize,
factory: BufferedIOFactory{},
}

for _, readOption := range readerOptions {
readOption(opts)
}

f, r, err := opts.factory.CreateNewReader(opts.path, opts.bufferSizeBytes)
if err != nil {
return nil, err
}
Expand All @@ -384,3 +417,20 @@ func newFileReaderWithFactory(path string, factory ReaderWriterCloserFactory) (R
currentOffset: 0,
}, nil
}

// NewFileReaderWithPath creates a new recordio file reader that can read RecordIO files at the given path.
func NewFileReaderWithPath(path string) (ReaderI, error) {
return NewFileReader(ReaderPath(path))
}

// NewFileReaderWithFile creates a new recordio file reader that can read RecordIO files with the given file.
// The file will be managed from here on out (ie closing).
func NewFileReaderWithFile(file *os.File) (ReaderI, error) {
// we're closing the existing file, as it's being recreated by the factory below
err := file.Close()
if err != nil {
return nil, fmt.Errorf("error while closing existing file handle at '%s': %w", file.Name(), err)
}

return NewFileReader(ReaderPath(file.Name()))
}
10 changes: 10 additions & 0 deletions recordio/io_factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package recordio

import (
"os"
)

type IOFactory interface {
CreateNewReader(filePath string, bufSize int) (*os.File, ByteReaderResetCount, error)
CreateNewWriter(filePath string, bufSize int) (*os.File, WriteCloserFlusher, error)
}
2 changes: 1 addition & 1 deletion recordio/recordio_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestReadWriteEndToEndDirectIO(t *testing.T) {
require.NoError(t, err)

reader := func() ReaderI {
reader, err := newFileReaderWithFactory(tmpFile.Name(), DirectIOFactory{})
reader, err := NewFileReader(ReaderPath(tmpFile.Name()), ReaderIoFactory(DirectIOFactory{}))
require.NoError(t, err)
require.NoError(t, reader.Open())
return reader
Expand Down
2 changes: 1 addition & 1 deletion simpledb/compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func backgroundCompaction(db *DB) {
select {
case <-db.compactionTickerStopChannel:
return nil
case _ = <-db.compactionTicker.C:
case <-db.compactionTicker.C:
metadata, err := executeCompaction(db)
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion simpledb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ func CompactionFileThreshold(n int) ExtraOption {
}

// CompactionMaxSizeBytes tells whether an SSTable is considered for compaction.
// SSTables over the given threshold will not be compacted any further. Default is 5GB in DefaultCompactionMaxSizeBytes.
// SSTables over the given threshold will not be compacted any further. Default is 5GB in DefaultCompactionMaxSizeBytes
func CompactionMaxSizeBytes(n uint64) ExtraOption {
return func(args *ExtraOptions) {
args.compactionMaxSizeBytes = n
Expand Down
5 changes: 4 additions & 1 deletion simpledb/recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,10 @@ func (db *DB) replayAndSetupWriteAheadLog() error {
err = executeFlush(db, memStoreFlushAction{
memStore: swapMemstore(db),
})

if err != nil {
log.Printf("Error Flush memstore %v", err)
return err
}
elapsedDuration := time.Since(start)
log.Printf("done replaying WAL in %v with %d records\n", elapsedDuration, numRecords)
}
Expand Down
5 changes: 3 additions & 2 deletions simpledb/rw_memstore_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package simpledb

import (
"github.com/stretchr/testify/assert"
"github.com/thomasjungblut/go-sstables/memstore"
"strconv"
"testing"

"github.com/stretchr/testify/assert"
"github.com/thomasjungblut/go-sstables/memstore"
)

func TestRWMemstoreAddShouldGoToWriteStore(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions simpledb/sstable_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,10 @@ func (s *SSTableManager) candidateTablesForCompaction(compactionMaxSizeBytes uin
for i := 0; i < len(s.allSSTableReaders); i++ {
reader := s.allSSTableReaders[i]
// avoid the EmptySStableReader (or empty files) and only include small enough SSTables
if reader.MetaData().NumRecords > 0 && reader.MetaData().TotalBytes < compactionMaxSizeBytes {
if reader.MetaData().TotalBytes < compactionMaxSizeBytes {
paths = append(paths, reader.BasePath())
numRecords += reader.MetaData().NumRecords
if i == 0 {
if numRecords == 0 {
canRemoveTombstone = true
}
}
Expand Down
19 changes: 10 additions & 9 deletions simpledb/sstable_manager_test.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
package simpledb

import (
"github.com/stretchr/testify/assert"
sdbProto "github.com/thomasjungblut/go-sstables/simpledb/proto"
"github.com/thomasjungblut/go-sstables/skiplist"
"github.com/thomasjungblut/go-sstables/sstables"
"github.com/thomasjungblut/go-sstables/sstables/proto"
"os"
"path/filepath"
"reflect"
"sync"
"testing"

"github.com/stretchr/testify/assert"
sdbProto "github.com/thomasjungblut/go-sstables/simpledb/proto"
"github.com/thomasjungblut/go-sstables/skiplist"
"github.com/thomasjungblut/go-sstables/sstables"
"github.com/thomasjungblut/go-sstables/sstables/proto"
)

func TestSSTableManagerAdditionHappyPath(t *testing.T) {
Expand Down Expand Up @@ -67,10 +68,10 @@ func TestSSTableManagerSelectCompactionCandidates(t *testing.T) {
path: "4",
})

assertCompactionAction(t, 0, []string(nil), manager.candidateTablesForCompaction(25))
assertCompactionAction(t, 5, []string{"2"}, manager.candidateTablesForCompaction(51))
assertCompactionAction(t, 15, []string{"1", "2"}, manager.candidateTablesForCompaction(101))
assertCompactionAction(t, 115, []string{"1", "2", "3"}, manager.candidateTablesForCompaction(1500))
assertCompactionAction(t, 0, []string{"4"}, manager.candidateTablesForCompaction(25))
assertCompactionAction(t, 5, []string{"2", "4"}, manager.candidateTablesForCompaction(51))
assertCompactionAction(t, 15, []string{"1", "2", "4"}, manager.candidateTablesForCompaction(101))
assertCompactionAction(t, 115, []string{"1", "2", "3", "4"}, manager.candidateTablesForCompaction(1500))
}

func TestSSTableCompactionReflectionHappyPath(t *testing.T) {
Expand Down

0 comments on commit 68806d7

Please sign in to comment.