diff --git a/.gitignore b/.gitignore index cbd132c..b4a493f 100644 --- a/.gitignore +++ b/.gitignore @@ -26,3 +26,4 @@ go.work.sum avro/ map.go +*.schema \ No newline at end of file diff --git a/README.md b/README.md index eef35e0..00d5f09 100644 --- a/README.md +++ b/README.md @@ -13,6 +13,7 @@ The goal is to provide a useful toolkit to make it easier to use Arrow, and by e - Automatically evolves the Arrow schema with new fields when providing new inputs - Converts schema field types when unifying schemas to accept evolving input data - Tracks changes to the schema +- Export/import a schema to/from a file to persist schema definition ## 🚀 Install diff --git a/bodkin.go b/bodkin.go index eb3a577..6628682 100644 --- a/bodkin.go +++ b/bodkin.go @@ -7,12 +7,20 @@ import ( "bytes" "errors" "fmt" + "os" "slices" "strings" "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/array" + "github.com/apache/arrow-go/v18/arrow/memory" + "github.com/apache/arrow-go/v18/parquet" + "github.com/apache/arrow-go/v18/parquet/compress" + "github.com/apache/arrow-go/v18/parquet/file" + "github.com/apache/arrow-go/v18/parquet/pqarrow" "github.com/go-viper/mapstructure/v2" json "github.com/goccy/go-json" + "github.com/loicalleyne/bodkin/pq" omap "github.com/wk8/go-ordered-map/v2" ) @@ -178,6 +186,96 @@ func (u *Bodkin) Paths() []Field { return paths } +// ExportSchema exports an Arrow Schema by writing a record full of null values to an Arrow IPC file. +func (u *Bodkin) ExportSchema(exportPath string) error { + f, err := os.Open(exportPath) + if err != nil { + return err + } + defer f.Close() + + schema, err := u.Schema() + if err != nil { + return err + } + var prp *parquet.WriterProperties = parquet.NewWriterProperties( + parquet.WithDictionaryDefault(true), + parquet.WithVersion(parquet.V2_LATEST), + parquet.WithCompression(compress.Codecs.Zstd), + parquet.WithStats(true), + parquet.WithRootName("bodkin"), + ) + + pw, _, err := pq.NewParquetWriter(schema, prp, exportPath) + if err != nil { + return err + } + defer pw.Close() + + rb := array.NewRecordBuilder(memory.DefaultAllocator, schema) + // json data unlikely to conflict with a real detected schema + // m, _ := InputMap([]byte(`{"nevergonnagiveyouup":"nevergonnaletyoudown"}`)) + m := make(map[string]any) + for _, c := range u.old.children { + switch c.arrowType { + case arrow.STRING: + if c.field.Type.ID() != arrow.LIST { + m[c.name] = "nevergonnagiveyouup" + } + case arrow.FLOAT64: + m[c.name] = 1.2345 + case arrow.INT8, arrow.INT16, arrow.INT32, arrow.INT64, arrow.UINT8, arrow.UINT16, arrow.UINT32, arrow.UINT64: + m[c.name] = 1234 + } + } + data, err := json.Marshal(m) + if err != nil { + return err + } + fmt.Println(string(data)) + // array.UnmarshalJSON for record builder will read in a single object and add the values to each field in the recordbuilder, + // missing fields will get a null and unexpected keys will be ignored. If reading in an array of records as a single batch, + // then use a structbuilder and use RecordFromStruct. This is fine as we are only interested in the Arrow schema. + for i := 0; i < 10; i++ { + err = rb.UnmarshalJSON(data) + if err != nil { + return err + } + } + + rec := rb.NewRecord() + err = pw.WriteRecord(rec) + if err != nil { + return err + } + + f.Sync() + return nil +} + +// ImportSchema imports an Arrow Schema from an Arrow IPC file. +func (u *Bodkin) ImportSchema(importPath string) (*arrow.Schema, error) { + f, err := os.Open(importPath) + if err != nil { + return nil, err + } + defer f.Close() + pqr, err := file.OpenParquetFile(importPath, true) + if err != nil { + return nil, err + } + + fr, err := pqarrow.NewFileReader(pqr, pqarrow.ArrowReadProperties{}, memory.DefaultAllocator) + if err != nil { + return nil, err + } + schema, err := fr.Schema() + if schema == nil { + return nil, fmt.Errorf("could not import schema from %s : %v", importPath, err) + } + return schema, nil +} + // Unify merges structured input's column definition with the previously input's schema. // Any uppopulated fields, empty objects or empty slices in JSON input are skipped. func (u *Bodkin) Unify(a any) { diff --git a/cmd/main.go b/cmd/main.go index 0829023..5a9b83c 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -169,6 +169,18 @@ func main() { fmt.Printf("%v : [%s]\n", e.Issue, e.Dotpath) } fmt.Println(u.Changes()) + err = u.ExportSchema("./test.schema") + if err != nil { + fmt.Println(err) + } else { + imp, err := u.ImportSchema("./test.schema") + if err != nil { + fmt.Println(err) + } else { + fmt.Printf("imported %v\n", imp.String()) + } + + } } diff --git a/json2parquet/json2parquet.go b/json2parquet/json2parquet.go index 95e9d1d..d5d4394 100644 --- a/json2parquet/json2parquet.go +++ b/json2parquet/json2parquet.go @@ -11,6 +11,7 @@ import ( "github.com/apache/arrow-go/v18/arrow/array" "github.com/apache/arrow-go/v18/parquet" "github.com/loicalleyne/bodkin" + "github.com/loicalleyne/bodkin/pq" ) func FromReader(r io.Reader, opts ...bodkin.Option) (*arrow.Schema, int, error) { @@ -62,11 +63,11 @@ func RecordsFromFile(inputFile, outputFile string, schema *arrow.Schema, munger } }() defer f.Close() - var prp *parquet.WriterProperties = defaultWrtp + var prp *parquet.WriterProperties = pq.DefaultWrtp if len(opts) != 0 { prp = parquet.NewWriterProperties(opts...) } - pw, _, err := NewParquetWriter(schema, prp, outputFile) + pw, _, err := pq.NewParquetWriter(schema, prp, outputFile) if err != nil { return 0, err } diff --git a/json2parquet/parquet_writer.go b/pq/parquet_writer.go similarity index 94% rename from json2parquet/parquet_writer.go rename to pq/parquet_writer.go index 5ab35a7..8774ec1 100644 --- a/json2parquet/parquet_writer.go +++ b/pq/parquet_writer.go @@ -1,4 +1,4 @@ -package json2parquet +package pq import ( "fmt" @@ -18,12 +18,12 @@ const ( ) var ( - defaultWrtp = parquet.NewWriterProperties( + DefaultWrtp = parquet.NewWriterProperties( parquet.WithDictionaryDefault(true), parquet.WithVersion(parquet.V2_LATEST), parquet.WithCompression(compress.Codecs.Zstd), parquet.WithStats(true), - parquet.WithRootName("json2parquet"), + parquet.WithRootName("bodkin"), ) ) @@ -63,8 +63,8 @@ func NewParquetWriter(sc *arrow.Schema, wrtp *parquet.WriterProperties, path str if err != nil { return nil, nil, fmt.Errorf("failed to create destination file: %w", err) } - - pqwrt, err := pqarrow.NewFileWriter(sc, destFile, wrtp, pqarrow.DefaultWriterProps()) + artp := pqarrow.NewArrowWriterProperties(pqarrow.WithStoreSchema()) + pqwrt, err := pqarrow.NewFileWriter(sc, destFile, wrtp, artp) if err != nil { return nil, nil, fmt.Errorf("failed to create parquet writer: %w", err) }