diff --git a/LICENCE.txt b/LICENCE.txt new file mode 100644 index 0000000..cbfdef8 --- /dev/null +++ b/LICENCE.txt @@ -0,0 +1,174 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. \ No newline at end of file diff --git a/README.md b/README.md index f158b56..d205c35 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,114 @@ -# bodkin -Go library for decoding generic map values and native Go structures into Arrow. +# Bodkin +Go library for decoding generic map values and native Go structures to Apache Arrow. + +## Features + +- Convert a structured input (json string or []byte, Go struct or map[string]any) into an Apache Arrow schema +- Evolve the schema with new fields by providing new inputs +- Convert schema field types when to accept evolving input schemas +- Track the changes to the schema + +## 🚀 Install + +Using Bodkin is easy. First, use `go get` to install the latest version +of the library. + +```sh +go get -u github.com/loicalleyne/bodkin@latest +``` + +## 💡 Usage + +You can import `bodkin` using: + +```go +import "github.com/loicalleyne/bodkin" +``` + +Create a new Bodkin, providing some structured data and print out the resulting Arrow Schema's string representation and field evaluation errors +```go +var jsonS1 string = `{ + "count": 89, + "next": "https://sub.domain.com/api/search/?models=thurblig&page=3", + "previous": null, + "results": [{"id":7594}], + "arrayscalar":[], + "datefield":"1979-01-01", + "timefield":"01:02:03" + }` +u, _ := bodkin.NewBodkin(jsonS1, bodkin.WithInferTimeUnits(), bodkin.WithTypeConversion()) +s, err := u.OriginSchema() +fmt.Printf("original input %v\nerrors:\n%v\n", s.String(), err) +// original input schema: +// fields: 5 +// - results: type=list, nullable>, nullable +// - datefield: type=date32, nullable +// - timefield: type=time64[ns], nullable +// - count: type=float64, nullable +// - next: type=utf8, nullable +// errors: +// could not determine type of unpopulated field : [previous] +// could not determine element type of empty array : [arrayscalar] +// could not determine type of unpopulated field : [previous] +// could not determine element type of empty array : [arrayscalar] +``` + +Provide some more structured data and print out the new merged schema and the list of changes +```go +var jsonS2 string = `{ +"count": 89.5, +"next": "https://sub.domain.com/api/search/?models=thurblig&page=3", +"previous": "https://sub.domain.com/api/search/?models=thurblig&page=2", +"results": [{"id":7594,"scalar":241.5,"nestedObj":{"strscalar":"str1","nestedarray":[123,456]}}], +"arrayscalar":["str"], +"datetime":"2024-10-24 19:03:09", +"event_time":"2024-10-24T19:03:09+00:00", +"datefield":"2024-10-24T19:03:09+00:00", +"timefield":"1970-01-01" +}` +u.Unify(jsonS2) +schema, _ := u.Schema() +fmt.Printf("\nunified %v\n", schema.String()) +fmt.Println(u.Changes()) +// unified schema: +// fields: 9 +// - count: type=float64, nullable +// - next: type=utf8, nullable +// - results: type=list>>, nullable>, nullable +// - datefield: type=timestamp[ms, tz=UTC], nullable +// - timefield: type=utf8, nullable +// - previous: type=utf8, nullable +// - datetime: type=timestamp[ms, tz=UTC], nullable +// - arrayscalar: type=list, nullable +// - event_time: type=timestamp[ms, tz=UTC], nullable +// changes: +// added $previous : utf8 +// added $datetime : timestamp[ms, tz=UTC] +// changed $datefield : from date32 to timestamp[ms, tz=UTC] +// added $results.results.elem.scalar : float64 +// added $results.results.elem.nested : struct> +// added $arrayscalar : list +// added $event_time : timestamp[ms, tz=UTC] +// changed $timefield : from time64[ns] to utf8 +``` + +Use the generated Arrow schema with Arrow's built-in JSON reader to decode JSON data into Arrow records +```go +rdr = array.NewJSONReader(strings.NewReader(jsonS2), schema) +defer rdr.Release() +for rdr.Next() { + rec := rdr.Record() + rj, _ := rec.MarshalJSON() + fmt.Printf("\nmarshaled record:\n%v\n", string(rj)) +} +// marshaled record: +// [{"arrayscalar":["str"],"count":89.5,"datefield":"2024-10-24 19:03:09Z","datetime":"2024-10-24 19:03:09Z","event_time":"2024-10-24 19:03:09Z","next":"https://sub.domain.com/api/search/?models=thurblig\u0026page=3","previous":"https://sub.domain.com/api/search/?models=thurblig\u0026page=2","results":[{"id":7594,"nested":{"nestedarray":[123,456],"strscalar":"str1"},"scalar":241.5}],"timefield":"1970-01-01"} +// ] +``` +## 💫 Show your support + +Give a ⭐️ if this project helped you! + +## License + +Bodkin is released under the Apache 2.0 license. See [LICENCE.txt](LICENCE.txt) \ No newline at end of file diff --git a/bodkin.go b/bodkin.go new file mode 100644 index 0000000..5c823c2 --- /dev/null +++ b/bodkin.go @@ -0,0 +1,244 @@ +package bodkin + +import ( + "errors" + "fmt" + + "github.com/apache/arrow-go/v18/arrow" + "github.com/go-viper/mapstructure/v2" + "github.com/goccy/go-json" +) + +type ( + Option func(config) + config *Bodkin +) + +// Bodkin is a collection of field paths, describing the columns of a structured input(s). +type Bodkin struct { + original *fieldPos + old *fieldPos + new *fieldPos + inferTimeUnits bool + typeConversion bool + err error + changes error +} + +// NewBodkin returns a new Bodkin value from a structured input. +// Input must be a json byte slice or string, a struct with exported fields or map[string]any. +// Any uppopulated fields, empty objects or empty slices in the input are skipped as their +// types cannot be evaluated and converted. +func NewBodkin(a any, opts ...Option) (*Bodkin, error) { + m := map[string]interface{}{} + switch input := a.(type) { + case nil: + return nil, ErrUndefinedInput + case map[string]any: + return newBodkin(input, opts...) + case []byte: + err := json.Unmarshal(input, &m) + if err != nil { + return nil, fmt.Errorf("%v : %v", ErrInvalidInput, err) + } + case string: + err := json.Unmarshal([]byte(input), &m) + if err != nil { + return nil, fmt.Errorf("%v : %v", ErrInvalidInput, err) + } + default: + err := mapstructure.Decode(a, &m) + if err != nil { + return nil, fmt.Errorf("%v : %v", ErrInvalidInput, err) + } + } + return newBodkin(m, opts...) +} + +func newBodkin(m map[string]any, opts ...Option) (*Bodkin, error) { + b := &Bodkin{} + f := newFieldPos(b) + for _, opt := range opts { + opt(b) + } + mapToArrow(f, m) + b.old = f + + g := newFieldPos(b) + mapToArrow(g, m) + b.original = g + return b, errWrap(f) +} + +// Err returns the last error encountered during the unification of input schemas. +func (u *Bodkin) Err() error { return u.err } + +// Changes returns a list of field additions and field type conversions done +// in the lifetime of the Bodkin object. +func (u *Bodkin) Changes() error { return u.changes } + +// WithInferTimeUnits() enables scanning input string values for time, date and timestamp types. +// +// Times use a format of HH:MM or HH:MM:SS[.zzz] where the fractions of a second cannot +// exceed the precision allowed by the time unit, otherwise unmarshalling will error. +// +// # Dates use YYYY-MM-DD format +// +// Timestamps use RFC3339Nano format except without a timezone, all of the following are valid: +// +// YYYY-MM-DD +// YYYY-MM-DD[T]HH +// YYYY-MM-DD[T]HH:MM +// YYYY-MM-DD[T]HH:MM:SS[.zzzzzzzzzz] +func WithInferTimeUnits() Option { + return func(cfg config) { + cfg.inferTimeUnits = true + } +} + +// WithTypeConversion enables upgrading the column types to fix compatibilty conflicts. +func WithTypeConversion() Option { + return func(cfg config) { + cfg.typeConversion = true + } +} + +// Unify merges structured input's column definition with the previously input's schema. +// Any uppopulated fields, empty objects or empty slices in the input are skipped. +func (u *Bodkin) Unify(a any) { + m := map[string]interface{}{} + switch input := a.(type) { + case nil: + u.err = ErrUndefinedInput + case []byte: + err := json.Unmarshal(input, &m) + if err != nil { + u.err = fmt.Errorf("%v : %v", ErrInvalidInput, err) + return + } + case string: + err := json.Unmarshal([]byte(input), &m) + if err != nil { + u.err = fmt.Errorf("%v : %v", ErrInvalidInput, err) + return + } + case map[string]any: + f := newFieldPos(u) + mapToArrow(f, m) + u.new = f + for _, field := range u.new.children { + u.merge(field) + } + default: + err := mapstructure.Decode(a, &m) + if err != nil { + u.err = fmt.Errorf("%v : %v", ErrInvalidInput, err) + return + } + } + f := newFieldPos(u) + mapToArrow(f, m) + u.new = f + for _, field := range u.new.children { + u.merge(field) + } +} + +// Schema returns the original Arrow schema generated from the structure/types of +// the initial input, and wrapped errors indicating which fields could not be evaluated. +func (u *Bodkin) OriginSchema() (*arrow.Schema, error) { + var fields []arrow.Field + for _, c := range u.original.children { + fields = append(fields, c.field) + } + err := errWrap(u.original) + return arrow.NewSchema(fields, nil), err +} + +// Schema returns the current merged Arrow schema generated from the structure/types of +// the input(s), and wrapped errors indicating which fields could not be evaluated. +func (u *Bodkin) Schema() (*arrow.Schema, error) { + var fields []arrow.Field + for _, c := range u.old.children { + fields = append(fields, c.field) + } + err := errWrap(u.old) + return arrow.NewSchema(fields, nil), err +} + +// LastSchema returns the Arrow schema generated from the structure/types of +// the most recent input. Any uppopulated fields, empty objects or empty slices are skipped. +// ErrNoLatestSchema if Unify() has never been called. +func (u *Bodkin) LastSchema() (*arrow.Schema, error) { + if u.new == nil { + return nil, ErrNoLatestSchema + } + var fields []arrow.Field + for _, c := range u.new.children { + fields = append(fields, c.field) + } + err := errWrap(u.new) + return arrow.NewSchema(fields, nil), err +} + +// merge merges a new or changed field into the unified schema. +// Conflicting TIME, DATE, TIMESTAMP types are upgraded to STRING. +// DATE can upgrade to TIMESTAMP. +// INTEGER can upgrade to FLOAT. +func (u *Bodkin) merge(n *fieldPos) { + if kin, err := u.old.getPath(n.path); err == ErrPathNotFound { + // root graft + if n.root == n.parent { + u.old.root.graft(n) + } else { + // branch graft + b, _ := u.old.getPath(n.parent.path) + b.graft(n) + } + } else { + if u.typeConversion && (!kin.field.Equal(n.field) && kin.field.Type.ID() != n.field.Type.ID()) { + switch kin.field.Type.ID() { + case arrow.INT8, arrow.INT16, arrow.INT32, arrow.INT64: + switch n.field.Type.ID() { + case arrow.FLOAT16, arrow.FLOAT32, arrow.FLOAT64: + err := kin.upgradeType(n, arrow.FLOAT64) + if err != nil { + kin.err = errors.Join(kin.err, err) + } + } + case arrow.TIMESTAMP: + switch n.field.Type.ID() { + case arrow.TIME64: + err := kin.upgradeType(n, arrow.STRING) + if err != nil { + kin.err = errors.Join(kin.err, err) + } + } + case arrow.DATE32: + switch n.field.Type.ID() { + case arrow.TIMESTAMP: + err := kin.upgradeType(n, arrow.TIMESTAMP) + if err != nil { + kin.err = errors.Join(kin.err, err) + } + case arrow.TIME64: + err := kin.upgradeType(n, arrow.STRING) + if err != nil { + kin.err = errors.Join(kin.err, err) + } + } + case arrow.TIME64: + switch n.field.Type.ID() { + case arrow.DATE32, arrow.TIMESTAMP: + err := kin.upgradeType(n, arrow.STRING) + if err != nil { + kin.err = errors.Join(kin.err, err) + } + } + } + } + for _, v := range n.childmap { + u.merge(v) + } + } +} diff --git a/cmd/main.go b/cmd/main.go new file mode 100644 index 0000000..c23d3b5 --- /dev/null +++ b/cmd/main.go @@ -0,0 +1,116 @@ +package main + +import ( + "fmt" + "strings" + + "github.com/apache/arrow-go/v18/arrow/array" + "github.com/loicalleyne/bodkin" +) + +func main() { + u, _ := bodkin.NewBodkin(jsonS1, bodkin.WithInferTimeUnits(), bodkin.WithTypeConversion()) + s, err := u.OriginSchema() + fmt.Printf("original input %v\nerrors:\n%v\n", s.String(), err) + + u.Unify(jsonS2) + schema, err := u.Schema() + if err != nil { + fmt.Println(err) + } + fmt.Printf("changes:\n%v\n", u.Changes()) + fmt.Printf("\nunified %v\nerrors:\n%v\n", schema.String(), err) + + rdr := array.NewJSONReader(strings.NewReader(jsonS2), schema) + defer rdr.Release() + for rdr.Next() { + rec := rdr.Record() + rj, err := rec.MarshalJSON() + if err != nil { + fmt.Printf("error marshaling record: %v\n", err) + } + fmt.Printf("\nmarshaled record:\n%v\n", string(rj)) + } + if err := rdr.Err(); err != nil { + fmt.Println(err) + } + fmt.Println(u.Changes()) + + u.Unify(jsonS3) + schema, err = u.Schema() + if err != nil { + fmt.Println(err) + } + fmt.Printf("\nsecond unified %v\nerrors:\n%v\n", schema.String(), err) + + rdr = array.NewJSONReader(strings.NewReader(jsonS3), schema) + defer rdr.Release() + for rdr.Next() { + rec := rdr.Record() + rj, err := rec.MarshalJSON() + if err != nil { + fmt.Printf("error marshaling record: %v\n", err) + } + fmt.Printf("\nmarshaled record:\n%v\n", string(rj)) + } + if err := rdr.Err(); err != nil { + fmt.Println(err) + } + fmt.Println(u.Changes()) +} + +var jsonS1 string = `{ + "count": 89, + "next": "https://sub.domain.com/api/search/?models=thurblig&page=3", + "previous": null, + "results": [{"id":7594}], + "arrayscalar":[], + "datefield":"1979-01-01", + "timefield":"01:02:03" + }` + +var jsonS2 string = `{ + "count": 89.5, + "next": "https://sub.domain.com/api/search/?models=thurblig&page=3", + "previous": "https://sub.domain.com/api/search/?models=thurblig&page=2", + "results": [{"id":7594,"scalar":241.5,"nested":{"strscalar":"str1","nestedarray":[123,456]}}], + "arrayscalar":["str"], + "datetime":"2024-10-24 19:03:09", + "event_time":"2024-10-24T19:03:09+00:00", + "datefield":"2024-10-24T19:03:09+00:00", + "timefield":"1970-01-01" + }` + +var jsonS3 string = `{ + "count": 85, + "next": "https://sub.domain.com/api/search/?models=thurblig", + "previous": null, + "results": [ + { + "id": 6328, + "name": "New user SMB check 2310-1", + "external_id": null, + "title": "New user SMB check 2310-1", + "content_type": "new agent", + "model": "Agent", + "emptyobj":{}, + "data": { + "id": 6328, + "nestednullscalar": null, + "dsp": { + "id": 116, + "name": "El Thingy Bueno", + "nullarray":[] + }, + "name": "New user SMB check 2310-1", + "agency": { + "id": 925, + "name": "New user SMB check 2310-1" + }, + "export_status": { + "status": true + } + } + } + ] + }` diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..ef3e5bd --- /dev/null +++ b/go.mod @@ -0,0 +1,21 @@ +module github.com/loicalleyne/bodkin + +go 1.22.3 + +require ( + github.com/apache/arrow-go/v18 v18.0.0 + github.com/go-viper/mapstructure/v2 v2.2.1 + github.com/goccy/go-json v0.10.3 +) + +require ( + github.com/google/flatbuffers v24.3.25+incompatible // indirect + github.com/klauspost/cpuid/v2 v2.2.8 // indirect + github.com/zeebo/xxh3 v1.0.2 // indirect + golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 // indirect + golang.org/x/mod v0.21.0 // indirect + golang.org/x/sync v0.8.0 // indirect + golang.org/x/sys v0.26.0 // indirect + golang.org/x/tools v0.26.0 // indirect + golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..76b015e --- /dev/null +++ b/go.sum @@ -0,0 +1,55 @@ +github.com/andybalholm/brotli v1.1.1 h1:PR2pgnyFznKEugtsUo0xLdDop5SKXd5Qf5ysW+7XdTA= +github.com/andybalholm/brotli v1.1.1/go.mod h1:05ib4cKhjx3OQYUY22hTVd34Bc8upXjOLL2rKwwZBoA= +github.com/apache/arrow-go/v18 v18.0.0 h1:1dBDaSbH3LtulTyOVYaBCHO3yVRwjV+TZaqn3g6V7ZM= +github.com/apache/arrow-go/v18 v18.0.0/go.mod h1:t6+cWRSmKgdQ6HsxisQjok+jBpKGhRDiqcf3p0p/F+A= +github.com/apache/thrift v0.21.0 h1:tdPmh/ptjE1IJnhbhrcl2++TauVjy242rkV/UzJChnE= +github.com/apache/thrift v0.21.0/go.mod h1:W1H8aR/QRtYNvrPeFXBtobyRkd0/YVhTc6i07XIAgDw= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-viper/mapstructure/v2 v2.2.1 h1:ZAaOCxANMuZx5RCeg0mBdEZk7DZasvvZIxtHqx8aGss= +github.com/go-viper/mapstructure/v2 v2.2.1/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM= +github.com/goccy/go-json v0.10.3 h1:KZ5WoDbxAIgm2HNbYckL0se1fHD6rz5j4ywS6ebzDqA= +github.com/goccy/go-json v0.10.3/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/flatbuffers v24.3.25+incompatible h1:CX395cjN9Kke9mmalRoL3d81AtFUxJM+yDthflgJGkI= +github.com/google/flatbuffers v24.3.25+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/klauspost/asmfmt v1.3.2 h1:4Ri7ox3EwapiOjCki+hw14RyKk201CN4rzyCJRFLpK4= +github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j0HLHbNSE= +github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= +github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= +github.com/klauspost/cpuid/v2 v2.2.8 h1:+StwCXwm9PdpiEkPyzBXIy+M9KUb4ODm0Zarf1kS5BM= +github.com/klauspost/cpuid/v2 v2.2.8/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= +github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs= +github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8/go.mod h1:mC1jAcsrzbxHt8iiaC+zU4b1ylILSosueou12R++wfY= +github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 h1:+n/aFZefKZp7spd8DFdX7uMikMLXX4oubIzJF4kv/wI= +github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3/go.mod h1:RagcQ7I8IeTMnF8JTXieKnO4Z6JCsikNEzj0DwauVzE= +github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= +github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ= +github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0= +github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0= +github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA= +golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 h1:e66Fs6Z+fZTbFBAxKfP3PALWBtpfqks2bwGcexMxgtk= +golang.org/x/exp v0.0.0-20240909161429-701f63a606c0/go.mod h1:2TbTHSBQa924w8M6Xs1QcRcFwyucIwBGpK1p2f1YFFY= +golang.org/x/mod v0.21.0 h1:vvrHzRwRfVKSiLrG+d4FMl/Qi4ukBCE6kZlTUkDYRT0= +golang.org/x/mod v0.21.0/go.mod h1:6SkKJ3Xj0I0BrPOZoBy3bdMptDDU9oJrpohJ3eWZ1fY= +golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo= +golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/tools v0.26.0 h1:v/60pFQmzmT9ExmjDv2gGIfi3OqfKoEP6I5+umXlbnQ= +golang.org/x/tools v0.26.0/go.mod h1:TPVVj70c7JJ3WCazhD8OdXcZg/og+b9+tH/KxylGwH0= +golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 h1:+cNy6SZtPcJQH3LJVLOSmiC7MMxXNOb3PU/VUEz+EhU= +golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028/go.mod h1:NDW/Ps6MPRej6fsCIbMTohpP40sJ/P/vI1MoTEGwX90= +gonum.org/v1/gonum v0.15.1 h1:FNy7N6OUZVUaWG9pTiD+jlhdQ3lMP+/LcTpJ6+a8sQ0= +gonum.org/v1/gonum v0.15.1/go.mod h1:eZTZuRFrzu5pcyjN5wJhcIhnUdNijYxX1T2IcrOGY0o= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/schema.go b/schema.go new file mode 100644 index 0000000..1281348 --- /dev/null +++ b/schema.go @@ -0,0 +1,358 @@ +package bodkin + +import ( + "errors" + "fmt" + "regexp" + "slices" + + "github.com/apache/arrow-go/v18/arrow" +) + +type fieldPos struct { + root *fieldPos + parent *fieldPos + owner *Bodkin + name string + path []string + field arrow.Field + children []*fieldPos + childmap map[string]*fieldPos + index, depth int32 + err error +} + +var ( + ErrUndefinedInput = errors.New("nil input") + ErrInvalidInput = errors.New("invalid input") + ErrNoLatestSchema = errors.New("no second input has been provided") + ErrUndefinedFieldType = errors.New("could not determine type of unpopulated field") + ErrUndefinedArrayElementType = errors.New("could not determine element type of empty array") + ErrNotAnUpgradableType = errors.New("is not an upgradable type") + ErrPathNotFound = errors.New("path not found") + timestampMatchers []*regexp.Regexp + dateMatcher *regexp.Regexp + timeMatcher *regexp.Regexp + // UpgradableTypes are scalar types that can be upgraded to a more flexible type. + UpgradableTypes []arrow.Type = []arrow.Type{arrow.INT8, + arrow.INT16, + arrow.INT32, + arrow.INT64, + arrow.DATE32, + arrow.TIME64, + arrow.TIMESTAMP, + arrow.STRING, + } +) + +func init() { + registerTsMatchers() +} + +func registerTsMatchers() { + dateMatcher = regexp.MustCompile(`^\d{4}-\d{2}-\d{2}$`) + timeMatcher = regexp.MustCompile(`^\d{1,2}:\d{1,2}:\d{1,2}(\.\d{1,6})?$`) + timestampMatchers = append(timestampMatchers, + regexp.MustCompile(`^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(\.\d+)?(Z|[+-]\d{2}:\d{2})$`), // ISO 8601 + regexp.MustCompile(`^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}(\.\d+)?(Z|[+-]\d{2}:\d{2})$`), // RFC 3339 with space instead of T + regexp.MustCompile(`^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}$`), // Datetime format with dashes + regexp.MustCompile(`^\d{4}-\d{1,2}-\d{1,2}[T ]\d{1,2}:\d{1,2}:\d{1,2}(\.\d{1,6})? *(([+-]\d{1,2}(:\d{1,2})?)|Z|UTC)?$`)) +} + +func newFieldPos(b *Bodkin) *fieldPos { + f := new(fieldPos) + f.owner = b + f.index = -1 + f.root = f + f.childmap = make(map[string]*fieldPos) + f.children = make([]*fieldPos, 0) + return f +} + +func (f *fieldPos) assignChild(child *fieldPos) { + f.children = append(f.children, child) + f.childmap[child.name] = child +} + +func (f *fieldPos) child(index int) (*fieldPos, error) { + if index < len(f.children) { + return f.children[index], nil + } + return nil, fmt.Errorf("%v child index %d not found", f.namePath(), index) +} + +func (f *fieldPos) error() error { return f.err } +func (f *fieldPos) metadata() arrow.Metadata { return f.field.Metadata } + +func (f *fieldPos) newChild(childName string) *fieldPos { + var child fieldPos = fieldPos{ + root: f.root, + parent: f, + owner: f.owner, + name: childName, + index: int32(len(f.children)), + depth: f.depth + 1, + } + child.path = child.namePath() + child.childmap = make(map[string]*fieldPos) + return &child +} + +func (f *fieldPos) mapChildren() { + for i, c := range f.children { + f.childmap[c.name] = f.children[i] + } +} + +func (f *fieldPos) getPath(path []string) (*fieldPos, error) { + if len(path) == 0 { // degenerate input + return nil, fmt.Errorf("getPath needs at least one key") + } + if node, ok := f.childmap[path[0]]; !ok { + return nil, ErrPathNotFound + } else if len(path) == 1 { // we've reached the final key + return node, nil + } else { // 1+ more keys + return node.getPath(path[1:]) + } +} + +// namePath returns a slice of keys making up the path to the field +func (f *fieldPos) namePath() []string { + if len(f.path) == 0 { + var path []string + cur := f + for i := f.depth - 1; i >= 0; i-- { + path = append([]string{cur.name}, path...) + cur = cur.parent + } + return path + } + return f.path +} + +// namePath returns the path to the field in json dot notation +func (f *fieldPos) dotPath() string { + var path string = "$" + for i, p := range f.path { + path = path + p + if i+1 != len(f.path) { + path = path + "." + } + } + return path +} + +// getValue retrieves the value from the map[string]interface{} +// by following the field's key path +func (f *fieldPos) getValue(m map[string]interface{}) interface{} { + var value interface{} = m + for _, key := range f.namePath() { + valueMap, ok := value.(map[string]interface{}) + if !ok { + return nil + } + value, ok = valueMap[key] + if !ok { + return nil + } + } + return value +} + +// graft grafts a new field into the schema tree +func (f *fieldPos) graft(n *fieldPos) { + graft := f.newChild(n.name) + graft.field = n.field + graft.children = append(graft.children, n.children...) + graft.mapChildren() + f.assignChild(graft) + f.owner.changes = errors.Join(f.owner.changes, fmt.Errorf("added %v : %v", graft.dotPath(), graft.field.Type.String())) + if f.field.Type.ID() == arrow.STRUCT { + gf := f.field.Type.(*arrow.StructType) + var nf []arrow.Field + nf = append(nf, gf.Fields()...) + nf = append(nf, graft.field) + f.field = arrow.Field{Name: graft.name, Type: arrow.StructOf(nf...), Nullable: true} + if (f.parent != nil) && f.parent.field.Type.ID() == arrow.LIST { + f.parent.field = arrow.Field{Name: f.parent.name, Type: arrow.ListOf(f.field.Type.(*arrow.StructType)), Nullable: true} + } + } +} + +// Only scalar types in UpgradableTypes[] can be upgraded +func (o *fieldPos) upgradeType(n *fieldPos, t arrow.Type) error { + if !slices.Contains(UpgradableTypes, n.field.Type.ID()) { + return fmt.Errorf("%v %v", n.field.Type.Name(), ErrNotAnUpgradableType.Error()) + } + oldType := o.field.Type.String() + switch t { + case arrow.FLOAT64: + o.field = arrow.Field{Name: o.name, Type: arrow.PrimitiveTypes.Float64, Nullable: true} + case arrow.STRING: + o.field = arrow.Field{Name: o.name, Type: arrow.BinaryTypes.String, Nullable: true} + case arrow.TIMESTAMP: + o.field = arrow.Field{Name: o.name, Type: arrow.FixedWidthTypes.Timestamp_ms, Nullable: true} + } + switch o.parent.field.Type.ID() { + case arrow.LIST: + o.parent.field = arrow.Field{Name: o.parent.name, Type: arrow.ListOf(n.field.Type), Nullable: true} + case arrow.STRUCT: + var fields []arrow.Field + for _, c := range o.parent.children { + fields = append(fields, c.field) + } + o.parent.field = arrow.Field{Name: o.parent.name, Type: arrow.StructOf(fields...), Nullable: true} + } + o.owner.changes = errors.Join(o.owner.changes, fmt.Errorf("changed %v : from %v to %v", o.dotPath(), oldType, o.field.Type.String())) + return nil +} + +func errWrap(f *fieldPos) error { + var err error + if f.err != nil { + err = errors.Join(f.err) + } + if len(f.children) > 0 { + for _, field := range f.children { + err = errors.Join(err, errWrap(field)) + } + } + return err +} + +func mapToArrow(f *fieldPos, m map[string]interface{}) { + for k, v := range m { + child := f.newChild(k) + switch t := v.(type) { + case map[string]interface{}: + mapToArrow(child, t) + var fields []arrow.Field + for _, c := range child.children { + fields = append(fields, c.field) + } + if len(child.children) != 0 { + child.field = arrow.Field{Name: k, Type: arrow.StructOf(fields...), Nullable: true} + f.assignChild(child) + } + + case []interface{}: + if len(t) <= 0 { + f.err = errors.Join(f.err, fmt.Errorf("%v : %v", ErrUndefinedArrayElementType, child.namePath())) + } else { + et := sliceElemType(child, t) + child.field = arrow.Field{Name: k, Type: arrow.ListOf(et), Nullable: true} + f.assignChild(child) + } + case nil: + f.err = errors.Join(f.err, fmt.Errorf("%v : %v", ErrUndefinedFieldType, child.namePath())) + default: + child.field = arrow.Field{Name: k, Type: goType2Arrow(child, v), Nullable: true} + f.assignChild(child) + } + } + var fields []arrow.Field + for _, c := range f.children { + fields = append(fields, c.field) + } + f.field = arrow.Field{Name: f.name, Type: arrow.StructOf(fields...), Nullable: true} +} + +func sliceElemType(f *fieldPos, v []interface{}) arrow.DataType { + switch ft := v[0].(type) { + case map[string]interface{}: + child := f.newChild(f.name + ".elem") + mapToArrow(child, ft) + var fields []arrow.Field + for _, c := range child.children { + fields = append(fields, c.field) + } + f.assignChild(child) + return arrow.StructOf(fields...) + case []interface{}: + if len(ft) < 1 { + f.err = errors.Join(f.err, fmt.Errorf("%v : %v", ErrUndefinedArrayElementType, f.namePath())) + return arrow.GetExtensionType("skip") + } + child := f.newChild(f.name + ".elem") + et := sliceElemType(child, v[0].([]interface{})) + f.assignChild(child) + return arrow.ListOf(et) + default: + return goType2Arrow(f, v) + } + return nil +} + +func goType2Arrow(f *fieldPos, gt any) arrow.DataType { + var dt arrow.DataType + switch t := gt.(type) { + case []any: + return goType2Arrow(f, t[0]) + // either 32 or 64 bits + case int: + dt = arrow.PrimitiveTypes.Int64 + // the set of all signed 8-bit integers (-128 to 127) + case int8: + dt = arrow.PrimitiveTypes.Int8 + // the set of all signed 16-bit integers (-32768 to 32767) + case int16: + dt = arrow.PrimitiveTypes.Int16 + // the set of all signed 32-bit integers (-2147483648 to 2147483647) + case int32: + dt = arrow.PrimitiveTypes.Int32 + // the set of all signed 64-bit integers (-9223372036854775808 to 9223372036854775807) + case int64: + dt = arrow.PrimitiveTypes.Int64 + // either 32 or 64 bits + case uint: + dt = arrow.PrimitiveTypes.Uint64 + // the set of all unsigned 8-bit integers (0 to 255) + case uint8: + dt = arrow.PrimitiveTypes.Uint8 + // the set of all unsigned 16-bit integers (0 to 65535) + case uint16: + dt = arrow.PrimitiveTypes.Uint16 + // the set of all unsigned 32-bit integers (0 to 4294967295) + case uint32: + dt = arrow.PrimitiveTypes.Uint32 + // the set of all unsigned 64-bit integers (0 to 18446744073709551615) + case uint64: + dt = arrow.PrimitiveTypes.Uint64 + // the set of all IEEE-754 32-bit floating-point numbers + case float32: + dt = arrow.PrimitiveTypes.Float32 + // the set of all IEEE-754 64-bit floating-point numbers + case float64: + dt = arrow.PrimitiveTypes.Float64 + case bool: + dt = arrow.FixedWidthTypes.Boolean + case string: + if f.owner.inferTimeUnits { + for _, r := range timestampMatchers { + if r.MatchString(t) { + return arrow.FixedWidthTypes.Timestamp_ms + } + } + if dateMatcher.MatchString(t) { + return arrow.FixedWidthTypes.Date32 + } + if timeMatcher.MatchString(t) { + return arrow.FixedWidthTypes.Time64ns + } + } + dt = arrow.BinaryTypes.String + case []byte: + dt = arrow.BinaryTypes.Binary + // the set of all complex numbers with float32 real and imaginary parts + case complex64: + // TO-DO + // the set of all complex numbers with float64 real and imaginary parts + case complex128: + // TO-DO + case nil: + f.err = fmt.Errorf("%v : %v", ErrUndefinedFieldType, f.namePath()) + dt = arrow.BinaryTypes.Binary + } + return dt +}