Merge pull request #2 from loicalleyne/reader
Bodkin Reader
loicalleyne authored Nov 13, 2024
2 parents 2fb48fd + 41725a5 commit f61cddd
Showing 18 changed files with 1,398 additions and 352 deletions.
4 changes: 3 additions & 1 deletion .gitignore
@@ -24,8 +24,10 @@ go.work.sum
# env file
.env

internal
avro
experiments
map.go
*.schema
*.pgo
*.pgo
debug
16 changes: 13 additions & 3 deletions README.md
Expand Up @@ -15,6 +15,7 @@ Bodkin enables you to use your _data_ to define and evolve your Arrow Schema.
- Converts schema field types when unifying schemas to accept evolving input data
- Tracks changes to the schema
- Export/import a serialized Arrow schema to/from file or []byte to transmit or persist schema definition
- Custom data loader to load structured data directly to Arrow Records based on inferred schema

## 🚀 Install

@@ -33,7 +34,7 @@ You can import `bodkin` using:
import "github.com/loicalleyne/bodkin"
```

Create a new Bodkin, providing some structured data and print out the resulting Arrow Schema's string representation and field evaluation errors
Create a new Bodkin, provide some structured data, and print out the resulting Arrow Schema's string representation and any field evaluation errors
```go
var jsonS1 string = `{
"count": 89,
@@ -44,7 +45,8 @@ var jsonS1 string = `{
"datefield":"1979-01-01",
"timefield":"01:02:03"
}`
u, _ := bodkin.NewBodkin(jsonS1, bodkin.WithInferTimeUnits(), bodkin.WithTypeConversion())
u, _ := bodkin.NewBodkin(bodkin.WithInferTimeUnits(), bodkin.WithTypeConversion())
u.Unify(jsonS1)
s, _ := u.OriginSchema()
fmt.Printf("original input %v\n", s.String())
for _, e := range u.Err() {
@@ -100,7 +102,7 @@ fmt.Println(u.Changes())
// changed $timefield : from time64[ns] to utf8
```

Also works with Go structs
Also works with nested Go structs and slices
```go
stu := Student{
Name: "StudentName",
@@ -136,6 +138,13 @@ Also works with Go structs
// - Age: type=int32, nullable
```

Use a Bodkin Reader to load data to Arrow Records
```go
u := bodkin.NewBodkin(bodkin.WithInferTimeUnits(), bodkin.WithTypeConversion())
u.Unify(jsonS1) // the reader needs a schema, so evaluate at least one datum first
r, _ := u.NewReader()
rec, _ := r.ReadToRecord([]byte(jsonS1))
```
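`ReadToRecord` returns a standard arrow-go `arrow.Record`. A minimal sketch of the error handling and cleanup elided above (`NumRows`, `NumCols` and `Release` are standard arrow-go record methods; the `log` and `fmt` imports are assumed):
```go
rec, err := r.ReadToRecord([]byte(jsonS1))
if err != nil {
	log.Fatal(err) // jsonS1 could not be decoded against the inferred schema
}
defer rec.Release() // records are reference counted; release when done
fmt.Printf("loaded %d rows across %d columns\n", rec.NumRows(), rec.NumCols())
```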

Use the generated Arrow schema with Arrow's built-in JSON reader to decode JSON data into Arrow records
```go
rdr = array.NewJSONReader(strings.NewReader(jsonS2), schema)
@@ -164,6 +173,7 @@ fmt.Printf("imported %v\n", sc.String())
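The collapsed lines above continue with the standard arrow-go read loop; a minimal sketch, assuming `jsonS2` and `schema` from the earlier snippets:
```go
rdr := array.NewJSONReader(strings.NewReader(jsonS2), schema)
defer rdr.Release()
for rdr.Next() { // each iteration decodes one record batch
	rec := rdr.Record()
	fmt.Println(rec)
}
if err := rdr.Err(); err != nil {
	log.Fatal(err)
}
```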
## 💫 Show your support

Give a ⭐️ if this project helped you!
Feedback and PRs welcome.

## License

121 changes: 92 additions & 29 deletions bodkin.go
@@ -4,8 +4,10 @@
package bodkin

import (
"bufio"
"errors"
"fmt"
"io"
"math"
"os"
"slices"
@@ -24,6 +26,11 @@ type (
config *Bodkin
)

type (
ReaderOption func(reader.Option)
readerConfig *reader.DataReader
)

// Field represents an element in the input data.
type Field struct {
Dotpath string `json:"dotpath"`
@@ -41,33 +48,48 @@ const (

// Bodkin is a collection of field paths describing the columns of structured input(s).
type Bodkin struct {
rr io.Reader
sf bufio.SplitFunc
sc *bufio.Scanner
original *fieldPos
old *fieldPos
new *fieldPos
r *reader.DataReader
knownFields *omap.OrderedMap[string, *fieldPos]
untypedFields *omap.OrderedMap[string, *fieldPos]
unificationCount int64
maxCount int64
unificationCount int
maxCount int
inferTimeUnits bool
quotedValuesAreStrings bool
typeConversion bool
err error
changes error
}

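// NewReader returns a new reader.DataReader bound to the Bodkin's current merged schema.
// An error is returned if a schema cannot be generated from the data evaluated so far.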
func (u *Bodkin) NewReader(opts ...reader.Option) (*reader.DataReader, error) {
schema, err := u.Schema()
if err != nil {
return nil, err
}
if schema == nil {
return nil, fmt.Errorf("nil schema")
}
r, err := reader.NewReader(schema, 0, opts...)
if err != nil {
return nil, err
}
return r, nil
}

// NewBodkin returns a new Bodkin value configured with the provided options.
// Inputs later passed to Unify must be a json byte slice or string, a Go struct with
// exported fields or map[string]any. Any unpopulated fields, empty objects or empty slices
// in JSON or map[string]any inputs are skipped as their types cannot be evaluated and converted.
func NewBodkin(a any, opts ...Option) (*Bodkin, error) {
m, err := reader.InputMap(a)
if err != nil {
return nil, err
}
return newBodkin(m, opts...)
func NewBodkin(opts ...Option) *Bodkin {
return newBodkin(opts...)
}

func newBodkin(m map[string]any, opts ...Option) (*Bodkin, error) {
func newBodkin(opts ...Option) *Bodkin {
b := &Bodkin{}
for _, opt := range opts {
opt(b)
@@ -76,17 +98,8 @@ func newBodkin(m map[string]any, opts ...Option) (*Bodkin, error) {
// Ordered map of known fields, keys are field dotpaths.
b.knownFields = omap.New[string, *fieldPos]()
b.untypedFields = omap.New[string, *fieldPos]()
// Keep an immutable copy of the initial evaluation.
g := newFieldPos(b)
mapToArrow(g, m)
b.original = g
_, err := b.OriginSchema()
// Identical to above except this one can be mutated with Unify.
f := newFieldPos(b)
mapToArrow(f, m)
b.old = f
b.maxCount = int64(math.MaxInt64)
return b, err
b.maxCount = math.MaxInt
return b
}

// Returns count of evaluated field paths.
@@ -124,22 +137,22 @@ func (u *Bodkin) Err() []Field {
func (u *Bodkin) Changes() error { return u.changes }

// Count returns the number of data evaluated for the schema to date.
func (u *Bodkin) Count() int64 { return u.unificationCount }
func (u *Bodkin) Count() int { return u.unificationCount }

// MaxCount returns the maximum number of data to be evaluated for the schema.
func (u *Bodkin) MaxCount() int64 { return u.unificationCount }
func (u *Bodkin) MaxCount() int { return u.maxCount }

// ResetCount resets the count of data evaluated for the schema to date.
func (u *Bodkin) ResetCount() int64 {
func (u *Bodkin) ResetCount() int {
u.unificationCount = 0
return u.unificationCount
}

// ResetMaxCount resets the maximum number of data to be evaluated for the schema
// to math.MaxInt.
func (u *Bodkin) ResetMaxCount() int64 {
u.maxCount = int64(math.MaxInt64)
func (u *Bodkin) ResetMaxCount() int {
u.maxCount = math.MaxInt
return u.maxCount
}

@@ -210,7 +223,17 @@ func (u *Bodkin) Unify(a any) error {
u.err = fmt.Errorf("%v : %v", ErrInvalidInput, err)
return fmt.Errorf("%v : %v", ErrInvalidInput, err)
}

if u.old == nil {
// Keep an immutable copy of the initial evaluation.
g := newFieldPos(u)
mapToArrow(g, m)
u.original = g
// Identical to above except this one can be mutated with Unify.
f := newFieldPos(u)
mapToArrow(f, m)
u.old = f
return nil
}
f := newFieldPos(u)
mapToArrow(f, m)
u.new = f
Expand All @@ -221,11 +244,45 @@ func (u *Bodkin) Unify(a any) error {
return nil
}

// UnifyScan reads from the provided io.Reader and merges each datum's column definition
// with the previously evaluated schema. Any unpopulated fields, empty objects or empty
// slices in JSON input are skipped.
func (u *Bodkin) UnifyScan() error {
var err error
if u.rr == nil {
return fmt.Errorf("no io.reader provided")
}
if u.unificationCount > u.maxCount {
return fmt.Errorf("maxcount exceeded")
}
defer func() error {
if rc := recover(); rc != nil {
u.err = errors.Join(u.err, err, fmt.Errorf("panic %v", rc))
}
return u.err
}()
for u.sc.Scan() {
m, err := reader.InputMap(u.sc.Bytes())
if err != nil {
u.err = errors.Join(u.err, err)
continue
}
u.Unify(m)
}
if err := u.sc.Err(); err != nil {
u.err = errors.Join(u.err, err)
}
return u.err
}

// UnifyAtPath merges a structured input's column definition with the previously evaluated
// schema, using a specified valid path as the root. An error is returned if the mergeAt
// path is not found.
// Any unpopulated fields, empty objects or empty slices in JSON input are skipped.
func (u *Bodkin) UnifyAtPath(a any, mergeAt string) error {
if u.old == nil {
return fmt.Errorf("bodkin not initialised")
}
if u.unificationCount > u.maxCount {
return fmt.Errorf("maxcount exceeded")
}
@@ -256,6 +313,9 @@ func (u *Bodkin) UnifyAtPath(a any, mergeAt string) error {
// OriginSchema returns the original Arrow schema generated from the structure/types of
// the initial input, and a panic recovery error if the schema could not be created.
func (u *Bodkin) OriginSchema() (*arrow.Schema, error) {
if u.old == nil {
return nil, fmt.Errorf("bodkin not initialised")
}
var s *arrow.Schema
defer func(s *arrow.Schema) (*arrow.Schema, error) {
if pErr := recover(); pErr != nil {
@@ -274,6 +334,9 @@
// Schema returns the current merged Arrow schema generated from the structure/types of
// the input(s), and a panic recovery error if the schema could not be created.
func (u *Bodkin) Schema() (*arrow.Schema, error) {
if u.old == nil {
return nil, fmt.Errorf("bodkin not initialised")
}
var s *arrow.Schema
defer func(s *arrow.Schema) (*arrow.Schema, error) {
if pErr := recover(); pErr != nil {
@@ -294,16 +357,16 @@
// ErrNoLatestSchema if Unify() has never been called. A panic recovery error is returned
// if the schema could not be created.
func (u *Bodkin) LastSchema() (*arrow.Schema, error) {
if u.new == nil {
return nil, ErrNoLatestSchema
}
var s *arrow.Schema
defer func(s *arrow.Schema) (*arrow.Schema, error) {
if pErr := recover(); pErr != nil {
return nil, fmt.Errorf("schema problem: %v", pErr)
}
return s, nil
}(s)
if u.new == nil {
return nil, ErrNoLatestSchema
}
var fields []arrow.Field
for _, c := range u.new.children {
fields = append(fields, c.field)
8 changes: 7 additions & 1 deletion cmd/.gitignore
@@ -1 +1,7 @@
*.json
*.json
*.bak
main?.go