Skip to content

Commit

Permalink
field tracking maps
Browse files Browse the repository at this point in the history
  • Loading branch information
loicalleyne committed Nov 5, 2024
1 parent 2fd37d2 commit 153cb45
Show file tree
Hide file tree
Showing 5 changed files with 208 additions and 20 deletions.
93 changes: 75 additions & 18 deletions bodkin.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ package bodkin
import (
"errors"
"fmt"
"strings"

"github.com/apache/arrow-go/v18/arrow"
"github.com/go-viper/mapstructure/v2"
"github.com/goccy/go-json"
omap "github.com/wk8/go-ordered-map/v2"
)

// Option configures a Bodkin
Expand All @@ -22,6 +24,9 @@ type Bodkin struct {
original *fieldPos
old *fieldPos
new *fieldPos
knownFields *omap.OrderedMap[string, *fieldPos]
untypedFields *omap.OrderedMap[string, *fieldPos]
unificationCount int
inferTimeUnits bool
quotedValuesAreStrings bool
typeConversion bool
Expand All @@ -34,12 +39,46 @@ type Bodkin struct {
// Any uppopulated fields, empty objects or empty slices in JSON or map[string]any inputs are skipped as their
// types cannot be evaluated and converted.
func NewBodkin(a any, opts ...Option) (*Bodkin, error) {
m := map[string]interface{}{}
m, err := InputMap(a)
if err != nil {
return nil, err
}
return newBodkin(m, opts...)
}

func newBodkin(m map[string]any, opts ...Option) (*Bodkin, error) {
b := &Bodkin{}
for _, opt := range opts {
opt(b)
}

// Ordered map of known fields, keys are field dotpaths.
b.knownFields = omap.New[string, *fieldPos]()
b.untypedFields = omap.New[string, *fieldPos]()
// Keep an immutable copy of the initial evaluation.
g := newFieldPos(b)
mapToArrow(g, m)
b.original = g

// Identical to above except this one can be mutated with Unify.
f := newFieldPos(b)
mapToArrow(f, m)
b.old = f

return b, errWrap(f)
}

// InputMap takes structured input data and attempts to decode it to
// map[string]any. Input data can be json in string or []byte, or any other
// Go data type which can be decoded by [MapStructure/v2].
// [MapStructure/v2]: github.com/go-viper/mapstructure/v2
func InputMap(a any) (map[string]any, error) {
m := map[string]any{}
switch input := a.(type) {
case nil:
return nil, ErrUndefinedInput
case map[string]any:
return newBodkin(input, opts...)
return input, nil
case []byte:
err := json.Unmarshal(input, &m)
if err != nil {
Expand All @@ -56,22 +95,7 @@ func NewBodkin(a any, opts ...Option) (*Bodkin, error) {
return nil, fmt.Errorf("%v : %v", ErrInvalidInput, err)
}
}
return newBodkin(m, opts...)
}

func newBodkin(m map[string]any, opts ...Option) (*Bodkin, error) {
b := &Bodkin{}
f := newFieldPos(b)
for _, opt := range opts {
opt(b)
}
mapToArrow(f, m)
b.old = f

g := newFieldPos(b)
mapToArrow(g, m)
b.original = g
return b, errWrap(f)
return m, nil
}

// Err returns the last errors encountered during the unification of input schemas.
Expand All @@ -81,6 +105,9 @@ func (u *Bodkin) Err() error { return u.err }
// in the lifetime of the Bodkin object.
func (u *Bodkin) Changes() error { return u.changes }

// Count returns the number of datum evaluated for schema to date.
func (u *Bodkin) Count() int { return u.unificationCount }

// WithInferTimeUnits() enables scanning input string values for time, date and timestamp types.
//
// Times use a format of HH:MM or HH:MM:SS[.zzz] where the fractions of a second cannot
Expand Down Expand Up @@ -153,6 +180,7 @@ func (u *Bodkin) Unify(a any) {
for _, field := range u.new.children {
u.merge(field)
}
u.unificationCount++
}

// Schema returns the original Arrow schema generated from the structure/types of
Expand Down Expand Up @@ -256,3 +284,32 @@ func (u *Bodkin) merge(n *fieldPos) {
}
}
}

func (u *Bodkin) knownFieldsSortKeysDesc() []string {
sortedPaths := make([]string, u.knownFields.Len())
paths := make([]string, u.knownFields.Len())
i := 0
for pair := u.knownFields.Newest(); pair != nil; pair = pair.Prev() {
paths[i] = pair.Key
i++
}
maxDepth := 0
for _, p := range paths {
pathDepth := strings.Count(p, ".")
if pathDepth > maxDepth {
maxDepth = pathDepth
}
}
sortIndex := 0
for maxDepth >= 0 {
for _, p := range paths {
pathDepth := strings.Count(p, ".")
if pathDepth == maxDepth {
sortedPaths[sortIndex] = p
sortIndex++
}
}
maxDepth--
}
return sortedPaths
}
2 changes: 1 addition & 1 deletion cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func main() {
}
fmt.Printf("\nsecond unified %v\nerrors:\n%v\n", schema.String(), err)

rdr = array.NewJSONReader(strings.NewReader(jsonS3), schema)
rdr = array.NewJSONReader(strings.NewReader(jsonS2), schema)
defer rdr.Release()
for rdr.Next() {
rec := rdr.Record()
Expand Down
17 changes: 16 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ require (
github.com/apache/arrow-go/v18 v18.0.0
github.com/go-viper/mapstructure/v2 v2.2.1
github.com/goccy/go-json v0.10.3
github.com/hamba/avro/v2 v2.26.0
github.com/redpanda-data/benthos/v4 v4.40.0
github.com/tidwall/sjson v1.2.5
github.com/wk8/go-ordered-map/v2 v2.1.8
)

require (
Expand All @@ -14,20 +18,31 @@ require (
github.com/OneOfOne/xxhash v1.2.8 // indirect
github.com/andybalholm/brotli v1.1.1 // indirect
github.com/apache/thrift v0.21.0 // indirect
github.com/bahlo/generic-list-go v0.2.0 // indirect
github.com/buger/jsonparser v1.1.1 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/gofrs/uuid v4.4.0+incompatible // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/flatbuffers v24.3.25+incompatible // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/asmfmt v1.3.2 // indirect
github.com/klauspost/compress v1.17.11 // indirect
github.com/klauspost/cpuid/v2 v2.2.8 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/matoous/go-nanoid/v2 v2.1.0 // indirect
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/redpanda-data/benthos/v4 v4.40.0 // indirect
github.com/segmentio/ksuid v1.0.4 // indirect
github.com/tidwall/gjson v1.14.2 // indirect
github.com/tidwall/match v1.1.1 // indirect
github.com/tidwall/pretty v1.2.0 // indirect
github.com/tilinna/z85 v1.0.0 // indirect
github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb // indirect
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect
Expand Down
Loading

0 comments on commit 153cb45

Please sign in to comment.