diff --git a/.gitignore b/.gitignore index b4a493f..01d0172 100644 --- a/.gitignore +++ b/.gitignore @@ -26,4 +26,5 @@ go.work.sum avro/ map.go -*.schema \ No newline at end of file +*.schema +*.pgo \ No newline at end of file diff --git a/README.md b/README.md index 9d191b1..da58363 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,7 @@ Bodkin enables you to use your _data_ to define and evolve your Arrow Schema. - Automatically evolves the Arrow schema with new fields when providing new inputs - Converts schema field types when unifying schemas to accept evolving input data - Tracks changes to the schema -- Export/import a schema to/from a stub parquet file to persist schema definition +- Export/import a serialized Arrow schema to/from file or []byte to transmit or persist schema definition ## 🚀 Install @@ -150,11 +150,15 @@ for rdr.Next() { // ] ``` -Export your schema to a file, then import the file to retrieve the schema +Export your schema to a file, then import the file to retrieve the schema; or export/import to/from a []byte. ```go - _ = u.ExportSchema("./test.schema") - imp, _ := u.ImportSchema("./test.schema") - fmt.Printf("imported %v\n", imp.String()) +_ = u.ExportSchemaFile("./test.schema") +imp, _ := u.ImportSchemaFile("./test.schema") +fmt.Printf("imported %v\n", imp.String()) + +bs, _ := u.ExportSchemaBytes() +sc, _ := u.ImportSchemaBytes(bs) +fmt.Printf("imported %v\n", sc.String()) ``` ## 💫 Show your support diff --git a/bodkin.go b/bodkin.go index 45ca3bc..25296a3 100644 --- a/bodkin.go +++ b/bodkin.go @@ -13,15 +13,10 @@ import ( "strings" "github.com/apache/arrow-go/v18/arrow" - "github.com/apache/arrow-go/v18/arrow/array" + "github.com/apache/arrow-go/v18/arrow/flight" "github.com/apache/arrow-go/v18/arrow/memory" - "github.com/apache/arrow-go/v18/parquet" - "github.com/apache/arrow-go/v18/parquet/compress" - "github.com/apache/arrow-go/v18/parquet/file" - "github.com/apache/arrow-go/v18/parquet/pqarrow" "github.com/go-viper/mapstructure/v2" json "github.com/goccy/go-json" - "github.com/loicalleyne/bodkin/pq" omap "github.com/wk8/go-ordered-map/v2" ) @@ -205,90 +200,41 @@ func (u *Bodkin) Paths() []Field { return paths } -// ExportSchema exports an Arrow Schema by writing a record full of null values to an Arrow IPC file. -func (u *Bodkin) ExportSchema(exportPath string) error { - f, err := os.Open(exportPath) - if err != nil { - return err - } - defer f.Close() - +// ExportSchema exports a serialized Arrow Schema to a file. +func (u *Bodkin) ExportSchemaFile(exportPath string) error { schema, err := u.Schema() if err != nil { return err } - var prp *parquet.WriterProperties = parquet.NewWriterProperties( - parquet.WithDictionaryDefault(true), - parquet.WithVersion(parquet.V2_LATEST), - parquet.WithCompression(compress.Codecs.Zstd), - parquet.WithStats(true), - parquet.WithRootName("bodkin"), - ) - - pw, _, err := pq.NewParquetWriter(schema, prp, exportPath) + bs := flight.SerializeSchema(schema, memory.DefaultAllocator) + err = os.WriteFile("./temp.schema", bs, 0644) if err != nil { return err } - defer pw.Close() - - rb := array.NewRecordBuilder(memory.DefaultAllocator, schema) - m := make(map[string]any) - for _, c := range u.old.children { - switch c.arrowType { - case arrow.STRING: - if c.field.Type.ID() != arrow.LIST { - m[c.name] = "nevergonnagiveyouup" - } - case arrow.FLOAT64: - m[c.name] = 1.2345 - case arrow.INT8, arrow.INT16, arrow.INT32, arrow.INT64, arrow.UINT8, arrow.UINT16, arrow.UINT32, arrow.UINT64: - m[c.name] = 1234 - } - } - data, err := json.Marshal(m) - if err != nil { - return err - } - // array.UnmarshalJSON for record builder will read in a single object and add the values to each field in the recordbuilder, - // missing fields will get a null and unexpected keys will be ignored. If reading in an array of records as a single batch, - // then use a structbuilder and use RecordFromStruct. This is fine as we are only interested in the Arrow schema. - for i := 0; i < 10; i++ { - err = rb.UnmarshalJSON(data) - if err != nil { - return err - } - } - - rec := rb.NewRecord() - err = pw.WriteRecord(rec) - if err != nil { - return err - } - - return f.Sync() + return nil } -// ImportSchema imports an Arrow Schema from an Arrow IPC file. -func (u *Bodkin) ImportSchema(importPath string) (*arrow.Schema, error) { - f, err := os.Open(importPath) - if err != nil { - return nil, err - } - defer f.Close() - pqr, err := file.OpenParquetFile(importPath, true) +// ImportSchema imports a serialized Arrow Schema from a file. +func (u *Bodkin) ImportSchemaFile(importPath string) (*arrow.Schema, error) { + dat, err := os.ReadFile(importPath) if err != nil { return nil, err } + return flight.DeserializeSchema(dat, memory.DefaultAllocator) +} - fr, err := pqarrow.NewFileReader(pqr, pqarrow.ArrowReadProperties{}, memory.DefaultAllocator) +// ExportSchemaBytes exports a serialized Arrow Schema. +func (u *Bodkin) ExportSchemaBytes() ([]byte, error) { + schema, err := u.Schema() if err != nil { return nil, err } - schema, err := fr.Schema() - if schema == nil { - return nil, fmt.Errorf("could not import schema from %s : %v", importPath, err) - } - return schema, nil + return flight.SerializeSchema(schema, memory.DefaultAllocator), nil +} + +// ImportSchemaBytes imports a serialized Arrow Schema. +func (u *Bodkin) ImportSchemaBytes(dat []byte) (*arrow.Schema, error) { + return flight.DeserializeSchema(dat, memory.DefaultAllocator) } // Unify merges structured input's column definition with the previously input's schema. diff --git a/cmd/main.go b/cmd/main.go index 483882e..57464cb 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -2,6 +2,7 @@ package main import ( "fmt" + "log" "strings" "github.com/apache/arrow-go/v18/arrow/array" @@ -169,18 +170,27 @@ func main() { fmt.Printf("%v : [%s]\n", e.Issue, e.Dotpath) } fmt.Println(u.Changes()) - err = u.ExportSchema("./test.schema") + bs, err := u.ExportSchemaBytes() if err != nil { fmt.Println(err) } else { - imp, err := u.ImportSchema("./test.schema") + imp, err := u.ImportSchemaBytes(bs) if err != nil { fmt.Println(err) } else { fmt.Printf("imported %v\n", imp.String()) } } - + err = u.ExportSchemaFile("./temp.schema") + if err != nil { + log.Fatal(err) + } + sb, err := u.ImportSchemaFile("./temp.schema") + if err != nil { + fmt.Println(err) + } else { + fmt.Println("deserialized:\n", sb.String()) + } } var jsonS1 string = `{ diff --git a/json2parquet/.gitignore b/json2parquet/.gitignore index 4721a81..81d7499 100644 --- a/json2parquet/.gitignore +++ b/json2parquet/.gitignore @@ -1,3 +1,2 @@ -cmd/* -cmd/*.json -cmd/*.parquet \ No newline at end of file +*.json +*.parquet \ No newline at end of file diff --git a/json2parquet/cmd/.gitignore b/json2parquet/cmd/.gitignore new file mode 100644 index 0000000..81d7499 --- /dev/null +++ b/json2parquet/cmd/.gitignore @@ -0,0 +1,2 @@ +*.json +*.parquet \ No newline at end of file diff --git a/json2parquet/cmd/cleaner/main.go b/json2parquet/cmd/cleaner/main.go new file mode 100644 index 0000000..c296c97 --- /dev/null +++ b/json2parquet/cmd/cleaner/main.go @@ -0,0 +1,115 @@ +package main + +import ( + "bufio" + "flag" + "fmt" + "log" + "os" + "path/filepath" + + "github.com/goccy/go-json" + + "github.com/redpanda-data/benthos/v4/public/bloblang" +) + +// jcleaner takes as input a JSONL file, and removes all null fields, empty arrays, +// empty objects and empty strings. +func main() { + inputFile := flag.String("in", "", "input file") + outputFile := flag.String("out", "", "output file") + flag.Parse() + if *inputFile == "" { + log.Fatal("no input file specified") + } + if *outputFile == "" { + log.Fatal("no output file specified") + } + problemLines := fileNameWithoutExt(*outputFile) + "_problem.json" + f, err := os.Open(*inputFile) + if err != nil { + panic(err) + } + defer func() { + if r := recover(); r != nil { + fmt.Println(err) + } + }() + defer f.Close() + bloblangMapping := `map remove_null_empty { + root = match { + (this.type() == "object" && this.length() == 0) => deleted() + this.type() == "object" => this.map_each(i -> i.value.apply("remove_null_empty")) + (this.type() == "array" && this.length() == 0) => deleted() + this.type() == "array" => this.map_each(v -> v.apply("remove_null_empty")) + this.type() == "null" => deleted() + this.type() == "string" && this.length() == 0 => deleted() + } + } + root = this.apply("remove_null_empty")` + exe, err := bloblang.Parse(bloblangMapping) + if err != nil { + log.Println(err) + } + + nf, err := os.Create(*outputFile) + if err != nil { + panic(err) + } + defer nf.Close() + w := bufio.NewWriterSize(nf, 1024*4) + + pf, err := os.Create(problemLines) + if err != nil { + panic(err) + } + defer pf.Close() + pw := bufio.NewWriterSize(nf, 1024*4) + + r := bufio.NewReaderSize(f, 1024*4) + s := bufio.NewScanner(r) + newline := []byte("\n") + for s.Scan() { + y := s.Bytes() + b, err := ApplyBloblangMapping(y, exe) + if err != nil { + pw.Write(y) + pw.Write(newline) + continue + } + _, err = w.Write(b) + if err != nil { + pw.Write(y) + pw.Write(newline) + continue + } + w.Write(newline) + } + w.Flush() +} + +func ApplyBloblangMapping(jsonInput []byte, exe *bloblang.Executor) ([]byte, error) { + // Parse the JSON input into a map[string]interface{} + var inputMap map[string]interface{} + if err := json.Unmarshal(jsonInput, &inputMap); err != nil { + return nil, err + } + + // Execute the Bloblang mapping + res, err := exe.Query(inputMap) + if err != nil { + return nil, err + } + + // Convert the result back into a JSON string + jsonResult, err := json.Marshal(res) + if err != nil { + return nil, err + } + + return jsonResult, nil +} + +func fileNameWithoutExt(fileName string) string { + return fileName[:len(fileName)-len(filepath.Ext(fileName))] +} diff --git a/json2parquet/cmd/main.go b/json2parquet/cmd/main.go new file mode 100644 index 0000000..e047734 --- /dev/null +++ b/json2parquet/cmd/main.go @@ -0,0 +1,78 @@ +package main + +import ( + "flag" + "fmt" + "log" + "os" + "runtime/pprof" + + "github.com/redpanda-data/benthos/v4/public/bloblang" + + "github.com/loicalleyne/bodkin" + j2p "github.com/loicalleyne/bodkin/json2parquet" +) + +var exe *bloblang.Executor +var cpuprofile = flag.String("cpuprofile", "default.pgo", "write cpu profile to `file`") + +func main() { + inferMode := flag.Bool("infer_timeunits", true, "Infer date, time and timestamps from strings") + quotedValuesAreStrings := flag.Bool("quoted_values_are_strings", false, "Treat quoted bool, float and integer values as strings") + withTypeConversion := flag.Bool("type_conversion", false, "upgrade field types if data changes") + inputFile := flag.String("in", "t.json", "input file") + outputFile := flag.String("out", "screens.parquet", "output file") + dryRun := flag.Bool("n", false, "only print the schema") + lines := flag.Int64("lines", 0, "number of lines from which to infer schema; 0 means whole file is scanned") + flag.Parse() + if *inputFile == "" { + log.Fatal("no input file specified") + } + log.Println("detecting schema") + if *cpuprofile != "" { + f, err := os.Create(*cpuprofile) + if err != nil { + log.Fatal("could not create CPU profile: ", err) + } + defer f.Close() + if err := pprof.StartCPUProfile(f); err != nil { + log.Fatal("could not start CPU profile: ", err) + } + defer pprof.StopCPUProfile() + defer log.Printf("program ended\nto view profile run 'go tool pprof -http localhost:8080 %s\n", *cpuprofile) + } + var opts []bodkin.Option + if *inferMode { + opts = append(opts, bodkin.WithInferTimeUnits()) + } + if *withTypeConversion { + opts = append(opts, bodkin.WithTypeConversion()) + } + if *quotedValuesAreStrings { + opts = append(opts, bodkin.WithQuotedValuesAreStrings()) + } + if *lines != 0 { + opts = append(opts, bodkin.WithMaxCount(*lines)) + } + arrowSchema, n, err := j2p.SchemaFromFile(*inputFile, opts...) + if err == bodkin.ErrInvalidInput { + fmt.Printf("schema creation error %v\n", err) + } + if arrowSchema == nil { + log.Fatal("nil schema") + } + log.Printf("schema from %d records\n", n) + fmt.Println(arrowSchema.String()) + if !*dryRun { + if *outputFile == "" { + log.Fatal("no output file specified") + } + log.Println("starting conversion to parquet") + + n, err = j2p.RecordsFromFile(*inputFile, *outputFile, arrowSchema, nil) + log.Printf("%d records written", n) + if err != nil { + log.Printf("parquet error: %v", err) + } + } +} diff --git a/json2parquet/json2parquet.go b/json2parquet/json2parquet.go index 1c00c71..b94976b 100644 --- a/json2parquet/json2parquet.go +++ b/json2parquet/json2parquet.go @@ -39,7 +39,7 @@ func FromReader(r io.Reader, opts ...bodkin.Option) (*arrow.Schema, int64, error return schema, u.Count(), err } -func SchemaFromFile(inputFile string, opts ...bodkin.Option) (*arrow.Schema, int, error) { +func SchemaFromFile(inputFile string, opts ...bodkin.Option) (*arrow.Schema, int64, error) { f, err := os.Open(inputFile) if err != nil { return nil, 0, err