io.Reader record factory
loicalleyne committed Nov 14, 2024
1 parent f9a3fd4 commit f35fec3
Showing 6 changed files with 282 additions and 82 deletions.
57 changes: 40 additions & 17 deletions README.md
@@ -8,14 +8,18 @@ The goal is to provide a useful toolkit to make it easier to use Arrow, and by e
Bodkin enables you to use your _data_ to define and evolve your Arrow Schema.

## Features

### Arrow schema generation from data type inference
- Converts a structured input (JSON string or `[]byte`, Go struct or `map[string]any`) into an Apache Arrow schema
- Supports nested types
- Automatically evolves the Arrow schema with new fields when providing new inputs (see the sketch below)
- Option to merge new inferred schema at existing path for composability
- Converts schema field types when unifying schemas to accept evolving input data
- Tracks changes to the schema
- Export/import a serialized Arrow schema to/from file or `[]byte` to transmit or persist schema definition
### Custom data loader
- Load structured data directly to Arrow Records based on inferred schema
- Individual input to Arrow Record
- io.Reader stream to Arrow Records
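
A minimal sketch of the schema inference and evolution flow above; `jsonS1` and `jsonS2` are hypothetical JSON documents with overlapping but not identical fields, and error handling is abbreviated:
```go
u := bodkin.NewBodkin(bodkin.WithInferTimeUnits(), bodkin.WithTypeConversion())
_ = u.Unify(jsonS1) // first input establishes the initial schema
_ = u.Unify(jsonS2) // later inputs add new fields and may widen types
schema, err := u.Schema()
if err != nil {
	log.Fatal(err)
}
fmt.Println(schema.String())
```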

## 🚀 Install

@@ -138,11 +142,41 @@ Also works with nested Go structs and slices
// - Age: type=int32, nullable
```

Export your schema to a file, then import the file to retrieve the schema; or export/import to/from a `[]byte`.
```go
_ = u.ExportSchemaFile("./test.schema")
imp, _ := u.ImportSchemaFile("./test.schema")
fmt.Printf("imported %v\n", imp.String())

bs, _ := u.ExportSchemaBytes()
sc, _ := u.ImportSchemaBytes(bs)
fmt.Printf("imported %v\n", sc.String())
```

Use a Bodkin Reader to load data to Arrow Records
```go
u := bodkin.NewBodkin(bodkin.WithInferTimeUnits(), bodkin.WithTypeConversion())
u.Unify(jsonS1)                            // feed data for schema generation
rdr, _ := u.NewReader()                    // inferred schema in Bodkin used to create Reader
rec, _ := rdr.ReadToRecord([]byte(jsonS1)) // Reader loads data and returns an Arrow Record
```
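
Arrow records in arrow-go are reference counted; if nothing downstream retains a record, release it once consumed. A hedged sketch of feeding several inputs through the same Reader, assuming `inputs [][]byte` holds JSON documents matching the unified schema:
```go
for _, in := range inputs {
	rec, err := rdr.ReadToRecord(in)
	if err != nil {
		continue // skip malformed input
	}
	// ... consume rec (write to IPC/Parquet, marshal to JSON, etc.) ...
	rec.Release() // free the record's underlying buffers when done
}
```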

Provide a Bodkin Reader with an io.Reader to load many records
```go
import "github.com/loicalleyne/bodkin/reader"
...
u := bodkin.NewBodkin(bodkin.WithInferTimeUnits(), bodkin.WithTypeConversion())
// Create a Reader attached to the Bodkin; ff is an io.Reader (e.g. an *os.File)
u.NewReader(reader.WithIOReader(ff, reader.DefaultDelimiter), reader.WithChunk(1024))
for u.Reader.Next() {
	rec := u.Reader.Record()
	...
}
// or create a stand-alone Reader if you have an existing *arrow.Schema
rdr, _ := reader.NewReader(schema, 0, reader.WithIOReader(ff, reader.DefaultDelimiter), reader.WithChunk(1024))
for rdr.Next() {
	rec := rdr.Record()
	...
}
```
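
Putting the pieces together, a self-contained sketch under the assumptions above — a newline-delimited JSON file at the hypothetical path `data.ndjson`, schema inference via `UnifyScan`, then a chunked loading pass over the same file:
```go
package main

import (
	"fmt"
	"io"
	"log"
	"os"

	"github.com/loicalleyne/bodkin"
	"github.com/loicalleyne/bodkin/reader"
)

func main() {
	f, err := os.Open("data.ndjson")
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	// Pass 1: infer the schema from the stream.
	u := bodkin.NewBodkin(bodkin.WithInferTimeUnits(), bodkin.WithTypeConversion(),
		bodkin.WithIOReader(f, '\n'))
	if err := u.UnifyScan(); err != nil {
		log.Println(err) // unification errors are joined, not necessarily fatal
	}

	// Pass 2: rewind, then attach a Reader to the Bodkin and stream records in chunks.
	if _, err := f.Seek(0, io.SeekStart); err != nil {
		log.Fatal(err)
	}
	if _, err := u.NewReader(reader.WithIOReader(f, reader.DefaultDelimiter), reader.WithChunk(1024)); err != nil {
		log.Fatal(err)
	}
	n := 0
	for u.Reader.Next() {
		rec := u.Reader.Record()
		_ = rec // consume the record here
		n++
	}
	fmt.Println("records loaded:", n)
}
```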

Use the generated Arrow schema with Arrow's built-in JSON reader to decode JSON data into Arrow records
@@ -159,17 +193,6 @@ for rdr.Next() {
// ]
```
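
A hedged sketch of that decode path with arrow-go's JSON reader — assuming `data` is an `io.Reader` over newline-delimited JSON and `schema` came from `u.Schema()`; check the arrow-go docs for the exact options:
```go
import (
	"log"

	"github.com/apache/arrow-go/v18/arrow/array"
)
...
jr := array.NewJSONReader(data, schema)
defer jr.Release()
for jr.Next() {
	rec := jr.Record()
	_ = rec // rec is valid until the next call to Next
}
if err := jr.Err(); err != nil {
	log.Fatal(err)
}
```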

## 💫 Show your support

Give a ⭐️ if this project helped you!
44 changes: 28 additions & 16 deletions bodkin.go
@@ -26,11 +26,6 @@
config *Bodkin
)

type (
ReaderOption func(reader.Option)
readerConfig *reader.DataReader
)

// Field represents an element in the input data.
type Field struct {
Dotpath string `json:"dotpath"`
@@ -49,12 +44,13 @@ const (
// Bodkin is a collection of field paths, describing the columns of a structured input(s).
type Bodkin struct {
rr io.Reader
sf bufio.SplitFunc
sc *bufio.Scanner
br *bufio.Reader
delim byte
original *fieldPos
old *fieldPos
new *fieldPos
r *reader.DataReader
opts []Option
Reader *reader.DataReader
knownFields *omap.OrderedMap[string, *fieldPos]
untypedFields *omap.OrderedMap[string, *fieldPos]
unificationCount int
@@ -66,6 +62,8 @@ type Bodkin struct {
changes error
}

func (u *Bodkin) Opts() []Option { return u.opts }

func (u *Bodkin) NewReader(opts ...reader.Option) (*reader.DataReader, error) {
schema, err := u.Schema()
if err != nil {
@@ -74,11 +72,11 @@ func (u *Bodkin) NewReader(opts ...reader.Option) (*reader.DataReader, error) {
if schema == nil {
return nil, fmt.Errorf("nil schema")
}
r, err := reader.NewReader(schema, 0, opts...)
u.Reader, err = reader.NewReader(schema, 0, opts...)
if err != nil {
return nil, err
}
return r, nil
return u.Reader, nil
}

// NewBodkin returns a new Bodkin value from a structured input.
@@ -91,6 +89,7 @@ func NewBodkin(opts ...Option) *Bodkin {

func newBodkin(opts ...Option) *Bodkin {
b := &Bodkin{}
b.opts = opts
for _, opt := range opts {
opt(b)
}
@@ -182,7 +181,7 @@ func (u *Bodkin) ExportSchemaFile(exportPath string) error {
return err
}
bs := flight.SerializeSchema(schema, memory.DefaultAllocator)
err = os.WriteFile("./temp.schema", bs, 0644)
err = os.WriteFile(exportPath, bs, 0644)
if err != nil {
return err
}
@@ -261,17 +260,23 @@ func (u *Bodkin) UnifyScan() error {
}
return u.err
}()
for u.sc.Scan() {
m, err := reader.InputMap(u.sc.Bytes())
for {
datumBytes, err := u.br.ReadBytes(u.delim)
if err != nil {
if errors.Is(err, io.EOF) {
u.err = nil
break
}
u.err = err
break
}
m, err := reader.InputMap(datumBytes)
if err != nil {
u.err = errors.Join(u.err, err)
continue
}
u.Unify(m)
}
if err := u.sc.Err(); err != nil {
u.err = errors.Join(u.err, err)
}
return u.err
}

@@ -333,6 +338,8 @@ func (u *Bodkin) OriginSchema() (*arrow.Schema, error) {

// Schema returns the current merged Arrow schema generated from the structure/types of
// the input(s), and a panic recovery error if the schema could not be created.
// If the Bodkin has a Reader and the schema has been updated since its creation, the Reader
// will replaced with a new one matching the current schema. Any
func (u *Bodkin) Schema() (*arrow.Schema, error) {
if u.old == nil {
return nil, fmt.Errorf("bodkin not initialised")
Expand All @@ -349,6 +356,11 @@ func (u *Bodkin) Schema() (*arrow.Schema, error) {
fields = append(fields, c.field)
}
s = arrow.NewSchema(fields, nil)
if u.Reader != nil {
if !u.Reader.Schema().Equal(s) {
u.Reader, _ = reader.NewReader(s, 0, u.Reader.Opts()...)
}
}
return s, nil
}

86 changes: 46 additions & 40 deletions cmd/main.go
@@ -8,62 +8,68 @@ import (
"time"

"github.com/loicalleyne/bodkin"
"github.com/loicalleyne/bodkin/reader"
)

func main() {
start := time.Now()
filepath := "github.json"
filepath := "large-file.json"
log.Println("start")
f, err := os.Open(filepath)
if err != nil {
panic(err)
}
defer f.Close()
s := bufio.NewScanner(f)
u := bodkin.NewBodkin(bodkin.WithInferTimeUnits(), bodkin.WithTypeConversion())
if err != nil {
panic(err)
}

for s.Scan() {
err = u.Unify(s.Bytes())
var u *bodkin.Bodkin
if 1 == 1 {
f, err := os.Open(filepath)
if err != nil {
panic(err)
}
defer f.Close()
s := bufio.NewScanner(f)
u = bodkin.NewBodkin(bodkin.WithInferTimeUnits(), bodkin.WithTypeConversion())
if err != nil {
panic(err)
}
}
f.Close()
schema, err := u.Schema()
if err != nil {
panic(err)
}
log.Printf("union %v\n", schema.String())
log.Printf("elapsed: %v\n", time.Since(start))

ff, err := os.Open(filepath)
if err != nil {
panic(err)
}
defer ff.Close()
r, err := u.NewReader()
if err != nil {
panic(err)
for s.Scan() {
err = u.Unify(s.Bytes())
if err != nil {
panic(err)
}
}
f.Close()
err = u.ExportSchemaFile("temp.bak")
if err != nil {
panic(err)
}
}
i := 0
s = bufio.NewScanner(ff)
for s.Scan() {
rec, err := r.ReadToRecord(s.Bytes())
if 1 == 1 {
schema, err := u.ImportSchemaFile("temp.bak")
if err != nil {
panic(err)
}
_, err = rec.MarshalJSON()
ff, err := os.Open(filepath)
if err != nil {
fmt.Printf("error marshaling record: %v\n", err)
panic(err)
}
// fmt.Printf("\nmarshaled record :\n%v\n", string(rj))
i++
}
defer ff.Close()
r, err := reader.NewReader(schema, 0, reader.WithIOReader(ff, reader.DefaultDelimiter), reader.WithChunk(1024*16))
if err != nil {
panic(err)
}

log.Printf("union %v\n", schema.String())
log.Printf("elapsed: %v\n", time.Since(start))

log.Println("records", i)
i := 0
for r.Next() {
rec := r.Record()
_, err := rec.MarshalJSON()
if err != nil {
fmt.Printf("error marshaling record: %v\n", err)
}
// fmt.Printf("\nmarshaled record :\n%v\n", string(rj))
i++
}
log.Println("records", r.Count(), i)
}
log.Printf("elapsed: %v\n", time.Since(start))
log.Println("end")
}
13 changes: 7 additions & 6 deletions option.go
@@ -45,14 +45,15 @@ func WithMaxCount(i int) Option {
}
}

// WithIOReader provides an io.Reader for a Bodkin to use with UnifyScan().
// A bufio.SplitFunc can optionally be provided, otherwise the default
// ScanLines will be used.
func WithIOReader(r io.Reader, sf bufio.SplitFunc) Option {
// WithIOReader provides an io.Reader for a Bodkin to use with UnifyScan(), along
// with a delimiter used to split datums in the data stream.
// The default delimiter '\n' is used if none is provided.
func WithIOReader(r io.Reader, delim byte) Option {
return func(cfg config) {
cfg.rr = r
if sf != nil {
cfg.sf = sf
cfg.br = bufio.NewReaderSize(cfg.rr, 1024*16)
if delim != '\n' {
cfg.delim = delim
}
}
}
41 changes: 41 additions & 0 deletions reader/option.go
@@ -1,6 +1,9 @@
package reader

import (
"bufio"
"io"

"github.com/apache/arrow-go/v18/arrow/memory"
)

@@ -19,3 +22,41 @@ func WithJSONDecoder() Option {
cfg.jsonDecode = true
}
}

// WithChunk specifies the chunk size used while reading data to Arrow records.
//
// If n is zero or 1, no chunking will take place and the reader will create
// one record per row.
// If n is greater than 1, chunks of n rows will be read.
func WithChunk(n int) Option {
return func(cfg config) {
cfg.chunk = n
}
}

// WithIOReader provides an io.Reader to a Bodkin Reader, along with a delimiter
// used to split datums in the data stream. The default delimiter '\n' is used
// if none is provided.
func WithIOReader(r io.Reader, delim byte) Option {
return func(cfg config) {
cfg.rr = r
cfg.br = bufio.NewReaderSize(cfg.rr, 1024*1024*16)
if delim != DefaultDelimiter {
cfg.delim = delim
}
}
}

// WithInputBufferSize specifies the Bodkin Reader's input buffer size.
func WithInputBufferSize(n int) Option {
return func(cfg config) {
cfg.inputBufferSize = n
}
}

// WithRecordBufferSize specifies the Bodkin Reader's record buffer size.
func WithRecordBufferSize(n int) Option {
return func(cfg config) {
cfg.recordBufferSize = n
}
}