From f35fec3eb53d9ea6b2d9b26dad98ac637cb55234 Mon Sep 17 00:00:00 2001 From: loicalleyne Date: Wed, 13 Nov 2024 22:28:54 -0500 Subject: [PATCH] io.Reader record factory --- README.md | 57 +++++++++++++++------- bodkin.go | 44 +++++++++++------ cmd/main.go | 86 ++++++++++++++++++--------------- option.go | 13 ++--- reader/option.go | 41 ++++++++++++++++ reader/reader.go | 123 +++++++++++++++++++++++++++++++++++++++++++++-- 6 files changed, 282 insertions(+), 82 deletions(-) diff --git a/README.md b/README.md index 05c0926..b30b127 100644 --- a/README.md +++ b/README.md @@ -8,14 +8,18 @@ The goal is to provide a useful toolkit to make it easier to use Arrow, and by e Bodkin enables you to use your _data_ to define and evolve your Arrow Schema. ## Features - +### Arrow schema generation from data type inference - Converts a structured input (json string or []byte, Go struct or map[string]any) into an Apache Arrow schema -- Supports nested types + - Supports nested types - Automatically evolves the Arrow schema with new fields when providing new inputs +- Option to merge new infered schema at existing path for composibility - Converts schema field types when unifying schemas to accept evolving input data - Tracks changes to the schema -- Export/import a serialized Arrow schema to/from file or []byte to transmit or persist schema definition -- Custom data loader to load structured data directly to Arrow Records based on inferred schema +- Export/import a serialized Arrow schema to/from file or `[]byte` to transmit or persist schema definition +### Custom data loader +- Load structured data directly to Arrow Records based on inferred schema + - Individual input to Arrow Record + - io.Reader stream to Arrow Records ## 🚀 Install @@ -138,11 +142,41 @@ Also works with nested Go structs and slices // - Age: type=int32, nullable ``` +Export your schema to a file, then import the file to retrieve the schema; or export/import to/from a []byte. +```go +_ = u.ExportSchemaFile("./test.schema") +imp, _ := u.ImportSchemaFile("./test.schema") +fmt.Printf("imported %v\n", imp.String()) + +bs, _ := u.ExportSchemaBytes() +sc, _ := u.ImportSchemaBytes(bs) +fmt.Printf("imported %v\n", sc.String()) +``` + Use a Bodkin Reader to load data to Arrow Records ```go u := bodkin.NewBodkin(bodkin.WithInferTimeUnits(), bodkin.WithTypeConversion()) -r, _ := u.NewReader() -rec, _ := r.ReadToRecord([]byte(jsonS1)) +u.Unify(jsonS1) // feed data for schema generation +rdr, _ := u.NewReader() // infered schema in Bodkin used to create Reader +rec, _ := rdr.ReadToRecord([]byte(jsonS1)) // Reader loads data and returns Arrow Record +``` + +Provide a Bodkin Reader with an io.Reader to load many records +```go +import "github.com/loicalleyne/bodkin/reader" +... +u := bodkin.NewBodkin(bodkin.WithInferTimeUnits(), bodkin.WithTypeConversion()) +// Create Reader attached to Bodkin ... +u.NewReader(schema, 0, reader.WithIOReader(ff, reader.DefaultDelimiter), reader.WithChunk(1024)) +for u.Reader.Next(){ + rec := r.Record() +} +// or create a stand-alone Reader if you have an existing *arrow.Schema +rdr, _ := reader.NewReader(schema, 0, reader.WithIOReader(ff, reader.DefaultDelimiter), reader.WithChunk(1024)) +for rdr.Next() { + rec := r.Record() +... +} ``` Use the generated Arrow schema with Arrow's built-in JSON reader to decode JSON data into Arrow records @@ -159,17 +193,6 @@ for rdr.Next() { // ] ``` -Export your schema to a file, then import the file to retrieve the schema; or export/import to/from a []byte. -```go -_ = u.ExportSchemaFile("./test.schema") -imp, _ := u.ImportSchemaFile("./test.schema") -fmt.Printf("imported %v\n", imp.String()) - -bs, _ := u.ExportSchemaBytes() -sc, _ := u.ImportSchemaBytes(bs) -fmt.Printf("imported %v\n", sc.String()) -``` - ## 💫 Show your support Give a ⭐️ if this project helped you! diff --git a/bodkin.go b/bodkin.go index 68debcd..3d055f0 100644 --- a/bodkin.go +++ b/bodkin.go @@ -26,11 +26,6 @@ type ( config *Bodkin ) -type ( - ReaderOption func(reader.Option) - readerConfig *reader.DataReader -) - // Field represents an element in the input data. type Field struct { Dotpath string `json:"dotpath"` @@ -49,12 +44,13 @@ const ( // Bodkin is a collection of field paths, describing the columns of a structured input(s). type Bodkin struct { rr io.Reader - sf bufio.SplitFunc - sc *bufio.Scanner + br *bufio.Reader + delim byte original *fieldPos old *fieldPos new *fieldPos - r *reader.DataReader + opts []Option + Reader *reader.DataReader knownFields *omap.OrderedMap[string, *fieldPos] untypedFields *omap.OrderedMap[string, *fieldPos] unificationCount int @@ -66,6 +62,8 @@ type Bodkin struct { changes error } +func (u *Bodkin) Opts() []Option { return u.opts } + func (u *Bodkin) NewReader(opts ...reader.Option) (*reader.DataReader, error) { schema, err := u.Schema() if err != nil { @@ -74,11 +72,11 @@ func (u *Bodkin) NewReader(opts ...reader.Option) (*reader.DataReader, error) { if schema == nil { return nil, fmt.Errorf("nil schema") } - r, err := reader.NewReader(schema, 0, opts...) + u.Reader, err = reader.NewReader(schema, 0, opts...) if err != nil { return nil, err } - return r, nil + return u.Reader, nil } // NewBodkin returns a new Bodkin value from a structured input. @@ -91,6 +89,7 @@ func NewBodkin(opts ...Option) *Bodkin { func newBodkin(opts ...Option) *Bodkin { b := &Bodkin{} + b.opts = opts for _, opt := range opts { opt(b) } @@ -182,7 +181,7 @@ func (u *Bodkin) ExportSchemaFile(exportPath string) error { return err } bs := flight.SerializeSchema(schema, memory.DefaultAllocator) - err = os.WriteFile("./temp.schema", bs, 0644) + err = os.WriteFile(exportPath, bs, 0644) if err != nil { return err } @@ -261,17 +260,23 @@ func (u *Bodkin) UnifyScan() error { } return u.err }() - for u.sc.Scan() { - m, err := reader.InputMap(u.sc.Bytes()) + for { + datumBytes, err := u.br.ReadBytes(u.delim) + if err != nil { + if errors.Is(err, io.EOF) { + u.err = nil + break + } + u.err = err + break + } + m, err := reader.InputMap(datumBytes) if err != nil { u.err = errors.Join(u.err, err) continue } u.Unify(m) } - if err := u.sc.Err(); err != nil { - u.err = errors.Join(u.err, err) - } return u.err } @@ -333,6 +338,8 @@ func (u *Bodkin) OriginSchema() (*arrow.Schema, error) { // Schema returns the current merged Arrow schema generated from the structure/types of // the input(s), and a panic recovery error if the schema could not be created. +// If the Bodkin has a Reader and the schema has been updated since its creation, the Reader +// will replaced with a new one matching the current schema. Any func (u *Bodkin) Schema() (*arrow.Schema, error) { if u.old == nil { return nil, fmt.Errorf("bodkin not initialised") @@ -349,6 +356,11 @@ func (u *Bodkin) Schema() (*arrow.Schema, error) { fields = append(fields, c.field) } s = arrow.NewSchema(fields, nil) + if u.Reader != nil { + if !u.Reader.Schema().Equal(s) { + u.Reader, _ = reader.NewReader(s, 0, u.Reader.Opts()...) + } + } return s, nil } diff --git a/cmd/main.go b/cmd/main.go index 79b8bc2..56a1730 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -8,62 +8,68 @@ import ( "time" "github.com/loicalleyne/bodkin" + "github.com/loicalleyne/bodkin/reader" ) func main() { start := time.Now() - filepath := "github.json" + filepath := "large-file.json" log.Println("start") - f, err := os.Open(filepath) - if err != nil { - panic(err) - } - defer f.Close() - s := bufio.NewScanner(f) - u := bodkin.NewBodkin(bodkin.WithInferTimeUnits(), bodkin.WithTypeConversion()) - if err != nil { - panic(err) - } - - for s.Scan() { - err = u.Unify(s.Bytes()) + var u *bodkin.Bodkin + if 1 == 1 { + f, err := os.Open(filepath) + if err != nil { + panic(err) + } + defer f.Close() + s := bufio.NewScanner(f) + u = bodkin.NewBodkin(bodkin.WithInferTimeUnits(), bodkin.WithTypeConversion()) if err != nil { panic(err) } - } - f.Close() - schema, err := u.Schema() - if err != nil { - panic(err) - } - log.Printf("union %v\n", schema.String()) - log.Printf("elapsed: %v\n", time.Since(start)) - ff, err := os.Open(filepath) - if err != nil { - panic(err) - } - defer ff.Close() - r, err := u.NewReader() - if err != nil { - panic(err) + for s.Scan() { + err = u.Unify(s.Bytes()) + if err != nil { + panic(err) + } + } + f.Close() + err = u.ExportSchemaFile("temp.bak") + if err != nil { + panic(err) + } } - i := 0 - s = bufio.NewScanner(ff) - for s.Scan() { - rec, err := r.ReadToRecord(s.Bytes()) + if 1 == 1 { + schema, err := u.ImportSchemaFile("temp.bak") if err != nil { panic(err) } - _, err = rec.MarshalJSON() + ff, err := os.Open(filepath) if err != nil { - fmt.Printf("error marshaling record: %v\n", err) + panic(err) } - // fmt.Printf("\nmarshaled record :\n%v\n", string(rj)) - i++ - } + defer ff.Close() + r, err := reader.NewReader(schema, 0, reader.WithIOReader(ff, reader.DefaultDelimiter), reader.WithChunk(1024*16)) + if err != nil { + panic(err) + } + + log.Printf("union %v\n", schema.String()) + log.Printf("elapsed: %v\n", time.Since(start)) - log.Println("records", i) + i := 0 + for r.Next() { + rec := r.Record() + _, err := rec.MarshalJSON() + if err != nil { + fmt.Printf("error marshaling record: %v\n", err) + } + // fmt.Printf("\nmarshaled record :\n%v\n", string(rj)) + i++ + } + log.Println("records", r.Count(), i) + } log.Printf("elapsed: %v\n", time.Since(start)) log.Println("end") } diff --git a/option.go b/option.go index 3e7c91a..8fce0a5 100644 --- a/option.go +++ b/option.go @@ -45,14 +45,15 @@ func WithMaxCount(i int) Option { } } -// WithIOReader provides an io.Reader for a Bodkin to use with UnifyScan(). -// A bufio.SplitFunc can optionally be provided, otherwise the default -// ScanLines will be used. -func WithIOReader(r io.Reader, sf bufio.SplitFunc) Option { +// WithIOReader provides an io.Reader for a Bodkin to use with UnifyScan(), along +// with a delimiter to use to split datum in the data stream. +// Default delimiter '\n' if delimiter is not provided. +func WithIOReader(r io.Reader, delim byte) Option { return func(cfg config) { cfg.rr = r - if sf != nil { - cfg.sf = sf + cfg.br = bufio.NewReaderSize(cfg.rr, 1024*16) + if delim != '\n' { + cfg.delim = delim } } } diff --git a/reader/option.go b/reader/option.go index 300e9ac..088a8f2 100644 --- a/reader/option.go +++ b/reader/option.go @@ -1,6 +1,9 @@ package reader import ( + "bufio" + "io" + "github.com/apache/arrow-go/v18/arrow/memory" ) @@ -19,3 +22,41 @@ func WithJSONDecoder() Option { cfg.jsonDecode = true } } + +// WithChunk specifies the chunk size used while reading data to Arrow records. +// +// If n is zero or 1, no chunking will take place and the reader will create +// one record per row. +// If n is greater than 1, chunks of n rows will be read. +func WithChunk(n int) Option { + return func(cfg config) { + cfg.chunk = n + } +} + +// WithIOReader provides an io.Reader to Bodkin Reader, along with a delimiter +// to use to split datum in the data stream. Default delimiter '\n' if delimiter +// is not provided. +func WithIOReader(r io.Reader, delim byte) Option { + return func(cfg config) { + cfg.rr = r + cfg.br = bufio.NewReaderSize(cfg.rr, 1024*1024*16) + if delim != DefaultDelimiter { + cfg.delim = delim + } + } +} + +// WithInputBufferSize specifies the Bodkin Reader's input buffer size. +func WithInputBufferSize(n int) Option { + return func(cfg config) { + cfg.inputBufferSize = n + } +} + +// WithRecordBufferSize specifies the Bodkin Reader's record buffer size. +func WithRecordBufferSize(n int) Option { + return func(cfg config) { + cfg.recordBufferSize = n + } +} diff --git a/reader/reader.go b/reader/reader.go index 58434d7..c437f23 100644 --- a/reader/reader.go +++ b/reader/reader.go @@ -8,6 +8,7 @@ import ( "errors" "fmt" "io" + "sync" "sync/atomic" "github.com/apache/arrow-go/v18/arrow" @@ -23,6 +24,11 @@ const ( DataSourceJSON DataSourceAvro ) +const ( + Manual int = iota + Scanner +) +const DefaultDelimiter byte = byte('\n') // Option configures an Avro reader/writer. type ( @@ -32,13 +38,14 @@ type ( type DataReader struct { rr io.Reader - sf bufio.SplitFunc - sc *bufio.Scanner + br *bufio.Reader + delim byte refs int64 source DataSource schema *arrow.Schema bld *array.RecordBuilder mem memory.Allocator + opts []Option bldMap *fieldPos ldr *dataLoader cur arrow.Record @@ -49,11 +56,14 @@ type DataReader struct { recChan chan arrow.Record recReq chan struct{} bldDone chan struct{} + inputLock atomic.Int32 + factoryLock atomic.Int32 + wg sync.WaitGroup jsonDecode bool chunk int + inputCount int inputBufferSize int recordBufferSize int - countInput int } func NewReader(schema *arrow.Schema, source DataSource, opts ...Option) (*DataReader, error) { @@ -70,6 +80,8 @@ func NewReader(schema *arrow.Schema, source DataSource, opts ...Option) (*DataRe inputBufferSize: 1024 * 64, recordBufferSize: 1024 * 64, chunk: 0, + delim: DefaultDelimiter, + opts: opts, } for _, opt := range opts { opt(r) @@ -81,6 +93,10 @@ func NewReader(schema *arrow.Schema, source DataSource, opts ...Option) (*DataRe r.recReq = make(chan struct{}, 100) r.readerCtx, r.readCancel = context.WithCancel(context.Background()) + if r.rr != nil { + r.wg.Add(1) + go r.decode2Chan() + } r.bld = array.NewRecordBuilder(memory.DefaultAllocator, schema) r.bldMap = newFieldPos() r.bldMap.isStruct = true @@ -91,6 +107,7 @@ func NewReader(schema *arrow.Schema, source DataSource, opts ...Option) (*DataRe } r.ldr.drawTree(r.bldMap) go r.recordFactory() + r.wg.Add(1) return r, nil } @@ -130,6 +147,55 @@ func (r *DataReader) ReadToRecord(a any) (arrow.Record, error) { return r.bld.NewRecord(), nil } +// Next returns whether a Record can be received from the converted record queue. +// The user should check Err() after a call to Next that returned false to check +// if an error took place. +func (r *DataReader) Next() bool { + var ok bool + if r.cur != nil { + r.cur.Release() + r.cur = nil + } + + r.wg.Wait() + select { + case r.cur, ok = <-r.recChan: + if !ok && r.cur == nil { + return false + } + case <-r.bldDone: + if len(r.recChan) > 0 { + r.cur = <-r.recChan + } + case <-r.readerCtx.Done(): + return false + } + if r.err != nil { + return false + } + + return r.cur != nil +} + +func (r *DataReader) Mode() int { + switch r.rr { + case nil: + return Manual + default: + return Scanner + } +} + +func (r *DataReader) Count() int { return r.inputCount } +func (r *DataReader) ResetCount() { r.inputCount = 0 } +func (r *DataReader) InputBufferSize() int { return r.inputBufferSize } +func (r *DataReader) RecBufferSize() int { return r.recordBufferSize } +func (r *DataReader) DataSource() DataSource { return r.source } +func (r *DataReader) Opts() []Option { return r.opts } + +// Record returns the current Arrow record. +// It is valid until the next call to Next. +func (r *DataReader) Record() arrow.Record { return r.cur } func (r *DataReader) Schema() *arrow.Schema { return r.schema } // Err returns the last error encountered during the reading of data. @@ -153,3 +219,54 @@ func (r *DataReader) Release() { } } } + +// Peek returns the length of the input data and Arrow Record queues. +func (r *DataReader) Peek() (int, int) { + return len(r.anyChan), len(r.recChan) +} + +// Cancel cancels the Reader's io.Reader scan to Arrow. +func (r *DataReader) Cencel() { + r.readCancel() +} + +// Read loads one datum. +// If the Reader has an io.Reader, Read is a no-op. +func (r *DataReader) Read(a any) error { + if r.rr != nil { + return nil + } + var err error + defer func() error { + if rc := recover(); rc != nil { + r.err = errors.Join(r.err, fmt.Errorf("panic %v", rc)) + } + return r.err + }() + m, err := InputMap(a) + if err != nil { + r.err = errors.Join(r.err, err) + return err + } + r.anyChan <- m + r.inputCount++ + return nil +} + +// Reset resets a Reader to its initial state. +func (r *DataReader) Reset() { + r.readCancel() + r.anyChan = make(chan any, r.inputBufferSize) + r.recChan = make(chan arrow.Record, r.recordBufferSize) + r.bldDone = make(chan struct{}) + r.inputCount = 0 + + // DataReader has an io.Reader + if r.rr != nil { + r.br.Reset(r.rr) + go r.decode2Chan() + r.wg.Add(1) + } + go r.recordFactory() + r.wg.Add(1) +}