From fa513a1e991c14b3509244058de2e754f9f97259 Mon Sep 17 00:00:00 2001 From: loicalleyne Date: Tue, 5 Nov 2024 15:23:31 -0500 Subject: [PATCH] change API, add functions returning real errors functions to see detected paths, unknown type paths merge/upgradeType logic improvements --- bodkin.go | 309 +++++++++++++++++++++++++---------- cmd/main.go | 127 ++++++++++++-- json2parquet/json2parquet.go | 20 +-- option.go | 34 ++++ schema.go | 133 +++++---------- types.go | 195 ++++++++++++++++++++++ 6 files changed, 609 insertions(+), 209 deletions(-) create mode 100644 option.go create mode 100644 types.go diff --git a/bodkin.go b/bodkin.go index 2f0039e..eb3a577 100644 --- a/bodkin.go +++ b/bodkin.go @@ -1,15 +1,18 @@ // Package bodkin is a Go library for generating schemas and decoding generic map values and native Go structures to Apache Arrow. -// The goal is to provide a useful toolkit to make it easier to use Arrow, and by extension Parquet. +// The goal is to provide a useful toolkit to make it easier to use Arrow, and by extension Parquet with data whose shape +// is evolving or not strictly defined. package bodkin import ( + "bytes" "errors" "fmt" + "slices" "strings" "github.com/apache/arrow-go/v18/arrow" "github.com/go-viper/mapstructure/v2" - "github.com/goccy/go-json" + json "github.com/goccy/go-json" omap "github.com/wk8/go-ordered-map/v2" ) @@ -19,6 +22,21 @@ type ( config *Bodkin ) +// Field represents an element in the input data. +type Field struct { + Dotpath string `json:"dotpath"` + Type arrow.Type `json:"arrow_type"` + // Number of child fields if a nested type + Childen int `json:"children,omitempty"` + // Evaluation failure reason + Issue error `json:"issue,omitempty"` +} + +const ( + unknown int = 0 + known int = 1 +) + // Bodkin is a collection of field paths, describing the columns of a structured input(s). type Bodkin struct { original *fieldPos @@ -59,13 +77,13 @@ func newBodkin(m map[string]any, opts ...Option) (*Bodkin, error) { g := newFieldPos(b) mapToArrow(g, m) b.original = g - + _, err := b.OriginSchema() // Identical to above except this one can be mutated with Unify. f := newFieldPos(b) mapToArrow(f, m) b.old = f - return b, errWrap(f) + return b, err } // InputMap takes structured input data and attempts to decode it to @@ -80,12 +98,18 @@ func InputMap(a any) (map[string]any, error) { case map[string]any: return input, nil case []byte: - err := json.Unmarshal(input, &m) + r := bytes.NewReader(input) + d := json.NewDecoder(r) + d.UseNumber() + err := d.Decode(&m) if err != nil { return nil, fmt.Errorf("%v : %v", ErrInvalidInput, err) } case string: - err := json.Unmarshal([]byte(input), &m) + r := bytes.NewReader([]byte(input)) + d := json.NewDecoder(r) + d.UseNumber() + err := d.Decode(&m) if err != nil { return nil, fmt.Errorf("%v : %v", ErrInvalidInput, err) } @@ -98,8 +122,35 @@ func InputMap(a any) (map[string]any, error) { return m, nil } -// Err returns the last errors encountered during the unification of input schemas. -func (u *Bodkin) Err() error { return u.err } +// Returns count of evaluated field paths. +func (u *Bodkin) CountPaths() int { + return u.knownFields.Len() +} + +// Returns count of unevaluated field paths. +func (u *Bodkin) CountPending() int { + return u.untypedFields.Len() +} + +// Err returns a []Field that could not be evaluated to date. +func (u *Bodkin) Err() []Field { + fp := u.sortMapKeysDesc(unknown) + var paths []Field = make([]Field, len(fp)) + for i, p := range fp { + f, _ := u.untypedFields.Get(p) + d := Field{Dotpath: f.dotPath(), Type: f.arrowType} + switch f.arrowType { + case arrow.STRUCT: + d.Issue = fmt.Errorf("struct : %vs", ErrUndefinedFieldType) + case arrow.LIST: + d.Issue = fmt.Errorf("list : %v", ErrUndefinedArrayElementType) + default: + d.Issue = fmt.Errorf("%w", ErrUndefinedFieldType) + } + paths[i] = d + } + return paths +} // Changes returns a list of field additions and field type conversions done // in the lifetime of the Bodkin object. @@ -108,110 +159,120 @@ func (u *Bodkin) Changes() error { return u.changes } // Count returns the number of datum evaluated for schema to date. func (u *Bodkin) Count() int { return u.unificationCount } -// WithInferTimeUnits() enables scanning input string values for time, date and timestamp types. -// -// Times use a format of HH:MM or HH:MM:SS[.zzz] where the fractions of a second cannot -// exceed the precision allowed by the time unit, otherwise unmarshalling will error. -// -// Dates use YYYY-MM-DD format. -// -// Timestamps use RFC3339Nano format except without a timezone, all of the following are valid: -// -// YYYY-MM-DD -// YYYY-MM-DD[T]HH -// YYYY-MM-DD[T]HH:MM -// YYYY-MM-DD[T]HH:MM:SS[.zzzzzzzzzz] -func WithInferTimeUnits() Option { - return func(cfg config) { - cfg.inferTimeUnits = true +// Paths returns a slice of dotpaths of fields successfully evaluated to date. +func (u *Bodkin) Paths() []Field { + fp := u.sortMapKeysDesc(known) + var paths []Field = make([]Field, len(fp)) + for i, p := range fp { + f, ok := u.knownFields.Get(p) + if !ok { + continue + } + d := Field{Dotpath: f.dotPath(), Type: f.arrowType} + switch f.arrowType { + case arrow.STRUCT: + d.Childen = len(f.children) + } + paths[i] = d } + return paths } -// WithTypeConversion enables upgrading the column types to fix compatibilty conflicts. -func WithTypeConversion() Option { - return func(cfg config) { - cfg.typeConversion = true +// Unify merges structured input's column definition with the previously input's schema. +// Any uppopulated fields, empty objects or empty slices in JSON input are skipped. +func (u *Bodkin) Unify(a any) { + m, err := InputMap(a) + if err != nil { + u.err = fmt.Errorf("%v : %v", ErrInvalidInput, err) + return } -} -// WithTypeConversion enables upgrading the column types to fix compatibilty conflicts. -func WithQuotedValuesAreStrings() Option { - return func(cfg config) { - cfg.quotedValuesAreStrings = true + f := newFieldPos(u) + mapToArrow(f, m) + u.new = f + for _, field := range u.new.children { + u.merge(field, nil) } + u.unificationCount++ } -// Unify merges structured input's column definition with the previously input's schema. +// Unify merges structured input's column definition with the previously input's schema, +// using a specified valid path as the root. An error is returned if the mergeAt path is +// not found. // Any uppopulated fields, empty objects or empty slices in JSON input are skipped. -func (u *Bodkin) Unify(a any) { - m := map[string]interface{}{} - switch input := a.(type) { - case nil: - u.err = ErrUndefinedInput - case []byte: - err := json.Unmarshal(input, &m) - if err != nil { - u.err = fmt.Errorf("%v : %v", ErrInvalidInput, err) - return - } - case string: - err := json.Unmarshal([]byte(input), &m) - if err != nil { - u.err = fmt.Errorf("%v : %v", ErrInvalidInput, err) - return - } - case map[string]any: - f := newFieldPos(u) - mapToArrow(f, m) - u.new = f - for _, field := range u.new.children { - u.merge(field) - } - default: - err := mapstructure.Decode(a, &m) - if err != nil { - u.err = fmt.Errorf("%v : %v", ErrInvalidInput, err) - return - } +func (u *Bodkin) UnifyAtPath(a any, mergeAt string) error { + mergePath := make([]string, 0) + if !(len(mergeAt) == 0 || mergeAt == "$") { + mergePath = strings.Split(strings.TrimPrefix(mergeAt, "$"), ".") + } + if _, ok := u.knownFields.Get(mergeAt); !ok { + return fmt.Errorf("unitfyatpath %s : %v", mergeAt, ErrPathNotFound) } + + m, err := InputMap(a) + if err != nil { + u.err = fmt.Errorf("%v : %v", ErrInvalidInput, err) + return fmt.Errorf("%v : %v", ErrInvalidInput, err) + } + f := newFieldPos(u) mapToArrow(f, m) u.new = f for _, field := range u.new.children { - u.merge(field) + u.merge(field, mergePath) } u.unificationCount++ + return nil } // Schema returns the original Arrow schema generated from the structure/types of -// the initial input, and wrapped errors indicating which fields could not be evaluated. -// Make sure to check that returned schema != nil. +// the initial input, and a panic recovery error if the schema could not be created. func (u *Bodkin) OriginSchema() (*arrow.Schema, error) { + var s *arrow.Schema + defer func(s *arrow.Schema) (*arrow.Schema, error) { + if pErr := recover(); pErr != nil { + return nil, fmt.Errorf("schema problem: %v", pErr) + } + return s, nil + }(s) var fields []arrow.Field for _, c := range u.original.children { fields = append(fields, c.field) } - err := errWrap(u.original) - return arrow.NewSchema(fields, nil), err + s = arrow.NewSchema(fields, nil) + return s, nil } // Schema returns the current merged Arrow schema generated from the structure/types of -// the input(s), and wrapped errors indicating which fields could not be evaluated. -// Make sure to check that returned schema != nil. +// the input(s), and a panic recovery error if the schema could not be created. func (u *Bodkin) Schema() (*arrow.Schema, error) { + var s *arrow.Schema + defer func(s *arrow.Schema) (*arrow.Schema, error) { + if pErr := recover(); pErr != nil { + return nil, fmt.Errorf("schema problem: %v", pErr) + } + return s, nil + }(s) var fields []arrow.Field for _, c := range u.old.children { fields = append(fields, c.field) } - err := errWrap(u.old) - return arrow.NewSchema(fields, nil), err + s = arrow.NewSchema(fields, nil) + return s, nil } // LastSchema returns the Arrow schema generated from the structure/types of // the most recent input. Any uppopulated fields, empty objects or empty slices are skipped. -// ErrNoLatestSchema if Unify() has never been called. -// Make sure to check that returned schema != nil. +// ErrNoLatestSchema if Unify() has never been called. A panic recovery error is returned +// if the schema could not be created. func (u *Bodkin) LastSchema() (*arrow.Schema, error) { + var s *arrow.Schema + defer func(s *arrow.Schema) (*arrow.Schema, error) { + if pErr := recover(); pErr != nil { + return nil, fmt.Errorf("schema problem: %v", pErr) + } + return s, nil + }(s) if u.new == nil { return nil, ErrNoLatestSchema } @@ -219,34 +280,92 @@ func (u *Bodkin) LastSchema() (*arrow.Schema, error) { for _, c := range u.new.children { fields = append(fields, c.field) } - err := errWrap(u.new) - return arrow.NewSchema(fields, nil), err + s = arrow.NewSchema(fields, nil) + return s, nil } // merge merges a new or changed field into the unified schema. // Conflicting TIME, DATE, TIMESTAMP types are upgraded to STRING. // DATE can upgrade to TIMESTAMP. // INTEGER can upgrade to FLOAT. -func (u *Bodkin) merge(n *fieldPos) { - if kin, err := u.old.getPath(n.path); err == ErrPathNotFound { +func (u *Bodkin) merge(n *fieldPos, mergeAt []string) { + var nPath, nParentPath []string + if len(mergeAt) > 0 { + nPath = slices.Concat(mergeAt, n.path) + nParentPath = slices.Concat(mergeAt, n.parent.path) + } else { + nPath = n.path + nParentPath = n.parent.path + } + if kin, err := u.old.getPath(nPath); err == ErrPathNotFound { // root graft if n.root == n.parent { u.old.root.graft(n) } else { // branch graft - b, _ := u.old.getPath(n.parent.path) + b, _ := u.old.getPath(nParentPath) b.graft(n) } } else { if u.typeConversion && (!kin.field.Equal(n.field) && kin.field.Type.ID() != n.field.Type.ID()) { switch kin.field.Type.ID() { - case arrow.INT8, arrow.INT16, arrow.INT32, arrow.INT64: + case arrow.NULL: + break + case arrow.STRING: + break + case arrow.INT8, arrow.INT16, arrow.INT32, arrow.INT64, arrow.UINT8, arrow.UINT16, arrow.UINT32, arrow.UINT64: switch n.field.Type.ID() { case arrow.FLOAT16, arrow.FLOAT32, arrow.FLOAT64: err := kin.upgradeType(n, arrow.FLOAT64) if err != nil { kin.err = errors.Join(kin.err, err) } + default: + err := kin.upgradeType(n, arrow.STRING) + if err != nil { + kin.err = errors.Join(kin.err, err) + } + } + case arrow.FLOAT16: + switch n.field.Type.ID() { + case arrow.FLOAT32: + err := kin.upgradeType(n, arrow.FLOAT32) + if err != nil { + kin.err = errors.Join(kin.err, err) + } + case arrow.FLOAT64: + err := kin.upgradeType(n, arrow.FLOAT64) + if err != nil { + kin.err = errors.Join(kin.err, err) + } + default: + err := kin.upgradeType(n, arrow.STRING) + if err != nil { + kin.err = errors.Join(kin.err, err) + } + } + case arrow.FLOAT32: + switch n.field.Type.ID() { + case arrow.FLOAT64: + err := kin.upgradeType(n, arrow.FLOAT64) + if err != nil { + kin.err = errors.Join(kin.err, err) + } + default: + err := kin.upgradeType(n, arrow.STRING) + if err != nil { + kin.err = errors.Join(kin.err, err) + } + } + case arrow.FLOAT64: + switch n.field.Type.ID() { + case arrow.INT8, arrow.INT16, arrow.INT32, arrow.INT64, arrow.UINT8, arrow.UINT16, arrow.UINT32, arrow.UINT64, arrow.FLOAT16, arrow.FLOAT32: + break + default: + err := kin.upgradeType(n, arrow.STRING) + if err != nil { + kin.err = errors.Join(kin.err, err) + } } case arrow.TIMESTAMP: switch n.field.Type.ID() { @@ -263,7 +382,8 @@ func (u *Bodkin) merge(n *fieldPos) { if err != nil { kin.err = errors.Join(kin.err, err) } - case arrow.TIME64: + // case arrow.TIME64: + default: err := kin.upgradeType(n, arrow.STRING) if err != nil { kin.err = errors.Join(kin.err, err) @@ -280,16 +400,31 @@ func (u *Bodkin) merge(n *fieldPos) { } } for _, v := range n.childmap { - u.merge(v) + u.merge(v, mergeAt) } } } -func (u *Bodkin) knownFieldsSortKeysDesc() []string { - sortedPaths := make([]string, u.knownFields.Len()) - paths := make([]string, u.knownFields.Len()) +func (u *Bodkin) sortMapKeysDesc(k int) []string { + var m *omap.OrderedMap[string, *fieldPos] + var sortedPaths, paths []string + switch k { + case known: + sortedPaths = make([]string, u.knownFields.Len()) + paths = make([]string, u.knownFields.Len()) + m = u.knownFields + case unknown: + sortedPaths = make([]string, u.untypedFields.Len()) + paths = make([]string, u.untypedFields.Len()) + m = u.untypedFields + default: + return sortedPaths + } + if m.Len() == 0 { + return sortedPaths + } i := 0 - for pair := u.knownFields.Newest(); pair != nil; pair = pair.Prev() { + for pair := m.Newest(); pair != nil; pair = pair.Prev() { paths[i] = pair.Key i++ } diff --git a/cmd/main.go b/cmd/main.go index 64a10ea..41ce79b 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -42,14 +42,23 @@ func main() { } e, _ := bodkin.NewBodkin(stu, bodkin.WithInferTimeUnits(), bodkin.WithTypeConversion()) sc, err := e.OriginSchema() - fmt.Printf("original input %v\nerrors:\n%v\n", sc.String(), err) + if err != nil { + fmt.Println(err) + } + fmt.Printf("original input %v\n", sc.String()) e.Unify(sch) sc, err = e.OriginSchema() - fmt.Printf("unified %v\nerrors:\n%v\n\n", sc.String(), err) + if err != nil { + fmt.Println(err) + } + fmt.Printf("unified %v\n", sc.String()) u, _ := bodkin.NewBodkin(jsonS1, bodkin.WithInferTimeUnits(), bodkin.WithTypeConversion()) s, err := u.OriginSchema() - fmt.Printf("original input %v\nerrors:\n%v\n", s.String(), err) + if err != nil { + fmt.Println(err) + } + fmt.Printf("original input %v\n", s.String()) u.Unify(jsonS2) schema, err := u.Schema() @@ -57,9 +66,17 @@ func main() { fmt.Println(err) } fmt.Printf("changes:\n%v\n", u.Changes()) - fmt.Printf("\nunified %v\nerrors:\n%v\n", schema.String(), err) + fmt.Printf("\nunified %v\n", schema.String()) + var rdr *array.JSONReader - rdr := array.NewJSONReader(strings.NewReader(jsonS2), schema) + u.Unify(jsonS3) + schema, err = u.Schema() + if err != nil { + fmt.Println(err) + } + fmt.Printf("\nsecond unified %v\nerrors:\n%v\n", schema.String(), err) + + rdr = array.NewJSONReader(strings.NewReader(jsonS1), schema) defer rdr.Release() for rdr.Next() { rec := rdr.Record() @@ -67,21 +84,74 @@ func main() { if err != nil { fmt.Printf("error marshaling record: %v\n", err) } - fmt.Printf("\nmarshaled record:\n%v\n", string(rj)) + fmt.Printf("\nmarshaled record jsonS1:\n%v\n", string(rj)) } if err := rdr.Err(); err != nil { fmt.Println(err) } - fmt.Println(u.Changes()) - u.Unify(jsonS3) - schema, err = u.Schema() + rdr = array.NewJSONReader(strings.NewReader(jsonS2), schema) + defer rdr.Release() + for rdr.Next() { + rec := rdr.Record() + rj, err := rec.MarshalJSON() + if err != nil { + fmt.Printf("error marshaling record: %v\n", err) + } + fmt.Printf("\nmarshaled record jsonS2:\n%v\n", string(rj)) + } + if err := rdr.Err(); err != nil { + fmt.Println(err) + } + rdr = array.NewJSONReader(strings.NewReader(jsonS3), schema) + defer rdr.Release() + for rdr.Next() { + rec := rdr.Record() + rj, err := rec.MarshalJSON() + if err != nil { + fmt.Printf("error marshaling record: %v\n", err) + } + fmt.Printf("\nmarshaled record jsonS3:\n%v\n", string(rj)) + } + if err := rdr.Err(); err != nil { + fmt.Println(err) + } + + err = u.UnifyAtPath(jsonS4, "$results.results_elem") if err != nil { fmt.Println(err) + } else { + schema, err = u.Schema() + if err != nil { + fmt.Println(err) + } + fmt.Printf("\nAtPath unified %v\n", schema.String()) + } + rdr = array.NewJSONReader(strings.NewReader(jsonS5), schema) + defer rdr.Release() + for rdr.Next() { + rec := rdr.Record() + rj, err := rec.MarshalJSON() + if err != nil { + fmt.Printf("error marshaling record: %v\n", err) + } + fmt.Printf("\nmarshaled record jsonS5:\n%v\n", string(rj)) + } + if err := rdr.Err(); err != nil { + fmt.Println(err) } - fmt.Printf("\nsecond unified %v\nerrors:\n%v\n", schema.String(), err) - rdr = array.NewJSONReader(strings.NewReader(jsonS2), schema) + err = u.UnifyAtPath(jsonS4, "$results.nonexistant") + if err != nil { + fmt.Println(err) + } else { + schema, err = u.Schema() + if err != nil { + fmt.Println(err) + } + fmt.Printf("\nAtPath unified %v\n", schema.String()) + } + rdr = array.NewJSONReader(strings.NewReader(jsonS7), schema) defer rdr.Release() for rdr.Next() { rec := rdr.Record() @@ -89,12 +159,13 @@ func main() { if err != nil { fmt.Printf("error marshaling record: %v\n", err) } - fmt.Printf("\nmarshaled record:\n%v\n", string(rj)) + fmt.Printf("\nmarshaled record jsonS7, ignoring unknown:\n%v\n", string(rj)) } if err := rdr.Err(); err != nil { fmt.Println(err) } - fmt.Println(u.Changes()) + fmt.Println(u.Paths()) + fmt.Println(u.Err()) } var jsonS1 string = `{ @@ -153,3 +224,33 @@ var jsonS3 string = `{ } ] }` + +var jsonS4 string = `{ + "embed": { + "id": "AAAAA", + "truthy": false + } + }` + +var jsonS5 string = `{ + "results": [ + { + "id": 6328, + "embed": { + "id": "AAAAA" + } + } + ] + }` + +var jsonS7 string = `{ + "xcount": 89.5, + "next": "https://sub.domain.com/api/search/?models=thurblig&page=3", + "previous": "https://sub.domain.com/api/search/?models=thurblig&page=2", + "results": [{"id":7594,"scalar":241.5,"nested":{"strscalar":"str1","nestedarray":[123,456]}}], + "arrayscalar":["str"], + "datetime":"2024-10-24 19:03:09", + "event_time":"2024-10-24T19:03:09+00:00", + "datefield":"2024-10-24T19:03:09+00:00", + "timefield":"1970-01-01" + }` diff --git a/json2parquet/json2parquet.go b/json2parquet/json2parquet.go index 5e1e3d8..95e9d1d 100644 --- a/json2parquet/json2parquet.go +++ b/json2parquet/json2parquet.go @@ -14,34 +14,28 @@ import ( ) func FromReader(r io.Reader, opts ...bodkin.Option) (*arrow.Schema, int, error) { - var err, errBundle error + var err error s := bufio.NewScanner(r) var u *bodkin.Bodkin - var i int if s.Scan() { u, err = bodkin.NewBodkin(s.Bytes(), opts...) if err != nil { - errBundle = errors.Join(errBundle, err) + return nil, 0, bodkin.ErrInvalidInput } - i++ } else { - return nil, i, bodkin.ErrInvalidInput + return nil, 0, bodkin.ErrInvalidInput } for s.Scan() { u.Unify(s.Bytes()) - i++ - if i > 10000 { + if u.Count() > 10000 { break } } schema, err := u.Schema() - if schema == nil { - if err != nil { - errBundle = errors.Join(errBundle, err) - } - return nil, i, errBundle + if err != nil { + return nil, u.Count(), err } - return schema, i, errBundle + return schema, u.Count(), err } func SchemaFromFile(inputFile string, opts ...bodkin.Option) (*arrow.Schema, int, error) { diff --git a/option.go b/option.go new file mode 100644 index 0000000..f252c95 --- /dev/null +++ b/option.go @@ -0,0 +1,34 @@ +package bodkin + +// WithInferTimeUnits() enables scanning input string values for time, date and timestamp types. +// +// Times use a format of HH:MM or HH:MM:SS[.zzz] where the fractions of a second cannot +// exceed the precision allowed by the time unit, otherwise unmarshalling will error. +// +// Dates use YYYY-MM-DD format. +// +// Timestamps use RFC3339Nano format except without a timezone, all of the following are valid: +// +// YYYY-MM-DD +// YYYY-MM-DD[T]HH +// YYYY-MM-DD[T]HH:MM +// YYYY-MM-DD[T]HH:MM:SS[.zzzzzzzzzz] +func WithInferTimeUnits() Option { + return func(cfg config) { + cfg.inferTimeUnits = true + } +} + +// WithTypeConversion enables upgrading the column types to fix compatibilty conflicts. +func WithTypeConversion() Option { + return func(cfg config) { + cfg.typeConversion = true + } +} + +// WithTypeConversion enables upgrading the column types to fix compatibilty conflicts. +func WithQuotedValuesAreStrings() Option { + return func(cfg config) { + cfg.quotedValuesAreStrings = true + } +} diff --git a/schema.go b/schema.go index f219fda..0f73c89 100644 --- a/schema.go +++ b/schema.go @@ -17,10 +17,7 @@ type fieldPos struct { builder array.Builder name string path []string - isList bool - isItem bool - isStruct bool - isMap bool + arrowType arrow.Type typeName string field arrow.Field children []*fieldPos @@ -46,13 +43,18 @@ var ( // UpgradableTypes are scalar types that can be upgraded to a more flexible type. var UpgradableTypes []arrow.Type = []arrow.Type{arrow.INT8, + arrow.UINT8, arrow.INT16, + arrow.UINT16, arrow.INT32, + arrow.UINT64, arrow.INT64, + arrow.FLOAT16, + arrow.FLOAT32, + arrow.FLOAT64, arrow.DATE32, arrow.TIME64, arrow.TIMESTAMP, - arrow.STRING, } // Regular expressions and variables for type inference. @@ -124,6 +126,7 @@ func (f *fieldPos) newChild(childName string) *fieldPos { } child.path = child.namePath() child.childmap = make(map[string]*fieldPos) + child.arrowType = arrow.NULL return &child } @@ -193,10 +196,12 @@ func (f *fieldPos) getValue(m map[string]any) any { // graft grafts a new field into the schema tree func (f *fieldPos) graft(n *fieldPos) { graft := f.newChild(n.name) + graft.arrowType = n.arrowType graft.field = n.field graft.children = append(graft.children, n.children...) graft.mapChildren() f.assignChild(graft) + f.owner.knownFields.Set(graft.dotPath(), graft) f.owner.untypedFields.Delete(graft.dotPath()) f.owner.changes = errors.Join(f.owner.changes, fmt.Errorf("%w %v : %v", ErrFieldAdded, graft.dotPath(), graft.field.Type.String())) if f.field.Type.ID() == arrow.STRUCT { @@ -211,20 +216,38 @@ func (f *fieldPos) graft(n *fieldPos) { } } -// Only scalar types in UpgradableTypes[] can be upgraded +// Only scalar types in UpgradableTypes[] can be upgraded: +// Supported type upgrades: +// +// arrow.INT8, arrow.INT16, arrow.INT32, arrow.INT64 => arrow.FLOAT64 +// arrow.FLOAT16 => arrow.FLOAT32 +// arrow.FLOAT32 => arrow.FLOAT64 +// arrow.FLOAT64 => arrow.STRING +// arrow.TIMESTAMP => arrow.STRING +// arrow.DATE32 => arrow.TIMESTAMP +// arrow.DATE32 => arrow.STRING +// arrow.TIME64 => arrow.STRING func (o *fieldPos) upgradeType(n *fieldPos, t arrow.Type) error { - if !slices.Contains(UpgradableTypes, n.field.Type.ID()) { - return fmt.Errorf("%v %v", n.field.Type.Name(), ErrNotAnUpgradableType.Error()) + if !slices.Contains(UpgradableTypes, o.field.Type.ID()) { + return fmt.Errorf("%s %v %v", n.dotPath(), n.field.Type.Name(), ErrNotAnUpgradableType.Error()) } oldType := o.field.Type.String() + // changes to field switch t { + case arrow.FLOAT32: + o.arrowType = arrow.FLOAT32 + o.field = arrow.Field{Name: o.name, Type: arrow.PrimitiveTypes.Float32, Nullable: true} case arrow.FLOAT64: + o.arrowType = arrow.FLOAT64 o.field = arrow.Field{Name: o.name, Type: arrow.PrimitiveTypes.Float64, Nullable: true} case arrow.STRING: + o.arrowType = arrow.STRING o.field = arrow.Field{Name: o.name, Type: arrow.BinaryTypes.String, Nullable: true} case arrow.TIMESTAMP: + o.arrowType = arrow.TIMESTAMP o.field = arrow.Field{Name: o.name, Type: arrow.FixedWidthTypes.Timestamp_ms, Nullable: true} } + // changes to parent switch o.parent.field.Type.ID() { case arrow.LIST: o.parent.field = arrow.Field{Name: o.parent.name, Type: arrow.ListOf(n.field.Type), Nullable: true} @@ -268,11 +291,12 @@ func mapToArrow(f *fieldPos, m map[string]any) { child.field = arrow.Field{Name: k, Type: arrow.StructOf(fields...), Nullable: true} f.assignChild(child) } else { + child.arrowType = arrow.STRUCT f.owner.untypedFields.Set(child.dotPath(), child) } - case []any: if len(t) <= 0 { + child.arrowType = arrow.LIST f.owner.untypedFields.Set(child.dotPath(), child) f.err = errors.Join(f.err, fmt.Errorf("%v : %v", ErrUndefinedArrayElementType, child.namePath())) } else { @@ -281,6 +305,7 @@ func mapToArrow(f *fieldPos, m map[string]any) { f.assignChild(child) } case nil: + child.arrowType = arrow.NULL f.owner.untypedFields.Set(child.dotPath(), child) f.err = errors.Join(f.err, fmt.Errorf("%v : %v", ErrUndefinedFieldType, child.namePath())) default: @@ -292,6 +317,7 @@ func mapToArrow(f *fieldPos, m map[string]any) { for _, c := range f.children { fields = append(fields, c.field) } + f.arrowType = arrow.STRUCT f.field = arrow.Field{Name: f.name, Type: arrow.StructOf(fields...), Nullable: true} } @@ -300,7 +326,7 @@ func mapToArrow(f *fieldPos, m map[string]any) { func sliceElemType(f *fieldPos, v []any) arrow.DataType { switch ft := v[0].(type) { case map[string]any: - child := f.newChild(f.name + ".elem") + child := f.newChild(f.name + "_elem") mapToArrow(child, ft) var fields []arrow.Field for _, c := range child.children { @@ -313,7 +339,7 @@ func sliceElemType(f *fieldPos, v []any) arrow.DataType { f.err = errors.Join(f.err, fmt.Errorf("%v : %v", ErrUndefinedArrayElementType, f.namePath())) return arrow.GetExtensionType("skip") } - child := f.newChild(f.name + ".elem") + child := f.newChild(f.name + "_elem") et := sliceElemType(child, v[0].([]any)) f.assignChild(child) return arrow.ListOf(et) @@ -322,88 +348,3 @@ func sliceElemType(f *fieldPos, v []any) arrow.DataType { } return nil } - -// goType2Arrow maps a Go type to an Arrow DataType. -func goType2Arrow(f *fieldPos, gt any) arrow.DataType { - var dt arrow.DataType - switch t := gt.(type) { - case []any: - return goType2Arrow(f, t[0]) - // either 32 or 64 bits - case int: - dt = arrow.PrimitiveTypes.Int64 - // the set of all signed 8-bit integers (-128 to 127) - case int8: - dt = arrow.PrimitiveTypes.Int8 - // the set of all signed 16-bit integers (-32768 to 32767) - case int16: - dt = arrow.PrimitiveTypes.Int16 - // the set of all signed 32-bit integers (-2147483648 to 2147483647) - case int32: - dt = arrow.PrimitiveTypes.Int32 - // the set of all signed 64-bit integers (-9223372036854775808 to 9223372036854775807) - case int64: - dt = arrow.PrimitiveTypes.Int64 - // either 32 or 64 bits - case uint: - dt = arrow.PrimitiveTypes.Uint64 - // the set of all unsigned 8-bit integers (0 to 255) - case uint8: - dt = arrow.PrimitiveTypes.Uint8 - // the set of all unsigned 16-bit integers (0 to 65535) - case uint16: - dt = arrow.PrimitiveTypes.Uint16 - // the set of all unsigned 32-bit integers (0 to 4294967295) - case uint32: - dt = arrow.PrimitiveTypes.Uint32 - // the set of all unsigned 64-bit integers (0 to 18446744073709551615) - case uint64: - dt = arrow.PrimitiveTypes.Uint64 - // the set of all IEEE-754 32-bit floating-point numbers - case float32: - dt = arrow.PrimitiveTypes.Float32 - // the set of all IEEE-754 64-bit floating-point numbers - case float64: - dt = arrow.PrimitiveTypes.Float64 - case bool: - dt = arrow.FixedWidthTypes.Boolean - case string: - if f.owner.inferTimeUnits { - for _, r := range timestampMatchers { - if r.MatchString(t) { - return arrow.FixedWidthTypes.Timestamp_us - } - } - if dateMatcher.MatchString(t) { - return arrow.FixedWidthTypes.Date32 - } - if timeMatcher.MatchString(t) { - return arrow.FixedWidthTypes.Time64ns - } - } - if !f.owner.quotedValuesAreStrings { - if slices.Contains(boolMatcher, t) { - return arrow.FixedWidthTypes.Boolean - } - if integerMatcher.MatchString(t) { - return arrow.PrimitiveTypes.Int64 - } - if floatMatcher.MatchString(t) { - return arrow.PrimitiveTypes.Float64 - } - } - dt = arrow.BinaryTypes.String - case []byte: - dt = arrow.BinaryTypes.Binary - // the set of all complex numbers with float32 real and imaginary parts - case complex64: - // TO-DO - // the set of all complex numbers with float64 real and imaginary parts - case complex128: - // TO-DO - case nil: - f.err = fmt.Errorf("%v : %v", ErrUndefinedFieldType, f.namePath()) - dt = arrow.BinaryTypes.Binary - } - return dt -} diff --git a/types.go b/types.go new file mode 100644 index 0000000..dc8d7a4 --- /dev/null +++ b/types.go @@ -0,0 +1,195 @@ +package bodkin + +import ( + "encoding/json" + "fmt" + "slices" + + "github.com/apache/arrow-go/v18/arrow" +) + +// goType2Arrow maps a Go type to an Arrow DataType. +func goType2Arrow(f *fieldPos, gt any) arrow.DataType { + var dt arrow.DataType + switch t := gt.(type) { + case []any: + return goType2Arrow(f, t[0]) + case json.Number: + if _, err := t.Int64(); err == nil { + f.arrowType = arrow.INT64 + dt = arrow.PrimitiveTypes.Int64 + } else { + f.arrowType = arrow.FLOAT64 + dt = arrow.PrimitiveTypes.Float64 + } + // either 32 or 64 bits + case int: + f.arrowType = arrow.INT64 + dt = arrow.PrimitiveTypes.Int64 + // the set of all signed 8-bit integers (-128 to 127) + case int8: + f.arrowType = arrow.INT8 + dt = arrow.PrimitiveTypes.Int8 + // the set of all signed 16-bit integers (-32768 to 32767) + case int16: + f.arrowType = arrow.INT16 + dt = arrow.PrimitiveTypes.Int16 + // the set of all signed 32-bit integers (-2147483648 to 2147483647) + case int32: + f.arrowType = arrow.INT32 + dt = arrow.PrimitiveTypes.Int32 + // the set of all signed 64-bit integers (-9223372036854775808 to 9223372036854775807) + case int64: + f.arrowType = arrow.INT64 + dt = arrow.PrimitiveTypes.Int64 + // either 32 or 64 bits + case uint: + f.arrowType = arrow.UINT64 + dt = arrow.PrimitiveTypes.Uint64 + // the set of all unsigned 8-bit integers (0 to 255) + case uint8: + f.arrowType = arrow.UINT8 + dt = arrow.PrimitiveTypes.Uint8 + // the set of all unsigned 16-bit integers (0 to 65535) + case uint16: + f.arrowType = arrow.UINT16 + dt = arrow.PrimitiveTypes.Uint16 + // the set of all unsigned 32-bit integers (0 to 4294967295) + case uint32: + f.arrowType = arrow.UINT32 + dt = arrow.PrimitiveTypes.Uint32 + // the set of all unsigned 64-bit integers (0 to 18446744073709551615) + case uint64: + f.arrowType = arrow.UINT64 + dt = arrow.PrimitiveTypes.Uint64 + // the set of all IEEE-754 32-bit floating-point numbers + case float32: + f.arrowType = arrow.FLOAT32 + dt = arrow.PrimitiveTypes.Float32 + // the set of all IEEE-754 64-bit floating-point numbers + case float64: + f.arrowType = arrow.FLOAT64 + dt = arrow.PrimitiveTypes.Float64 + case bool: + f.arrowType = arrow.BOOL + dt = arrow.FixedWidthTypes.Boolean + case string: + if f.owner.inferTimeUnits { + for _, r := range timestampMatchers { + if r.MatchString(t) { + f.arrowType = arrow.TIMESTAMP + return arrow.FixedWidthTypes.Timestamp_us + } + } + if dateMatcher.MatchString(t) { + f.arrowType = arrow.DATE32 + return arrow.FixedWidthTypes.Date32 + } + if timeMatcher.MatchString(t) { + f.arrowType = arrow.TIME64 + return arrow.FixedWidthTypes.Time64ns + } + } + if !f.owner.quotedValuesAreStrings { + if slices.Contains(boolMatcher, t) { + f.arrowType = arrow.BOOL + return arrow.FixedWidthTypes.Boolean + } + if integerMatcher.MatchString(t) { + f.arrowType = arrow.INT64 + return arrow.PrimitiveTypes.Int64 + } + if floatMatcher.MatchString(t) { + f.arrowType = arrow.FLOAT64 + return arrow.PrimitiveTypes.Float64 + } + } + f.arrowType = arrow.STRING + dt = arrow.BinaryTypes.String + case []byte: + f.arrowType = arrow.BINARY + dt = arrow.BinaryTypes.Binary + // the set of all complex numbers with float32 real and imaginary parts + case complex64: + // TO-DO + // the set of all complex numbers with float64 real and imaginary parts + case complex128: + // TO-DO + case nil: + f.arrowType = arrow.NULL + f.err = fmt.Errorf("%v : %v", ErrUndefinedFieldType, f.namePath()) + dt = arrow.BinaryTypes.Binary + } + return dt +} + +func arrowTypeID2Type(f *fieldPos, t arrow.Type) arrow.DataType { + var dt arrow.DataType + switch t { + // BOOL is a 1 bit, LSB bit-packed ordering + case arrow.BOOL: + dt = arrow.FixedWidthTypes.Boolean + // the set of all signed 8-bit integers (-128 to 127) + case arrow.INT8: + dt = arrow.PrimitiveTypes.Int8 + // the set of all unsigned 8-bit integers (0 to 255) + case arrow.UINT8: + dt = arrow.PrimitiveTypes.Uint8 + // the set of all signed 16-bit integers (-32768 to 32767) + case arrow.INT16: + dt = arrow.PrimitiveTypes.Int16 + // the set of all unsigned 16-bit integers (0 to 65535) + case arrow.UINT16: + dt = arrow.PrimitiveTypes.Uint16 + // the set of all signed 32-bit integers (-2147483648 to 2147483647) + case arrow.INT32: + dt = arrow.PrimitiveTypes.Int32 + // the set of all unsigned 32-bit integers (0 to 4294967295) + case arrow.UINT32: + dt = arrow.PrimitiveTypes.Uint32 + // the set of all signed 64-bit integers (-9223372036854775808 to 9223372036854775807) + case arrow.INT64: + dt = arrow.PrimitiveTypes.Int64 + // the set of all unsigned 64-bit integers (0 to 18446744073709551615) + case arrow.UINT64: + dt = arrow.PrimitiveTypes.Uint64 + // the set of all IEEE-754 32-bit floating-point numbers + case arrow.FLOAT32: + dt = arrow.PrimitiveTypes.Float32 + // the set of all IEEE-754 64-bit floating-point numbers + case arrow.FLOAT64: + dt = arrow.PrimitiveTypes.Float64 + // TIMESTAMP is an exact timestamp encoded with int64 since UNIX epoch + case arrow.TIMESTAMP: + dt = arrow.FixedWidthTypes.Timestamp_us + // DATE32 is int32 days since the UNIX epoch + case arrow.DATE32: + dt = arrow.FixedWidthTypes.Date32 + // TIME64 is a signed 64-bit integer, representing either microseconds or + // nanoseconds since midnight + case arrow.TIME64: + dt = arrow.FixedWidthTypes.Time64ns + // STRING is a UTF8 variable-length string + case arrow.STRING: + dt = arrow.BinaryTypes.String + // BINARY is a Variable-length byte type (no guarantee of UTF8-ness) + case arrow.BINARY: + dt = arrow.BinaryTypes.Binary + // NULL type having no physical storage + case arrow.NULL: + dt = arrow.BinaryTypes.Binary + case arrow.STRUCT: + var fields []arrow.Field + for _, c := range f.children { + fields = append(fields, c.field) + } + return arrow.StructOf(fields...) + case arrow.LIST: + var fields []arrow.Field + for _, c := range f.children { + fields = append(fields, c.field) + } + return arrow.StructOf(fields...) + } + return dt +}