diff --git a/.gitignore b/.gitignore index 72980f9..9b3b29a 100644 --- a/.gitignore +++ b/.gitignore @@ -24,8 +24,10 @@ go.work.sum # env file .env +internal avro experiments map.go *.schema -*.pgo \ No newline at end of file +*.pgo +debug \ No newline at end of file diff --git a/README.md b/README.md index da58363..05c0926 100644 --- a/README.md +++ b/README.md @@ -15,6 +15,7 @@ Bodkin enables you to use your _data_ to define and evolve your Arrow Schema. - Converts schema field types when unifying schemas to accept evolving input data - Tracks changes to the schema - Export/import a serialized Arrow schema to/from file or []byte to transmit or persist schema definition +- Custom data loader to load structured data directly to Arrow Records based on inferred schema ## 🚀 Install @@ -33,7 +34,7 @@ You can import `bodkin` using: import "github.com/loicalleyne/bodkin" ``` -Create a new Bodkin, providing some structured data and print out the resulting Arrow Schema's string representation and field evaluation errors +Create a new Bodkin, provide some structured data and print out the resulting Arrow Schema's string representation and any field evaluation errors ```go var jsonS1 string = `{ "count": 89, @@ -44,7 +45,8 @@ var jsonS1 string = `{ "datefield":"1979-01-01", "timefield":"01:02:03" }` -u, _ := bodkin.NewBodkin(jsonS1, bodkin.WithInferTimeUnits(), bodkin.WithTypeConversion()) +u, _ := bodkin.NewBodkin(bodkin.WithInferTimeUnits(), bodkin.WithTypeConversion()) +u.Unify(jsonS1) s, _ := u.OriginSchema() fmt.Printf("original input %v\n", s.String()) for _, e := range u.Err() { @@ -100,7 +102,7 @@ fmt.Println(u.Changes()) // changed $timefield : from time64[ns] to utf8 ``` -Also works with Go structs +Also works with nested Go structs and slices ```go stu := Student{ Name: "StudentName", @@ -136,6 +138,13 @@ Also works with Go structs // - Age: type=int32, nullable ``` +Use a Bodkin Reader to load data to Arrow Records +```go +u := bodkin.NewBodkin(bodkin.WithInferTimeUnits(), bodkin.WithTypeConversion()) +r, _ := u.NewReader() +rec, _ := r.ReadToRecord([]byte(jsonS1)) +``` + Use the generated Arrow schema with Arrow's built-in JSON reader to decode JSON data into Arrow records ```go rdr = array.NewJSONReader(strings.NewReader(jsonS2), schema) @@ -164,6 +173,7 @@ fmt.Printf("imported %v\n", sc.String()) ## 💫 Show your support Give a ⭐️ if this project helped you! +Feedback and PRs welcome. ## License diff --git a/bodkin.go b/bodkin.go index 7f999ee..ea8c217 100644 --- a/bodkin.go +++ b/bodkin.go @@ -4,8 +4,10 @@ package bodkin import ( + "bufio" "errors" "fmt" + "io" "math" "os" "slices" @@ -24,6 +26,11 @@ type ( config *Bodkin ) +type ( + ReaderOption func(reader.Option) + readerConfig *reader.DataReader +) + // Field represents an element in the input data. type Field struct { Dotpath string `json:"dotpath"` @@ -41,13 +48,17 @@ const ( // Bodkin is a collection of field paths, describing the columns of a structured input(s). 
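+//
+// A typical lifecycle, sketched (any JSON byte slice/string, Go struct or
+// map[string]any accepted by Unify will do as input):
+//
+//	u := bodkin.NewBodkin(bodkin.WithInferTimeUnits(), bodkin.WithTypeConversion())
+//	_ = u.Unify(`{"id":1,"tags":["a"]}`)
+//	schema, err := u.Schema() // unified Arrow schema so far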
type Bodkin struct {
+	rr               io.Reader
+	sf               bufio.SplitFunc
+	sc               *bufio.Scanner
	original         *fieldPos
	old              *fieldPos
	new              *fieldPos
+	r                *reader.DataReader
	knownFields      *omap.OrderedMap[string, *fieldPos]
	untypedFields    *omap.OrderedMap[string, *fieldPos]
-	unificationCount int64
-	maxCount         int64
+	unificationCount int
+	maxCount         int
	inferTimeUnits   bool
	quotedValuesAreStrings bool
	typeConversion   bool
@@ -55,19 +66,30 @@ type Bodkin struct {
	changes error
}

+func (u *Bodkin) NewReader(opts ...reader.Option) (*reader.DataReader, error) {
+	schema, err := u.Schema()
+	if err != nil {
+		return nil, err
+	}
+	if schema == nil {
+		return nil, fmt.Errorf("nil schema")
+	}
+	r, err := reader.NewReader(schema, reader.DataSourceGo, opts...)
+	if err != nil {
+		return nil, err
+	}
+	return r, nil
+}
+
// NewBodkin returns a new Bodkin value.
// Inputs to Unify must be a json byte slice or string, a Go struct with exported fields or map[string]any.
// Any unpopulated fields, empty objects or empty slices in JSON or map[string]any inputs are skipped as their
// types cannot be evaluated and converted.
-func NewBodkin(a any, opts ...Option) (*Bodkin, error) {
-	m, err := reader.InputMap(a)
-	if err != nil {
-		return nil, err
-	}
-	return newBodkin(m, opts...)
+func NewBodkin(opts ...Option) *Bodkin {
+	return newBodkin(opts...)
}

-func newBodkin(m map[string]any, opts ...Option) (*Bodkin, error) {
+func newBodkin(opts ...Option) *Bodkin {
	b := &Bodkin{}
	for _, opt := range opts {
		opt(b)
@@ -76,17 +98,8 @@ func newBodkin(m map[string]any, opts ...Option) (*Bodkin, error) {
	// Ordered map of known fields, keys are field dotpaths.
	b.knownFields = omap.New[string, *fieldPos]()
	b.untypedFields = omap.New[string, *fieldPos]()
-	// Keep an immutable copy of the initial evaluation.
-	g := newFieldPos(b)
-	mapToArrow(g, m)
-	b.original = g
-	_, err := b.OriginSchema()
-	// Identical to above except this one can be mutated with Unify.
-	f := newFieldPos(b)
-	mapToArrow(f, m)
-	b.old = f
-	b.maxCount = int64(math.MaxInt64)
-	return b, err
+	b.maxCount = math.MaxInt
+	return b
}

// Returns count of evaluated field paths.
@@ -124,13 +137,13 @@ func (u *Bodkin) Err() []Field {
func (u *Bodkin) Changes() error { return u.changes }

// Count returns the number of datum evaluated for schema to date.
-func (u *Bodkin) Count() int64 { return u.unificationCount }
+func (u *Bodkin) Count() int { return u.unificationCount }

// MaxCount returns the maximum number of datum to be evaluated for schema.
-func (u *Bodkin) MaxCount() int64 { return u.unificationCount }
+func (u *Bodkin) MaxCount() int { return u.maxCount }

// ResetCount resets the count of datum evaluated for schema to date.
-func (u *Bodkin) ResetCount() int64 {
+func (u *Bodkin) ResetCount() int {
	u.unificationCount = 0
	return u.unificationCount
}

@@ -138,8 +151,8 @@ func (u *Bodkin) ResetCount() int64 {
// ResetMaxCount resets the maximum number of datum to be evaluated for schema
// to math.MaxInt.
-func (u *Bodkin) ResetMaxCount() int64 {
-	u.maxCount = int64(math.MaxInt64)
+func (u *Bodkin) ResetMaxCount() int {
+	u.maxCount = math.MaxInt
	return u.maxCount
}

@@ -210,7 +223,17 @@ func (u *Bodkin) Unify(a any) error {
		u.err = fmt.Errorf("%v : %v", ErrInvalidInput, err)
		return fmt.Errorf("%v : %v", ErrInvalidInput, err)
	}
-
+	if u.old == nil {
+		// Keep an immutable copy of the initial evaluation.
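+		// (The snapshot kept in u.original backs OriginSchema and change tracking;
+		// the second copy below becomes the mutable base that later Unify calls evolve.)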
+		g := newFieldPos(u)
+		mapToArrow(g, m)
+		u.original = g
+		// Identical to above except this one can be mutated with Unify.
+		f := newFieldPos(u)
+		mapToArrow(f, m)
+		u.old = f
+		return nil
+	}
	f := newFieldPos(u)
	mapToArrow(f, m)
	u.new = f
@@ -221,11 +244,45 @@
	return nil
}

+// UnifyScan reads each datum from the io.Reader provided via WithIOReader and merges its
+// column definitions with the previously unified schema. Any unpopulated fields, empty objects
+// or empty slices in JSON input are skipped.
+func (u *Bodkin) UnifyScan() error {
+	var err error
+	if u.rr == nil {
+		return fmt.Errorf("no io.Reader provided")
+	}
+	if u.unificationCount > u.maxCount {
+		return fmt.Errorf("maxcount exceeded")
+	}
+	if u.sc == nil {
+		u.sc = bufio.NewScanner(u.rr)
+		if u.sf != nil {
+			u.sc.Split(u.sf)
+		}
+	}
+	defer func() {
+		if rc := recover(); rc != nil {
+			u.err = errors.Join(u.err, err, fmt.Errorf("panic %v", rc))
+		}
+	}()
+	for u.sc.Scan() {
+		m, err := reader.InputMap(u.sc.Bytes())
+		if err != nil {
+			u.err = errors.Join(u.err, err)
+			continue
+		}
+		u.Unify(m)
+	}
+	if err := u.sc.Err(); err != nil {
+		u.err = errors.Join(u.err, err)
+	}
+	return u.err
+}
+
// UnifyAtPath merges a structured input's column definition with the previous input's schema,
// using a specified valid path as the root. An error is returned if the mergeAt path is
// not found.
// Any unpopulated fields, empty objects or empty slices in JSON input are skipped.
func (u *Bodkin) UnifyAtPath(a any, mergeAt string) error {
+	if u.old == nil {
+		return fmt.Errorf("bodkin not initialised")
+	}
	if u.unificationCount > u.maxCount {
		return fmt.Errorf("maxcount exceeded")
	}
@@ -256,6 +313,9 @@ func (u *Bodkin) UnifyAtPath(a any, mergeAt string) error {

// OriginSchema returns the original Arrow schema generated from the structure/types of
// the initial input, and a panic recovery error if the schema could not be created.
func (u *Bodkin) OriginSchema() (*arrow.Schema, error) {
+	if u.old == nil {
+		return nil, fmt.Errorf("bodkin not initialised")
+	}
	var s *arrow.Schema
	defer func(s *arrow.Schema) (*arrow.Schema, error) {
		if pErr := recover(); pErr != nil {
@@ -274,6 +334,9 @@ func (u *Bodkin) OriginSchema() (*arrow.Schema, error) {

// Schema returns the current merged Arrow schema generated from the structure/types of
// the input(s), and a panic recovery error if the schema could not be created.
func (u *Bodkin) Schema() (*arrow.Schema, error) {
+	if u.old == nil {
+		return nil, fmt.Errorf("bodkin not initialised")
+	}
	var s *arrow.Schema
	defer func(s *arrow.Schema) (*arrow.Schema, error) {
		if pErr := recover(); pErr != nil {
@@ -294,6 +357,9 @@
// ErrNoLatestSchema if Unify() has never been called. A panic recovery error is returned
// if the schema could not be created.
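+//
+// For example, a sketch assuming at least one prior Unify call:
+//
+//	u := bodkin.NewBodkin()
+//	_ = u.Unify(`{"a":1}`)
+//	latest, err := u.LastSchema() // schema of the most recent input only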
func (u *Bodkin) LastSchema() (*arrow.Schema, error) {
+	if u.new == nil {
+		return nil, ErrNoLatestSchema
+	}
	var s *arrow.Schema
	defer func(s *arrow.Schema) (*arrow.Schema, error) {
		if pErr := recover(); pErr != nil {
@@ -301,9 +367,6 @@ func (u *Bodkin) LastSchema() (*arrow.Schema, error) {
		}
		return s, nil
	}(s)
-	if u.new == nil {
-		return nil, ErrNoLatestSchema
-	}
	var fields []arrow.Field
	for _, c := range u.new.children {
		fields = append(fields, c.field)
diff --git a/cmd/.gitignore b/cmd/.gitignore
index 94a2dd1..ca83256 100644
--- a/cmd/.gitignore
+++ b/cmd/.gitignore
@@ -1 +1,3 @@
-*.json
\ No newline at end of file
+*.json
+*.bak
+main?.go
diff --git a/cmd/main.go b/cmd/main.go
index fa8b273..79b8bc2 100644
--- a/cmd/main.go
+++ b/cmd/main.go
@@ -1,283 +1,109 @@
package main

import (
+	"bufio"
	"fmt"
	"log"
-	"strings"
+	"os"
+	"time"

-	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/loicalleyne/bodkin"
)

-type AddressType struct {
-	Street  string
-	City    string
-	Region  string
-	Country string
-}
-type School struct {
-	Name    string
-	Address AddressType
-}
-
-type Student struct {
-	Name string
-	Age  int32
-	ID   int64
-	Day  int32
-	School
-	Addresses []AddressType
-}
-
func main() {
-	stu := Student{
-		Name:      "StudentName",
-		Age:       25,
-		ID:        123456,
-		Day:       123,
-		Addresses: []AddressType{{Country: "Azerbijjan"}, {Country: "Zimbabwe"}},
-	}
-	sch := School{
-		Name: "SchoolName",
-		Address: AddressType{
-			Country: "CountryName",
-		},
-	}
-	e, _ := bodkin.NewBodkin(stu, bodkin.WithInferTimeUnits(), bodkin.WithTypeConversion())
-	sc, err := e.OriginSchema()
+	start := time.Now()
+	filepath := "github.json"
+	log.Println("start")
+	f, err := os.Open(filepath)
	if err != nil {
-		fmt.Println(err)
+		panic(err)
	}
-	fmt.Printf("original input %v\n", sc.String())
-	e.Unify(sch)
-	sc, err = e.OriginSchema()
-	if err != nil {
-		fmt.Println(err)
-	}
+	defer f.Close()
+	s := bufio.NewScanner(f)
+	u := bodkin.NewBodkin(bodkin.WithInferTimeUnits(), bodkin.WithTypeConversion())
-	fmt.Printf("unified %v\n", sc.String())
-	u, _ := bodkin.NewBodkin(jsonS1, bodkin.WithInferTimeUnits(), bodkin.WithTypeConversion())
-	s, err := u.OriginSchema()
-	if err != nil {
-		fmt.Println(err)
+	for s.Scan() {
+		err = u.Unify(s.Bytes())
+		if err != nil {
+			panic(err)
+		}
	}
-	fmt.Printf("original input %v\n", s.String())
-
-	u.Unify(jsonS2)
	schema, err := u.Schema()
	if err != nil {
-		fmt.Println(err)
+		panic(err)
	}
-	fmt.Printf("changes:\n%v\n", u.Changes())
-	fmt.Printf("\nunified %v\n", schema.String())
-	var rdr *array.JSONReader
+	log.Printf("unified %v\n", schema.String())
+	log.Printf("elapsed: %v\n", time.Since(start))

-	u.Unify(jsonS3)
-	schema, err = u.Schema()
+	ff, err := os.Open(filepath)
	if err != nil {
-		fmt.Println(err)
-	}
-	fmt.Printf("\nsecond unified %v\nerrors:\n%v\n", schema.String(), err)
-
-	rdr = array.NewJSONReader(strings.NewReader(jsonS1), schema)
-	defer rdr.Release()
-	for rdr.Next() {
-		rec := rdr.Record()
-		rj, err := rec.MarshalJSON()
-		if err != nil {
-			fmt.Printf("error marshaling record: %v\n", err)
-		}
-		fmt.Printf("\nmarshaled record jsonS1:\n%v\n", string(rj))
+		panic(err)
	}
-	if err := rdr.Err(); err != nil {
-		fmt.Println(err)
-	}
-
-	rdr = array.NewJSONReader(strings.NewReader(jsonS2), schema)
-	defer rdr.Release()
-	for rdr.Next() {
-		rec := rdr.Record()
-		rj, err := rec.MarshalJSON()
-		if err != nil {
-			fmt.Printf("error marshaling record: %v\n", err)
-		}
-		fmt.Printf("\nmarshaled record
jsonS2:\n%v\n", string(rj)) - } - if err := rdr.Err(); err != nil { - fmt.Println(err) - } - rdr = array.NewJSONReader(strings.NewReader(jsonS3), schema) - defer rdr.Release() - for rdr.Next() { - rec := rdr.Record() - rj, err := rec.MarshalJSON() - if err != nil { - fmt.Printf("error marshaling record: %v\n", err) - } - fmt.Printf("\nmarshaled record jsonS3:\n%v\n", string(rj)) - } - if err := rdr.Err(); err != nil { - fmt.Println(err) - } - - err = u.UnifyAtPath(jsonS4, "$results.results_elem") + defer ff.Close() + r, err := u.NewReader() if err != nil { - fmt.Println(err) - } else { - schema, err = u.Schema() - if err != nil { - fmt.Println(err) - } - fmt.Printf("\nAtPath unified %v\n", schema.String()) + panic(err) } - rdr = array.NewJSONReader(strings.NewReader(jsonS5), schema) - defer rdr.Release() - for rdr.Next() { - rec := rdr.Record() - rj, err := rec.MarshalJSON() + i := 0 + s = bufio.NewScanner(ff) + for s.Scan() { + rec, err := r.ReadToRecord(s.Bytes()) if err != nil { - fmt.Printf("error marshaling record: %v\n", err) - } - fmt.Printf("\nmarshaled record jsonS5:\n%v\n", string(rj)) - } - if err := rdr.Err(); err != nil { - fmt.Println(err) - } - - err = u.UnifyAtPath(jsonS4, "$results.nonexistant") - if err != nil { - fmt.Println(err) - } else { - schema, err = u.Schema() - if err != nil { - fmt.Println(err) + panic(err) } - fmt.Printf("\nAtPath unified %v\n", schema.String()) - } - rdr = array.NewJSONReader(strings.NewReader(jsonS7), schema) - defer rdr.Release() - for rdr.Next() { - rec := rdr.Record() - rj, err := rec.MarshalJSON() + _, err = rec.MarshalJSON() if err != nil { fmt.Printf("error marshaling record: %v\n", err) } - fmt.Printf("\nmarshaled record jsonS7, ignoring unknown:\n%v\n", string(rj)) - } - if err := rdr.Err(); err != nil { - fmt.Println(err) - } - fmt.Println(u.Paths()) - for _, e := range u.Err() { - fmt.Printf("%v : [%s]\n", e.Issue, e.Dotpath) - } - fmt.Println(u.Changes()) - bs, err := u.ExportSchemaBytes() - if err != nil { - fmt.Println(err) - } else { - imp, err := u.ImportSchemaBytes(bs) - if err != nil { - fmt.Println(err) - } else { - fmt.Printf("imported %v\n", imp.String()) - } - } - err = u.ExportSchemaFile("./temp.schema") - if err != nil { - log.Fatal(err) - } - sb, err := u.ImportSchemaFile("./temp.schema") - if err != nil { - fmt.Println(err) - } else { - fmt.Println("deserialized:\n", sb.String()) + // fmt.Printf("\nmarshaled record :\n%v\n", string(rj)) + i++ } -} -var jsonS1 string = `{ - "count": 89, - "next": "https://sub.domain.com/api/search/?models=thurblig&page=3", - "previous": null, - "results": [{"id":7594}], - "arrayscalar":[], - "datefield":"1979-01-01", - "timefield":"01:02:03", - "boolquotedfield":"true" - }` + log.Println("records", i) + log.Printf("elapsed: %v\n", time.Since(start)) + log.Println("end") +} -var jsonS2 string = `{ - "count": 89.5, - "next": "https://sub.domain.com/api/search/?models=thurblig&page=3", - "previous": "https://sub.domain.com/api/search/?models=thurblig&page=2", - "results": [{"id":7594,"scalar":241.5,"nested":{"strscalar":"str1","nestedarray":[123,456]}}], - "arrayscalar":["str"], - "datetime":"2024-10-24 19:03:09", - "event_time":"2024-10-24T19:03:09+00:00", - "datefield":"2024-10-24T19:03:09+00:00", - "timefield":"1970-01-01" - }` +var jsonS1 string = `{"location_types":[{"enumeration_id":"702","id":81,"name":"location81"}],"misc_id":"123456789987a"}` var jsonS3 string = `{ - "count": 85, - "next": "https://sub.domain.com/api/search/?models=thurblig", - "previous": null, - "results": [ - 
{ - "id": 6328, - "name": "New user SMB check 2310-1", - "external_id": null, - "title": "New user SMB check 2310-1", - "content_type": "new agent", - "model": "Agent", - "emptyobj":{}, - "data": { - "id": 6328, - "nestednullscalar": null, - "dsp": { - "id": 116, - "name": "El Thingy Bueno", - "nullarray":[] - }, - "name": "New user SMB check 2310-1", - "agency": { - "id": 925, - "name": "New user SMB check 2310-1" - }, - "export_status": { - "status": true - } - } - } - ] - }` - -var jsonS4 string = `{ - "embed": { - "id": "AAAAA", - "truthy": false - } - }` - -var jsonS5 string = `{ + "count": 85, + "next": "https://sub.domain.com/api/search/?models=thurblig", + "previous": null, "results": [ { "id": 6328, - "embed": { - "id": "AAAAA" + "name": "New user SMB check 2310-1", + "external_id": null, + "title": "New user SMB check 2310-1", + "content_type": "new agent", + "model": "Agent", + "emptyobj":{}, + "dataobj": { + "id": 6328, + "nestednullscalar": null, + "dsp": { + "id": 116, + "name": "El Thingy Bueno", + "nullarray":[] + }, + "name": "New user SMB check 2310-1", + "agency":{ + "id": 925, + "name": "New user SMB check 2310-1", + "employees":[{"id":99,"name":"abcd"},{"id":87,"name":"smart"}] + }, + "export_status": { + "status": true + } } } ] }` -var jsonS7 string = `{ - "xcount": 89.5, - "next": "https://sub.domain.com/api/search/?models=thurblig&page=3", - "previous": "https://sub.domain.com/api/search/?models=thurblig&page=2", - "results": [{"id":7594,"scalar":241.5,"nested":{"strscalar":"str1","nestedarray":[123,456]}}], - "arrayscalar":["str"], - "datetime":"2024-10-24 19:03:09", - "event_time":"2024-10-24T19:03:09+00:00", - "datefield":"2024-10-24T19:03:09+00:00", - "timefield":"1970-01-01" - }` +var jsonS2 string = `{"address":"11540 Foo Ave.","allowed_ad_types":[{"id":1,"name":"static"},{"id":2,"name":"video"},{"id":3,"name":"audio"},{"id":4,"name":"HTML"}],"allows_motion":true,"aspect_ratio":{"horizontal":16,"id":5,"name":"16:9","vertical":9},"audience_data_sources":[{"id":3,"name":"GeoPath"},{"id":4,"name":"1st party data"},{"id":7,"name":"Dutch outdoor research"},{"id":10,"name":"COMMB"}],"average_imp_multiplier":21,"average_weekly_impressions":123,"bearing":100,"bearing_direction":"E","bid_floors":[{"currency":{"code":"USD","id":1,"name":"US Dollars","symbol":"$"},"floor":10},{"currency":{"code":"CAD","id":9,"name":"Canadian dollar","symbol":"$"},"floor":0.01},{"currency":{"code":"AUD","id":8,"name":"Australian dollar","symbol":"$"},"floor":0.01}],"connectivity":1,"demography_type":"basic","device_id":"1234.broadsign.com","diagonal_size":88,"diagonal_size_units":"inches","dma":{"code":662,"id":5,"name":"Abilene-Sweetwater, TX"},"export_status":{"status":true},"geo":{"city":{"id":344757,"name":"Acme"},"country":{"id":40,"name":"Canada"},"region":{"id":485,"name":"Alberta"}},"hivestack_id":"abcd1234efgh","id":1,"internal_publisher_screen_id":"1q2w3e","is_active":true,"is_audio":false,"latitude":45.5017,"longitude":73.5673,"max_ad_duration":90,"min_ad_duration":5,"most_recent":1,"name":"Office test screen (Jody) - DO NOT DELETE","ox_enabled":false,"publisher":{"additional_currencies":[{"code":"CAD","id":9,"name":"Canadian dollar","symbol":"$"},{"code":"AUD","id":8,"name":"Australian dollar","symbol":"$"}],"currency":{"code":"USD","id":1,"name":"US Dollars","symbol":"$"},"id":1,"is_hivestack_bidder":true,"is_multi_currency_enabled":true,"is_px_bidder":true,"is_vistar_bidder":true,"name":"Publisher 
Demo"},"resolution":{"height":1080,"id":835,"name":"1920x1080","orientation":"landscape","title":"1920x1080","width":1920},"screen_count":1,"screen_img_url":"https://www.youtube.com/watch?v=8v7KJoGDGwI","screen_type":{"id":105,"name":"LED"},"tags":[{"id":6656,"name":"test"}],"time_zone":{"id":306,"name":"America/Edmonton"},"timestamp":"2024-11-01 05:20:06.642057","total":0,"transact_status":"ok","transact_status_ox":"ok","venue_types":[{"enumeration_id":"602","id":81,"name":"education.colleges"}],"vistar_id":"123456789987a"} +` diff --git a/debug/assert_off.go b/debug/assert_off.go deleted file mode 100644 index 7236066..0000000 --- a/debug/assert_off.go +++ /dev/null @@ -1,9 +0,0 @@ -//go:build !assert -// +build !assert - -package debug - -// Assert will panic with msg if cond is false. -// -// msg must be a string, func() string or fmt.Stringer. -func Assert(cond bool, msg interface{}) {} diff --git a/debug/assert_on.go b/debug/assert_on.go deleted file mode 100644 index 164ce3b..0000000 --- a/debug/assert_on.go +++ /dev/null @@ -1,13 +0,0 @@ -//go:build assert -// +build assert - -package debug - -// Assert will panic with msg if cond is false. -// -// msg must be a string, func() string or fmt.Stringer. -func Assert(cond bool, msg interface{}) { - if !cond { - panic(getStringValue(msg)) - } -} diff --git a/debug/doc.go b/debug/doc.go deleted file mode 100644 index 9ff7166..0000000 --- a/debug/doc.go +++ /dev/null @@ -1,9 +0,0 @@ -/* -Package debug provides APIs for conditional runtime assertions and debug logging. - -# Using Assert - -To enable runtime assertions, build with the assert tag. When the assert tag is omitted, -the code for the assertion will be omitted from the binary. -*/ -package debug diff --git a/debug/util.go b/debug/util.go deleted file mode 100644 index 5baf29d..0000000 --- a/debug/util.go +++ /dev/null @@ -1,22 +0,0 @@ -//go:build debug || assert -// +build debug assert - -package debug - -import "fmt" - -func getStringValue(v interface{}) string { - switch a := v.(type) { - case func() string: - return a() - - case string: - return a - - case fmt.Stringer: - return a.String() - - default: - panic(fmt.Sprintf("unexpected type, %t", v)) - } -} diff --git a/json2parquet/cmd/main.go b/json2parquet/cmd/main.go index e047734..ccd5775 100644 --- a/json2parquet/cmd/main.go +++ b/json2parquet/cmd/main.go @@ -7,13 +7,10 @@ import ( "os" "runtime/pprof" - "github.com/redpanda-data/benthos/v4/public/bloblang" - "github.com/loicalleyne/bodkin" j2p "github.com/loicalleyne/bodkin/json2parquet" ) -var exe *bloblang.Executor var cpuprofile = flag.String("cpuprofile", "default.pgo", "write cpu profile to `file`") func main() { @@ -23,7 +20,7 @@ func main() { inputFile := flag.String("in", "t.json", "input file") outputFile := flag.String("out", "screens.parquet", "output file") dryRun := flag.Bool("n", false, "only print the schema") - lines := flag.Int64("lines", 0, "number of lines from which to infer schema; 0 means whole file is scanned") + lines := flag.Int("lines", 0, "number of lines from which to infer schema; 0 means whole file is scanned") flag.Parse() if *inputFile == "" { log.Fatal("no input file specified") diff --git a/json2parquet/json2parquet.go b/json2parquet/json2parquet.go index b94976b..dd82900 100644 --- a/json2parquet/json2parquet.go +++ b/json2parquet/json2parquet.go @@ -14,18 +14,10 @@ import ( "github.com/loicalleyne/bodkin/pq" ) -func FromReader(r io.Reader, opts ...bodkin.Option) (*arrow.Schema, int64, error) { +func FromReader(r io.Reader, opts 
...bodkin.Option) (*arrow.Schema, int, error) { var err error s := bufio.NewScanner(r) - var u *bodkin.Bodkin - if s.Scan() { - u, err = bodkin.NewBodkin(s.Bytes(), opts...) - if err != nil { - return nil, 0, bodkin.ErrInvalidInput - } - } else { - return nil, 0, bodkin.ErrInvalidInput - } + u := bodkin.NewBodkin(opts...) for s.Scan() { u.Unify(s.Bytes()) if u.Count() > u.MaxCount() { @@ -39,7 +31,7 @@ func FromReader(r io.Reader, opts ...bodkin.Option) (*arrow.Schema, int64, error return schema, u.Count(), err } -func SchemaFromFile(inputFile string, opts ...bodkin.Option) (*arrow.Schema, int64, error) { +func SchemaFromFile(inputFile string, opts ...bodkin.Option) (*arrow.Schema, int, error) { f, err := os.Open(inputFile) if err != nil { return nil, 0, err diff --git a/option.go b/option.go index e4137e5..3e7c91a 100644 --- a/option.go +++ b/option.go @@ -1,5 +1,10 @@ package bodkin +import ( + "bufio" + "io" +) + // WithInferTimeUnits() enables scanning input string values for time, date and timestamp types. // // Times use a format of HH:MM or HH:MM:SS[.zzz] where the fractions of a second cannot @@ -34,8 +39,20 @@ func WithQuotedValuesAreStrings() Option { } // WithMaxCount enables capping the number of Unify evaluations. -func WithMaxCount(i int64) Option { +func WithMaxCount(i int) Option { return func(cfg config) { cfg.maxCount = i } } + +// WithIOReader provides an io.Reader for a Bodkin to use with UnifyScan(). +// A bufio.SplitFunc can optionally be provided, otherwise the default +// ScanLines will be used. +func WithIOReader(r io.Reader, sf bufio.SplitFunc) Option { + return func(cfg config) { + cfg.rr = r + if sf != nil { + cfg.sf = sf + } + } +} diff --git a/reader/.gitignore b/reader/.gitignore index 997e146..6498c13 100644 --- a/reader/.gitignore +++ b/reader/.gitignore @@ -1 +1,2 @@ -loader.go \ No newline at end of file +avro.go +recordfactory.go \ No newline at end of file diff --git a/reader/input.go b/reader/input.go index bbf7206..51fc409 100644 --- a/reader/input.go +++ b/reader/input.go @@ -1,4 +1,3 @@ -// Package reader contains helpers for reading data and loading to Arrow. package reader import ( diff --git a/reader/loader.go b/reader/loader.go new file mode 100644 index 0000000..bf94c90 --- /dev/null +++ b/reader/loader.go @@ -0,0 +1,998 @@ +package reader + +import ( + "bytes" + "encoding/binary" + "encoding/json" + "errors" + "fmt" + "math/big" + "strconv" + "time" + + "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/array" + "github.com/apache/arrow-go/v18/arrow/decimal128" + "github.com/apache/arrow-go/v18/arrow/decimal256" + "github.com/apache/arrow-go/v18/arrow/extensions" + "github.com/apache/arrow-go/v18/arrow/memory" +) + +type dataLoader struct { + idx, depth int32 + list *fieldPos + item *fieldPos + mapField *fieldPos + mapKey *fieldPos + mapValue *fieldPos + fields []*fieldPos + children []*dataLoader +} + +var ( + ErrNullStructData = errors.New("null struct data") +) + +func newDataLoader() *dataLoader { return &dataLoader{idx: 0, depth: 0} } + +// drawTree takes the tree of field builders produced by mapFieldBuilders() +// and produces another tree structure and aggregates fields whose values can +// be retrieved from a `map[string]any` into a slice of builders, and creates a hierarchy to +// deal with nested types (lists and maps). 
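+//
+// For instance, with input shaped like {a: int64, b: {c: string}, d: [{e: int64}]}
+// (an illustrative shape, not one from this repo), a, b and b.c are aggregated into
+// the root loader's fields, while the list d gets its own child dataLoader.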
+func (d *dataLoader) drawTree(field *fieldPos) { + for _, f := range field.children() { + if f.isList || f.isMap { + if f.isList { + c := d.newListChild(f) + if !f.childrens[0].isList { + c.item = f.childrens[0] + c.drawTree(f.childrens[0]) + } else { + c.drawTree(f.childrens[0].childrens[0]) + } + } + if f.isMap { + c := d.newMapChild(f) + if !arrow.IsNested(f.childrens[1].builder.Type().ID()) { + c.mapKey = f.childrens[0] + c.mapValue = f.childrens[1] + } else { + c.mapKey = f.childrens[0] + m := c.newChild() + m.mapValue = f.childrens[1] + m.drawTree(f.childrens[1]) + } + } + } else { + d.fields = append(d.fields, f) + if len(f.children()) > 0 { + d.drawTree(f) + } + } + } +} + +// loadDatum loads data to the schema fields' builder functions. +// Since array.StructBuilder.AppendNull() will recursively append null to all of the +// struct's fields, in the case of nil being passed to a struct's builderFunc it will +// return a ErrNullStructData error to signal that all its sub-fields can be skipped. +func (d *dataLoader) loadDatum(data any) error { + if d.list == nil && d.mapField == nil { + if d.mapValue != nil { + d.mapValue.appendFunc(data) + } + var NullParent *fieldPos + for _, f := range d.fields { + if f.parent == NullParent { + continue + } + if d.mapValue == nil { + err := f.appendFunc(f.getValue(data)) + if err != nil { + if err == ErrNullStructData { + NullParent = f + continue + } + return err + } + } else { + switch dt := data.(type) { + case nil: + err := f.appendFunc(dt) + if err != nil { + if err == ErrNullStructData { + NullParent = f + continue + } + return err + } + case []any: + if len(d.children) < 1 { + for _, e := range dt { + err := f.appendFunc(e) + if err != nil { + if err == ErrNullStructData { + NullParent = f + continue + } + return err + } + } + } else { + for _, e := range dt { + d.children[0].loadDatum(e) + } + } + case map[string]any: + err := f.appendFunc(f.getValue(dt)) + if err != nil { + if err == ErrNullStructData { + NullParent = f + continue + } + return err + } + } + + } + } + for _, c := range d.children { + if c.list != nil { + c.loadDatum(c.list.getValue(data)) + } + if c.mapField != nil { + switch dt := data.(type) { + case nil: + c.loadDatum(dt) + case map[string]any: + c.loadDatum(c.mapField.getValue(dt)) + default: + c.loadDatum(c.mapField.getValue(data)) + } + } + } + } else { + if d.list != nil { + switch dt := data.(type) { + case nil: + d.list.appendFunc(dt) + case []any: + d.list.appendFunc(dt) + for _, e := range dt { + if d.item != nil { + d.item.appendFunc(e) + } + var NullParent *fieldPos + for _, f := range d.fields { + if f.parent == NullParent { + continue + } + err := f.appendFunc(f.getValue(e)) + if err != nil { + if err == ErrNullStructData { + NullParent = f + continue + } + return err + } + } + for _, c := range d.children { + if c.list != nil { + c.loadDatum(c.list.getValue(e)) + } + if c.mapField != nil { + c.loadDatum(c.mapField.getValue(e)) + } + } + } + case map[string]any: + d.list.appendFunc(dt) // + for _, e := range dt { // + if d.item != nil { + d.item.appendFunc(e) + } + var NullParent *fieldPos + for _, f := range d.fields { + if f.parent == NullParent { + continue + } + err := f.appendFunc(f.getValue(e)) + if err != nil { + if err == ErrNullStructData { + NullParent = f + continue + } + return err + } + } + for _, c := range d.children { + c.loadDatum(c.list.getValue(e)) + } + } + default: + d.list.appendFunc(data) + d.item.appendFunc(dt) + } + } + if d.mapField != nil { + switch dt := data.(type) { + case nil: 
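+				// nil data appends a single null map entry; the key/value builders stay untouched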
+ d.mapField.appendFunc(dt) + case map[string]any: + d.mapField.appendFunc(dt) + for k, v := range dt { + d.mapKey.appendFunc(k) + if d.mapValue != nil { + d.mapValue.appendFunc(v) + } else { + d.children[0].loadDatum(v) + } + } + } + } + } + return nil +} + +func (d *dataLoader) newChild() *dataLoader { + var child *dataLoader = &dataLoader{ + depth: d.depth + 1, + } + d.children = append(d.children, child) + return child +} + +func (d *dataLoader) newListChild(list *fieldPos) *dataLoader { + var child *dataLoader = &dataLoader{ + list: list, + item: list.childrens[0], + depth: d.depth + 1, + } + d.children = append(d.children, child) + return child +} + +func (d *dataLoader) newMapChild(mapField *fieldPos) *dataLoader { + var child *dataLoader = &dataLoader{ + mapField: mapField, + depth: d.depth + 1, + } + d.children = append(d.children, child) + return child +} + +type fieldPos struct { + parent *fieldPos + fieldName string + builder array.Builder + source DataSource + path []string + isList bool + isItem bool + isStruct bool + isMap bool + typeName string + appendFunc func(val interface{}) error + metadatas arrow.Metadata + childrens []*fieldPos + index, depth int32 +} + +func newFieldPos() *fieldPos { return &fieldPos{index: -1} } + +func (f *fieldPos) children() []*fieldPos { return f.childrens } + +func (f *fieldPos) newChild(childName string, childBuilder array.Builder, meta arrow.Metadata) *fieldPos { + var child fieldPos = fieldPos{ + parent: f, + source: f.source, + fieldName: childName, + builder: childBuilder, + metadatas: meta, + index: int32(len(f.childrens)), + depth: f.depth + 1, + } + if f.isList { + child.isItem = true + } + child.path = child.buildNamePath() + f.childrens = append(f.childrens, &child) + return &child +} + +func (f *fieldPos) buildNamePath() []string { + var path []string + + cur := f + for i := f.depth - 1; i >= 0; i-- { + if cur.fieldName != "item" { + path = append([]string{cur.fieldName}, path...) + } else { + break + } + + if !cur.parent.isMap { + cur = cur.parent + } + } + if f.parent.parent != nil && f.parent.parent.isList { + var listPath []string + for i := len(path) - 1; i >= 0; i-- { + if path[i] != "elem" { + listPath = append([]string{path[i]}, listPath...) + } else { + return listPath + } + } + } + if f.parent != nil && f.parent.fieldName == "item" { + var listPath []string + for i := len(path) - 1; i >= 0; i-- { + if path[i] != "item" { + listPath = append([]string{path[i]}, listPath...) + } else { + return listPath + } + } + } + // avro/arrow Maps ? + // if f.parent != nil && f.parent.fieldName == "value" { + // for i := len(path) - 1; i >= 0; i-- { + // if path[i] != "value" { + // listPath = append([]string{path[i]}, listPath...) 
+ // } else { + // return listPath + // } + // } + // } + return path +} + +// NamePath returns a slice of keys making up the path to the field +func (f *fieldPos) namePath() []string { return f.path } + +// GetValue retrieves the value from the map[string]any +// by following the field's key path +func (f *fieldPos) getValue(m any) any { + if _, ok := m.(map[string]any); !ok { + return m + } + for _, key := range f.namePath() { + valueMap, ok := m.(map[string]any) + if !ok { + if key == "item" { + return m + } + return nil + } + m, ok = valueMap[key] + if !ok { + return nil + } + } + return m +} + +// Data is loaded to Arrow arrays using the following type mapping: +// +// Avro Go Arrow +// null nil Null +// boolean bool Boolean +// bytes []byte Binary +// float float32 Float32 +// double float64 Float64 +// long int64 Int64 +// int int32 Int32 +// string string String +// array []interface{} List +// enum string Dictionary +// fixed []byte FixedSizeBinary +// map and record map[string]any Struct +// +// mapFieldBuilders builds a tree of field builders matching the Arrow schema +func mapFieldBuilders(b array.Builder, field arrow.Field, parent *fieldPos) { + f := parent.newChild(field.Name, b, field.Metadata) + switch bt := b.(type) { + case *array.BinaryBuilder: + f.appendFunc = func(data interface{}) error { + appendBinaryData(bt, data, f.source) + return nil + } + case *array.BinaryDictionaryBuilder: + // has metadata for Avro enum symbols + f.appendFunc = func(data interface{}) error { + appendBinaryDictData(bt, data, f.source) + return nil + } + // add Avro enum symbols to builder + sb := array.NewStringBuilder(memory.DefaultAllocator) + for _, v := range field.Metadata.Values() { + sb.Append(v) + } + sa := sb.NewStringArray() + bt.InsertStringDictValues(sa) + case *array.BooleanBuilder: + f.appendFunc = func(data interface{}) error { + appendBoolData(bt, data, f.source) + return nil + } + case *array.Date32Builder: + f.appendFunc = func(data interface{}) error { + appendDate32Data(bt, data, f.source) + return nil + } + case *array.Decimal128Builder: + f.appendFunc = func(data interface{}) error { + err := appendDecimal128Data(bt, data, f.source) + if err != nil { + return err + } + return nil + } + case *array.Decimal256Builder: + f.appendFunc = func(data interface{}) error { + err := appendDecimal256Data(bt, data, f.source) + if err != nil { + return err + } + return nil + } + case *extensions.UUIDBuilder: + f.appendFunc = func(data interface{}) error { + switch dt := data.(type) { + case nil: + bt.AppendNull() + case string: + err := bt.AppendValueFromString(dt) + if err != nil { + return err + } + case []byte: + err := bt.AppendValueFromString(string(dt)) + if err != nil { + return err + } + } + return nil + } + case *array.FixedSizeBinaryBuilder: + f.appendFunc = func(data interface{}) error { + appendFixedSizeBinaryData(bt, data, f.source) + return nil + } + case *array.Float32Builder: + f.appendFunc = func(data interface{}) error { + appendFloat32Data(bt, data, f.source) + return nil + } + case *array.Float64Builder: + f.appendFunc = func(data interface{}) error { + appendFloat64Data(bt, data, f.source) + return nil + } + case *array.Int32Builder: + f.appendFunc = func(data interface{}) error { + appendInt32Data(bt, data, f.source) + return nil + } + case *array.Int64Builder: + f.appendFunc = func(data interface{}) error { + appendInt64Data(bt, data, f.source) + return nil + } + case *array.LargeListBuilder: + vb := bt.ValueBuilder() + f.isList = true + mapFieldBuilders(vb, 
field.Type.(*arrow.LargeListType).ElemField(), f) + f.appendFunc = func(data interface{}) error { + switch dt := data.(type) { + case nil: + bt.AppendNull() + case []interface{}: + if len(dt) == 0 { + bt.AppendEmptyValue() + } else { + bt.Append(true) + } + default: + bt.Append(true) + } + return nil + } + case *array.ListBuilder: + vb := bt.ValueBuilder() + f.isList = true + mapFieldBuilders(vb, field.Type.(*arrow.ListType).ElemField(), f) + f.appendFunc = func(data interface{}) error { + switch dt := data.(type) { + case nil: + bt.AppendNull() + case []interface{}: + if len(dt) == 0 { + bt.AppendEmptyValue() + } else { + bt.Append(true) + } + default: + bt.Append(true) + } + return nil + } + case *array.MapBuilder: + // has metadata for objects in values + f.isMap = true + kb := bt.KeyBuilder() + ib := bt.ItemBuilder() + mapFieldBuilders(kb, field.Type.(*arrow.MapType).KeyField(), f) + mapFieldBuilders(ib, field.Type.(*arrow.MapType).ItemField(), f) + f.appendFunc = func(data interface{}) error { + switch data.(type) { + case nil: + bt.AppendNull() + default: + bt.Append(true) + } + return nil + } + case *array.MonthDayNanoIntervalBuilder: + f.appendFunc = func(data interface{}) error { + appendDurationData(bt, data, f.source) + return nil + } + case *array.StringBuilder: + f.appendFunc = func(data interface{}) error { + appendStringData(bt, data, f.source) + return nil + } + case *array.StructBuilder: + // has metadata for Avro Union named types + f.typeName, _ = field.Metadata.GetValue("typeName") + f.isStruct = true + // create children + for i, p := range field.Type.(*arrow.StructType).Fields() { + mapFieldBuilders(bt.FieldBuilder(i), p, f) + } + f.appendFunc = func(data interface{}) error { + switch data.(type) { + case nil: + bt.AppendNull() + return ErrNullStructData + default: + bt.Append(true) + } + return nil + } + case *array.Time32Builder: + f.appendFunc = func(data interface{}) error { + appendTime32Data(bt, data, f.source) + return nil + } + case *array.Time64Builder: + f.appendFunc = func(data interface{}) error { + appendTime64Data(bt, data, f.source) + return nil + } + case *array.TimestampBuilder: + f.appendFunc = func(data interface{}) error { + appendTimestampData(bt, data, f.source) + return nil + } + } +} + +func appendBinaryData(b *array.BinaryBuilder, data any, source DataSource) { + switch dt := data.(type) { + case nil: + b.AppendNull() + case []byte: + b.Append(dt) + case map[string]any: + if source == DataSourceAvro { + switch ct := dt["bytes"].(type) { + case nil: + b.AppendNull() + default: + b.Append(ct.([]byte)) + } + } + default: + b.Append(fmt.Append([]byte{}, data)) + } +} + +func appendBinaryDictData(b *array.BinaryDictionaryBuilder, data any, source DataSource) { + switch dt := data.(type) { + case nil: + b.AppendNull() + case string: + b.AppendString(dt) + case map[string]any: + if source == DataSourceAvro { + switch v := dt["string"].(type) { + case nil: + b.AppendNull() + case string: + b.AppendString(v) + } + } + } +} + +func appendBoolData(b *array.BooleanBuilder, data any, source DataSource) { + switch dt := data.(type) { + case nil: + b.AppendNull() + case bool: + b.Append(dt) + case map[string]any: + if source == DataSourceAvro { + switch v := dt["boolean"].(type) { + case nil: + b.AppendNull() + case bool: + b.Append(v) + } + } + } +} + +func appendDate32Data(b *array.Date32Builder, data any, source DataSource) { + switch dt := data.(type) { + case nil: + b.AppendNull() + case json.Number: + // TO-DO + case string: + date, _ := 
time.Parse(time.DateOnly, dt) + b.Append(arrow.Date32FromTime(date)) + case time.Time: + b.Append(arrow.Date32FromTime(dt)) + case int32: + b.Append(arrow.Date32(dt)) + case map[string]any: + if source == DataSourceAvro { + switch v := dt["int"].(type) { + case nil: + b.AppendNull() + case int32: + b.Append(arrow.Date32(v)) + } + } + } +} + +func appendDecimal128Data(b *array.Decimal128Builder, data any, source DataSource) error { + switch dt := data.(type) { + case nil: + b.AppendNull() + case []byte: + // TO-DO + if source == DataSourceAvro { + buf := bytes.NewBuffer(dt) + if len(dt) <= 38 { + var intData int64 + err := binary.Read(buf, binary.BigEndian, &intData) + if err != nil { + return err + } + b.Append(decimal128.FromI64(intData)) + } else { + var bigIntData big.Int + b.Append(decimal128.FromBigInt(bigIntData.SetBytes(buf.Bytes()))) + } + } + case map[string]any: + if source == DataSourceAvro { + buf := bytes.NewBuffer(dt["bytes"].([]byte)) + if len(dt["bytes"].([]byte)) <= 38 { + var intData int64 + err := binary.Read(buf, binary.BigEndian, &intData) + if err != nil { + return err + } + b.Append(decimal128.FromI64(intData)) + } else { + var bigIntData big.Int + b.Append(decimal128.FromBigInt(bigIntData.SetBytes(buf.Bytes()))) + } + } + } + return nil +} + +func appendDecimal256Data(b *array.Decimal256Builder, data any, source DataSource) error { + switch dt := data.(type) { + case nil: + b.AppendNull() + case []byte: + // TO-DO + if source == DataSourceAvro { + var bigIntData big.Int + buf := bytes.NewBuffer(dt) + b.Append(decimal256.FromBigInt(bigIntData.SetBytes(buf.Bytes()))) + } + case map[string]any: + if source == DataSourceAvro { + var bigIntData big.Int + buf := bytes.NewBuffer(dt["bytes"].([]byte)) + b.Append(decimal256.FromBigInt(bigIntData.SetBytes(buf.Bytes()))) + } + } + return nil +} + +// Avro duration logical type annotates Avro fixed type of size 12, which stores three little-endian +// unsigned integers that represent durations at different granularities of time. The first stores +// a number in months, the second stores a number in days, and the third stores a number in milliseconds. +// +// https://pkg.go.dev/time#Duration +// Go time.Duration int64 +// A Duration represents the elapsed time between two instants as an int64 nanosecond count. +// The representation limits the largest representable duration to approximately 290 years. 
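+//
+// Byte layout of the 12-byte Avro duration (each field a little-endian uint32):
+//
+//	bytes 0-3   months
+//	bytes 4-7   days
+//	bytes 8-11  milliseconds (scaled to nanoseconds for Arrow, ms * 1e6)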
+func appendDurationData(b *array.MonthDayNanoIntervalBuilder, data any, source DataSource) {
+	switch dt := data.(type) {
+	case nil:
+		b.AppendNull()
+	case []byte:
+		// TO-DO
+		if source == DataSourceAvro {
+			dur := new(arrow.MonthDayNanoInterval)
+			dur.Months = int32(binary.LittleEndian.Uint32(dt[:4]))
+			dur.Days = int32(binary.LittleEndian.Uint32(dt[4:8]))
+			dur.Nanoseconds = int64(binary.LittleEndian.Uint32(dt[8:12])) * 1_000_000
+			b.Append(*dur)
+		}
+	case map[string]any:
+		if source == DataSourceAvro {
+			switch dtb := dt["bytes"].(type) {
+			case nil:
+				b.AppendNull()
+			case []byte:
+				dur := new(arrow.MonthDayNanoInterval)
+				dur.Months = int32(binary.LittleEndian.Uint32(dtb[:4]))
+				dur.Days = int32(binary.LittleEndian.Uint32(dtb[4:8]))
+				dur.Nanoseconds = int64(binary.LittleEndian.Uint32(dtb[8:12])) * 1_000_000
+				b.Append(*dur)
+			}
+		}
+	}
+}
+
+func appendFixedSizeBinaryData(b *array.FixedSizeBinaryBuilder, data any, source DataSource) {
+	switch dt := data.(type) {
+	case nil:
+		b.AppendNull()
+	case []byte:
+		b.Append(dt)
+	case map[string]any:
+		if source == DataSourceAvro {
+			switch v := dt["bytes"].(type) {
+			case nil:
+				b.AppendNull()
+			case []byte:
+				b.Append(v)
+			}
+		}
+	}
+}
+
+func appendFloat32Data(b *array.Float32Builder, data any, source DataSource) {
+	switch dt := data.(type) {
+	case nil:
+		b.AppendNull()
+	case float32:
+		b.Append(dt)
+	case json.Number:
+		f, _ := dt.Float64()
+		b.Append(float32(f))
+	case string:
+		i, _ := strconv.ParseFloat(dt, 32)
+		b.Append(float32(i))
+	case map[string]any:
+		if source == DataSourceAvro {
+			switch v := dt["float"].(type) {
+			case nil:
+				b.AppendNull()
+			case float32:
+				b.Append(v)
+			}
+		}
+	}
+}
+
+func appendFloat64Data(b *array.Float64Builder, data any, source DataSource) {
+	switch dt := data.(type) {
+	case nil:
+		b.AppendNull()
+	case float64:
+		b.Append(dt)
+	case json.Number:
+		f, _ := dt.Float64()
+		b.Append(f)
+	case string:
+		i, _ := strconv.ParseFloat(dt, 64)
+		b.Append(i)
+	case map[string]any:
+		if source == DataSourceAvro {
+			switch v := dt["double"].(type) {
+			case nil:
+				b.AppendNull()
+			case float64:
+				b.Append(v)
+			}
+		}
+	}
+}
+
+func appendInt8Data(b *array.Int8Builder, data any, source DataSource) {
+	switch dt := data.(type) {
+	case nil:
+		b.AppendNull()
+	case int:
+		b.Append(int8(dt))
+	case int8:
+		b.Append(dt)
+	case json.Number:
+		i, _ := dt.Int64()
+		b.Append(int8(i))
+	case string:
+		i, _ := strconv.ParseInt(dt, 10, 8)
+		b.Append(int8(i))
+	case map[string]any:
+
+	}
+}
+
+func appendInt16Data(b *array.Int16Builder, data any, source DataSource) {
+	switch dt := data.(type) {
+	case nil:
+		b.AppendNull()
+	case int:
+		b.Append(int16(dt))
+	case int16:
+		b.Append(dt)
+	case json.Number:
+		i, _ := dt.Int64()
+		b.Append(int16(i))
+	case string:
+		i, _ := strconv.ParseInt(dt, 10, 16)
+		b.Append(int16(i))
+	case map[string]any:
+
+	}
+}
+
+func appendInt32Data(b *array.Int32Builder, data any, source DataSource) {
+	switch dt := data.(type) {
+	case nil:
+		b.AppendNull()
+	case int:
+		b.Append(int32(dt))
+	case int32:
+		b.Append(dt)
+	case json.Number:
+		i, _ := dt.Int64()
+		b.Append(int32(i))
+	case string:
+		i, _ := strconv.ParseInt(dt, 10, 32)
+		b.Append(int32(i))
+	case map[string]any:
+
+	}
+}
+
+func appendInt64Data(b *array.Int64Builder, data any, source DataSource) {
+	switch dt := data.(type) {
+	case nil:
+		b.AppendNull()
+	case int:
+		b.Append(int64(dt))
+	case int64:
+		b.Append(dt)
+	case string:
+		i, _ := strconv.ParseInt(dt, 10, 64)
+		b.Append(i)
+	case json.Number:
+		i, _ := dt.Int64()
+		b.Append(i)
+	case map[string]any:
+		if source == DataSourceAvro {
+			switch v := dt["long"].(type) {
+			case nil:
+				b.AppendNull()
+			case int:
+				b.Append(int64(v))
+			case int64:
+				b.Append(v)
+			}
+		}
+	}
+}
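+
+// Avro unions decode to a single-entry map naming the branch, e.g. a
+// ["null","long"] value arrives as map[string]any{"long": int64(7)}; the
+// map[string]any cases in these append helpers unwrap that convention.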
+
+func appendStringData(b *array.StringBuilder, data any, source DataSource) {
+	switch dt := data.(type) {
+	case nil:
+		b.AppendNull()
+	case string:
+		b.Append(dt)
+	case map[string]any:
+		if source == DataSourceAvro {
+			switch v := dt["string"].(type) {
+			case nil:
+				b.AppendNull()
+			case string:
+				b.Append(v)
+			}
+		}
+	default:
+		b.Append(fmt.Sprint(data))
+	}
+}
+
+func appendTime32Data(b *array.Time32Builder, data any, source DataSource) {
+	switch dt := data.(type) {
+	case nil:
+		b.AppendNull()
+	case string:
+		t, _ := arrow.Time32FromString(dt, arrow.Microsecond)
+		b.Append(t)
+	case int32:
+		b.Append(arrow.Time32(dt))
+	case map[string]any:
+		if source == DataSourceAvro {
+			switch v := dt["int"].(type) {
+			case nil:
+				b.AppendNull()
+			case int32:
+				b.Append(arrow.Time32(v))
+			}
+		}
+	}
+}
+
+func appendTime64Data(b *array.Time64Builder, data any, source DataSource) {
+	switch dt := data.(type) {
+	case nil:
+		b.AppendNull()
+	case string:
+		t, _ := arrow.Time64FromString(dt, arrow.Microsecond)
+		b.Append(t)
+	case int64:
+		b.Append(arrow.Time64(dt))
+	case map[string]any:
+		if source == DataSourceAvro {
+			switch v := dt["long"].(type) {
+			case nil:
+				b.AppendNull()
+			case int64:
+				b.Append(arrow.Time64(v))
+			}
+		}
+	}
+}
+
+func appendTimestampData(b *array.TimestampBuilder, data any, source DataSource) {
+	switch dt := data.(type) {
+	case nil:
+		b.AppendNull()
+	case json.Number:
+		epochSeconds, _ := dt.Int64()
+		t, _ := arrow.TimestampFromTime(time.Unix(epochSeconds, 0), arrow.Microsecond)
+		b.Append(t)
+	case string:
+		t, _ := arrow.TimestampFromString(dt, arrow.Microsecond)
+		b.Append(t)
+	case time.Time:
+		t, _ := arrow.TimestampFromTime(dt, arrow.Microsecond)
+		b.Append(t)
+	case int64:
+		b.Append(arrow.Timestamp(dt))
+	case map[string]any:
+		if source == DataSourceAvro {
+			switch v := dt["long"].(type) {
+			case nil:
+				b.AppendNull()
+			case int64:
+				b.Append(arrow.Timestamp(v))
+			}
+		}
+	}
+}
diff --git a/reader/option.go b/reader/option.go
new file mode 100644
index 0000000..300e9ac
--- /dev/null
+++ b/reader/option.go
@@ -0,0 +1,21 @@
+package reader
+
+import (
+	"github.com/apache/arrow-go/v18/arrow/memory"
+)
+
+// WithAllocator specifies the Arrow memory allocator used while building records.
+func WithAllocator(mem memory.Allocator) Option {
+	return func(cfg config) {
+		cfg.mem = mem
+	}
+}
+
+// WithJSONDecoder specifies whether to use github.com/goccy/go-json as the Bodkin Reader's decoder.
+// The default is the Bodkin DataLoader, a tree of builders that reduces recursive lookups
+// in maps when loading data.
+func WithJSONDecoder() Option {
+	return func(cfg config) {
+		cfg.jsonDecode = true
+	}
+}
diff --git a/reader/reader.go b/reader/reader.go
new file mode 100644
index 0000000..58434d7
--- /dev/null
+++ b/reader/reader.go
@@ -0,0 +1,155 @@
+// Package reader contains helpers for reading data and loading to Arrow.
+package reader
+
+import (
+	"bufio"
+	"bytes"
+	"context"
+	"errors"
+	"fmt"
+	"io"
+	"sync/atomic"
+
+	"github.com/apache/arrow-go/v18/arrow"
+	"github.com/apache/arrow-go/v18/arrow/array"
+	"github.com/apache/arrow-go/v18/arrow/memory"
+	json "github.com/goccy/go-json"
+)
+
+type DataSource int
+
+const (
+	DataSourceGo DataSource = iota
+	DataSourceJSON
+	DataSourceAvro
+)
+
+// Option configures a DataReader.
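+//
+// Options are applied at construction time, for example (a sketch):
+//
+//	r, err := reader.NewReader(schema, reader.DataSourceJSON,
+//		reader.WithAllocator(memory.NewGoAllocator()),
+//		reader.WithJSONDecoder())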
+type (
+	Option func(config)
+	config *DataReader
+)
+
+type DataReader struct {
+	rr               io.Reader
+	sf               bufio.SplitFunc
+	sc               *bufio.Scanner
+	refs             int64
+	source           DataSource
+	schema           *arrow.Schema
+	bld              *array.RecordBuilder
+	mem              memory.Allocator
+	bldMap           *fieldPos
+	ldr              *dataLoader
+	cur              arrow.Record
+	readerCtx        context.Context
+	readCancel       func()
+	err              error
+	anyChan          chan any
+	recChan          chan arrow.Record
+	recReq           chan struct{}
+	bldDone          chan struct{}
+	jsonDecode       bool
+	chunk            int
+	inputBufferSize  int
+	recordBufferSize int
+	countInput       int
+}
+
+func NewReader(schema *arrow.Schema, source DataSource, opts ...Option) (*DataReader, error) {
+	switch source {
+	case DataSourceGo, DataSourceJSON, DataSourceAvro:
+		break
+	default:
+		source = DataSourceGo
+	}
+	r := &DataReader{
+		source:           source,
+		schema:           schema,
+		mem:              memory.DefaultAllocator,
+		inputBufferSize:  1024 * 64,
+		recordBufferSize: 1024 * 64,
+		chunk:            0,
+	}
+	for _, opt := range opts {
+		opt(r)
+	}
+
+	r.anyChan = make(chan any, r.inputBufferSize)
+	r.recChan = make(chan arrow.Record, r.recordBufferSize)
+	r.bldDone = make(chan struct{})
+	r.recReq = make(chan struct{}, 100)
+	r.readerCtx, r.readCancel = context.WithCancel(context.Background())
+
+	r.bld = array.NewRecordBuilder(r.mem, schema)
+	r.bldMap = newFieldPos()
+	r.bldMap.isStruct = true
+	r.ldr = newDataLoader()
+	for idx, fb := range r.bld.Fields() {
+		mapFieldBuilders(fb, schema.Field(idx), r.bldMap)
+	}
+	r.ldr.drawTree(r.bldMap)
+	go r.recordFactory()
+	return r, nil
+}
+
+func (r *DataReader) ReadToRecord(a any) (arrow.Record, error) {
+	var err error
+	defer func() {
+		if rc := recover(); rc != nil {
+			r.err = errors.Join(r.err, err, fmt.Errorf("panic %v", rc))
+		}
+	}()
+	m, err := InputMap(a)
+	if err != nil {
+		r.err = errors.Join(r.err, err)
+		return nil, err
+	}
+
+	switch r.jsonDecode {
+	case true:
+		var v []byte
+		v, err = json.Marshal(m)
+		if err != nil {
+			r.err = err
+			return nil, err
+		}
+		d := json.NewDecoder(bytes.NewReader(v))
+		d.UseNumber()
+		err = d.Decode(r.bld)
+		if err != nil {
+			return nil, err
+		}
+	default:
+		err = r.ldr.loadDatum(m)
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	return r.bld.NewRecord(), nil
+}
+
+func (r *DataReader) Schema() *arrow.Schema { return r.schema }
+
+// Err returns the last error encountered during the reading of data.
+func (r *DataReader) Err() error { return r.err }
+
+// Retain increases the reference count by 1.
+// Retain may be called simultaneously from multiple goroutines.
+func (r *DataReader) Retain() {
+	atomic.AddInt64(&r.refs, 1)
+}
+
+// Release decreases the reference count by 1.
+// When the reference count goes to zero, the memory is freed.
+// Release may be called simultaneously from multiple goroutines.
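+//
+// A caller handing the reader to another goroutine pairs the two (sketch):
+//
+//	r.Retain()
+//	defer r.Release()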
+func (r *DataReader) Release() { + // debug.Assert(atomic.LoadInt64(&r.refs) > 0, "too many releases") + + if atomic.AddInt64(&r.refs, -1) == 0 { + if r.cur != nil { + r.cur.Release() + } + } +} diff --git a/schema.go b/schema.go index caf08ca..3d7d4b4 100644 --- a/schema.go +++ b/schema.go @@ -295,7 +295,7 @@ func mapToArrow(f *fieldPos, m map[string]any) { fields = append(fields, c.field) } if len(child.children) != 0 { - child.field = arrow.Field{Name: k, Type: arrow.StructOf(fields...), Nullable: true} + child.field = buildArrowField(k, arrow.StructOf(fields...), arrow.Metadata{}, true) f.assignChild(child) } else { child.arrowType = arrow.STRUCT @@ -311,7 +311,7 @@ func mapToArrow(f *fieldPos, m map[string]any) { } else { et := sliceElemType(child, t) child.isList = true - child.field = arrow.Field{Name: k, Type: arrow.ListOf(et), Nullable: true} + child.field = buildArrowField(k, arrow.ListOf(et), arrow.Metadata{}, true) f.assignChild(child) } case nil: @@ -319,8 +319,7 @@ func mapToArrow(f *fieldPos, m map[string]any) { f.owner.untypedFields.Set(child.dotPath(), child) f.err = errors.Join(f.err, fmt.Errorf("%v : %v", ErrUndefinedFieldType, child.namePath())) default: - - child.field = arrow.Field{Name: k, Type: goType2Arrow(child, v), Nullable: true} + child.field = buildArrowField(k, goType2Arrow(child, v), arrow.Metadata{}, true) f.assignChild(child) } } @@ -337,7 +336,7 @@ func mapToArrow(f *fieldPos, m map[string]any) { func sliceElemType(f *fieldPos, v []any) arrow.DataType { switch ft := v[0].(type) { case map[string]any: - child := f.newChild(f.name + "_elem") + child := f.newChild(f.name + ".elem") mapToArrow(child, ft) var fields []arrow.Field for _, c := range child.children { @@ -350,7 +349,7 @@ func sliceElemType(f *fieldPos, v []any) arrow.DataType { f.err = errors.Join(f.err, fmt.Errorf("%v : %v", ErrUndefinedArrayElementType, f.namePath())) return arrow.GetExtensionType("skip") } - child := f.newChild(f.name + "_elem") + child := f.newChild(f.name + ".elem") et := sliceElemType(child, v[0].([]any)) f.assignChild(child) return arrow.ListOf(et) @@ -359,3 +358,16 @@ func sliceElemType(f *fieldPos, v []any) arrow.DataType { } return nil } + +func buildArrowField(n string, t arrow.DataType, m arrow.Metadata, nullable bool) arrow.Field { + return arrow.Field{ + Name: n, + Type: t, + Metadata: m, + Nullable: nullable, + } +} + +func buildTypeMetadata(k, v []string) arrow.Metadata { + return arrow.NewMetadata(k, v) +}
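Taken together, the schema-inference and record-loading halves of this change compose into a two-pass flow like the one in `cmd/main.go`. A minimal, self-contained sketch (the file name is a placeholder and error handling is condensed):
```go
package main

import (
	"bufio"
	"log"
	"os"

	"github.com/loicalleyne/bodkin"
)

func main() {
	// Pass 1: infer a unified Arrow schema from newline-delimited JSON.
	f, err := os.Open("input.json") // placeholder path
	if err != nil {
		log.Fatal(err)
	}
	u := bodkin.NewBodkin(bodkin.WithInferTimeUnits(), bodkin.WithTypeConversion())
	s := bufio.NewScanner(f)
	for s.Scan() {
		if err := u.Unify(s.Bytes()); err != nil {
			log.Fatal(err)
		}
	}
	f.Close()
	schema, err := u.Schema()
	if err != nil {
		log.Fatal(err)
	}
	log.Println(schema.String())

	// Pass 2: replay the data through a Bodkin Reader to build Arrow records.
	ff, err := os.Open("input.json")
	if err != nil {
		log.Fatal(err)
	}
	defer ff.Close()
	r, err := u.NewReader()
	if err != nil {
		log.Fatal(err)
	}
	sc := bufio.NewScanner(ff)
	for sc.Scan() {
		rec, err := r.ReadToRecord(sc.Bytes())
		if err != nil {
			log.Fatal(err)
		}
		rec.Release() // release each record once consumed
	}
}
```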