Commit

change schema serialization

loicalleyne committed Nov 6, 2024
1 parent 4b515ac commit 5b9d000

Showing 9 changed files with 242 additions and 87 deletions.
3 changes: 2 additions & 1 deletion .gitignore
@@ -26,4 +26,5 @@ go.work.sum
 
 avro/
 map.go
-*.schema
+*.schema
+*.pgo
14 changes: 9 additions & 5 deletions README.md
@@ -14,7 +14,7 @@ Bodkin enables you to use your _data_ to define and evolve your Arrow Schema.
 - Automatically evolves the Arrow schema with new fields when providing new inputs
 - Converts schema field types when unifying schemas to accept evolving input data
 - Tracks changes to the schema
-- Export/import a schema to/from a stub parquet file to persist schema definition
+- Export/import a serialized Arrow schema to/from a file or []byte to transmit or persist the schema definition
 
 ## 🚀 Install
 
@@ -150,11 +150,15 @@ for rdr.Next() {
 // ]
 ```
 
-Export your schema to a file, then import the file to retrieve the schema
+Export your schema to a file, then import the file to retrieve the schema, or export/import it to/from a []byte.
 ```go
-_ = u.ExportSchema("./test.schema")
-imp, _ := u.ImportSchema("./test.schema")
-fmt.Printf("imported %v\n", imp.String())
+_ = u.ExportSchemaFile("./test.schema")
+imp, _ := u.ImportSchemaFile("./test.schema")
+fmt.Printf("imported %v\n", imp.String())
+
+bs, _ := u.ExportSchemaBytes()
+sc, _ := u.ImportSchemaBytes(bs)
+fmt.Printf("imported %v\n", sc.String())
 ```
 
 ## 💫 Show your support
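The []byte form is what makes the schema portable between processes: ExportSchemaBytes returns a self-contained serialized schema that can be embedded in a message or config and rehydrated on the other side. A minimal sketch under assumptions — `u` is a unified Bodkin instance as in the snippets above, and the base64 envelope is illustrative, not part of the API:

```go
// Serialize the evolved schema and wrap it for embedding in JSON/YAML.
bs, _ := u.ExportSchemaBytes()
encoded := base64.StdEncoding.EncodeToString(bs)

// Receiving side: decode the envelope and rebuild the *arrow.Schema.
raw, err := base64.StdEncoding.DecodeString(encoded)
if err != nil {
	log.Fatal(err)
}
schema, err := u.ImportSchemaBytes(raw)
if err != nil {
	log.Fatal(err)
}
fmt.Println(schema.String())
```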
94 changes: 20 additions & 74 deletions bodkin.go
@@ -13,15 +13,10 @@ import (
 	"strings"
 
 	"github.com/apache/arrow-go/v18/arrow"
-	"github.com/apache/arrow-go/v18/arrow/array"
+	"github.com/apache/arrow-go/v18/arrow/flight"
 	"github.com/apache/arrow-go/v18/arrow/memory"
-	"github.com/apache/arrow-go/v18/parquet"
-	"github.com/apache/arrow-go/v18/parquet/compress"
-	"github.com/apache/arrow-go/v18/parquet/file"
-	"github.com/apache/arrow-go/v18/parquet/pqarrow"
 	"github.com/go-viper/mapstructure/v2"
 	json "github.com/goccy/go-json"
-	"github.com/loicalleyne/bodkin/pq"
 	omap "github.com/wk8/go-ordered-map/v2"
 )
 
@@ -205,90 +200,41 @@ func (u *Bodkin) Paths() []Field {
 	return paths
 }
 
-// ExportSchema exports an Arrow Schema by writing a record full of null values to an Arrow IPC file.
-func (u *Bodkin) ExportSchema(exportPath string) error {
-	f, err := os.Open(exportPath)
-	if err != nil {
-		return err
-	}
-	defer f.Close()
-
+// ExportSchemaFile exports a serialized Arrow Schema to a file.
+func (u *Bodkin) ExportSchemaFile(exportPath string) error {
 	schema, err := u.Schema()
 	if err != nil {
 		return err
 	}
-	var prp *parquet.WriterProperties = parquet.NewWriterProperties(
-		parquet.WithDictionaryDefault(true),
-		parquet.WithVersion(parquet.V2_LATEST),
-		parquet.WithCompression(compress.Codecs.Zstd),
-		parquet.WithStats(true),
-		parquet.WithRootName("bodkin"),
-	)
-
-	pw, _, err := pq.NewParquetWriter(schema, prp, exportPath)
+	bs := flight.SerializeSchema(schema, memory.DefaultAllocator)
+	err = os.WriteFile(exportPath, bs, 0644)
 	if err != nil {
 		return err
 	}
-	defer pw.Close()
-
-	rb := array.NewRecordBuilder(memory.DefaultAllocator, schema)
-	m := make(map[string]any)
-	for _, c := range u.old.children {
-		switch c.arrowType {
-		case arrow.STRING:
-			if c.field.Type.ID() != arrow.LIST {
-				m[c.name] = "nevergonnagiveyouup"
-			}
-		case arrow.FLOAT64:
-			m[c.name] = 1.2345
-		case arrow.INT8, arrow.INT16, arrow.INT32, arrow.INT64, arrow.UINT8, arrow.UINT16, arrow.UINT32, arrow.UINT64:
-			m[c.name] = 1234
-		}
-	}
-	data, err := json.Marshal(m)
-	if err != nil {
-		return err
-	}
-	// array.UnmarshalJSON for record builder will read in a single object and add the values to each field in the recordbuilder,
-	// missing fields will get a null and unexpected keys will be ignored. If reading in an array of records as a single batch,
-	// then use a structbuilder and use RecordFromStruct. This is fine as we are only interested in the Arrow schema.
-	for i := 0; i < 10; i++ {
-		err = rb.UnmarshalJSON(data)
-		if err != nil {
-			return err
-		}
-	}
-
-	rec := rb.NewRecord()
-	err = pw.WriteRecord(rec)
-	if err != nil {
-		return err
-	}
-
-	return f.Sync()
+	return nil
 }
 
-// ImportSchema imports an Arrow Schema from an Arrow IPC file.
-func (u *Bodkin) ImportSchema(importPath string) (*arrow.Schema, error) {
-	f, err := os.Open(importPath)
-	if err != nil {
-		return nil, err
-	}
-	defer f.Close()
-	pqr, err := file.OpenParquetFile(importPath, true)
+// ImportSchemaFile imports a serialized Arrow Schema from a file.
+func (u *Bodkin) ImportSchemaFile(importPath string) (*arrow.Schema, error) {
+	dat, err := os.ReadFile(importPath)
 	if err != nil {
 		return nil, err
 	}
+	return flight.DeserializeSchema(dat, memory.DefaultAllocator)
+}
 
-	fr, err := pqarrow.NewFileReader(pqr, pqarrow.ArrowReadProperties{}, memory.DefaultAllocator)
+// ExportSchemaBytes exports a serialized Arrow Schema.
+func (u *Bodkin) ExportSchemaBytes() ([]byte, error) {
+	schema, err := u.Schema()
 	if err != nil {
 		return nil, err
 	}
-	schema, err := fr.Schema()
-	if schema == nil {
-		return nil, fmt.Errorf("could not import schema from %s : %v", importPath, err)
-	}
-	return schema, nil
+	return flight.SerializeSchema(schema, memory.DefaultAllocator), nil
+}
+
+// ImportSchemaBytes imports a serialized Arrow Schema.
+func (u *Bodkin) ImportSchemaBytes(dat []byte) (*arrow.Schema, error) {
+	return flight.DeserializeSchema(dat, memory.DefaultAllocator)
+}
 
 // Unify merges structured input's column definition with the previous inputs' schema.
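The new path serializes the schema with arrow-go's flight helpers, which wrap it in an Arrow IPC schema message instead of materializing a placeholder parquet file. A standalone sketch of the round trip these functions rely on — independent of Bodkin, with an invented two-field schema for illustration:

```go
package main

import (
	"fmt"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/flight"
	"github.com/apache/arrow-go/v18/arrow/memory"
)

func main() {
	// Invented schema; any *arrow.Schema round-trips the same way.
	schema := arrow.NewSchema([]arrow.Field{
		{Name: "id", Type: arrow.PrimitiveTypes.Int64},
		{Name: "name", Type: arrow.BinaryTypes.String, Nullable: true},
	}, nil)

	// SerializeSchema emits an IPC stream payload holding just the schema message.
	bs := flight.SerializeSchema(schema, memory.DefaultAllocator)

	// DeserializeSchema parses that payload back into a *arrow.Schema.
	out, err := flight.DeserializeSchema(bs, memory.DefaultAllocator)
	if err != nil {
		panic(err)
	}
	fmt.Println("round trip equal:", schema.Equal(out))
}
```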
16 changes: 13 additions & 3 deletions cmd/main.go
@@ -2,6 +2,7 @@ package main
 
 import (
 	"fmt"
+	"log"
 	"strings"
 
 	"github.com/apache/arrow-go/v18/arrow/array"
@@ -169,18 +170,27 @@ func main() {
 		fmt.Printf("%v : [%s]\n", e.Issue, e.Dotpath)
 	}
 	fmt.Println(u.Changes())
-	err = u.ExportSchema("./test.schema")
+	bs, err := u.ExportSchemaBytes()
 	if err != nil {
 		fmt.Println(err)
 	} else {
-		imp, err := u.ImportSchema("./test.schema")
+		imp, err := u.ImportSchemaBytes(bs)
 		if err != nil {
 			fmt.Println(err)
 		} else {
 			fmt.Printf("imported %v\n", imp.String())
 		}
 	}
+
+	err = u.ExportSchemaFile("./temp.schema")
+	if err != nil {
+		log.Fatal(err)
+	}
+	sb, err := u.ImportSchemaFile("./temp.schema")
+	if err != nil {
+		fmt.Println(err)
+	} else {
+		fmt.Println("deserialized:\n", sb.String())
+	}
 }
 
 var jsonS1 string = `{
5 changes: 2 additions & 3 deletions json2parquet/.gitignore
@@ -1,3 +1,2 @@
-cmd/*
-cmd/*.json
-cmd/*.parquet
+*.json
+*.parquet
2 changes: 2 additions & 0 deletions json2parquet/cmd/.gitignore
@@ -0,0 +1,2 @@
+*.json
+*.parquet
115 changes: 115 additions & 0 deletions json2parquet/cmd/cleaner/main.go
@@ -0,0 +1,115 @@
+package main
+
+import (
+	"bufio"
+	"flag"
+	"fmt"
+	"log"
+	"os"
+	"path/filepath"
+
+	"github.com/goccy/go-json"
+
+	"github.com/redpanda-data/benthos/v4/public/bloblang"
+)
+
+// jcleaner takes a JSONL file as input and removes all null fields, empty arrays,
+// empty objects and empty strings.
+func main() {
+	inputFile := flag.String("in", "", "input file")
+	outputFile := flag.String("out", "", "output file")
+	flag.Parse()
+	if *inputFile == "" {
+		log.Fatal("no input file specified")
+	}
+	if *outputFile == "" {
+		log.Fatal("no output file specified")
+	}
+	problemLines := fileNameWithoutExt(*outputFile) + "_problem.json"
+	f, err := os.Open(*inputFile)
+	if err != nil {
+		panic(err)
+	}
+	defer func() {
+		if r := recover(); r != nil {
+			fmt.Println(r)
+		}
+	}()
+	defer f.Close()
+	bloblangMapping := `map remove_null_empty {
+  root = match {
+    (this.type() == "object" && this.length() == 0) => deleted()
+    this.type() == "object" => this.map_each(i -> i.value.apply("remove_null_empty"))
+    (this.type() == "array" && this.length() == 0) => deleted()
+    this.type() == "array" => this.map_each(v -> v.apply("remove_null_empty"))
+    this.type() == "null" => deleted()
+    this.type() == "string" && this.length() == 0 => deleted()
+  }
+}
+root = this.apply("remove_null_empty")`
+	exe, err := bloblang.Parse(bloblangMapping)
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	nf, err := os.Create(*outputFile)
+	if err != nil {
+		panic(err)
+	}
+	defer nf.Close()
+	w := bufio.NewWriterSize(nf, 1024*4)
+
+	pf, err := os.Create(problemLines)
+	if err != nil {
+		panic(err)
+	}
+	defer pf.Close()
+	// Buffered writer for lines that fail the mapping, kept for inspection.
+	pw := bufio.NewWriterSize(pf, 1024*4)
+
+	r := bufio.NewReaderSize(f, 1024*4)
+	s := bufio.NewScanner(r)
+	newline := []byte("\n")
+	for s.Scan() {
+		y := s.Bytes()
+		b, err := ApplyBloblangMapping(y, exe)
+		if err != nil {
+			// Preserve the problem line verbatim and move on.
+			pw.Write(y)
+			pw.Write(newline)
+			continue
+		}
+		_, err = w.Write(b)
+		if err != nil {
+			pw.Write(y)
+			pw.Write(newline)
+			continue
+		}
+		w.Write(newline)
+	}
+	w.Flush()
+	pw.Flush()
+}
+
+func ApplyBloblangMapping(jsonInput []byte, exe *bloblang.Executor) ([]byte, error) {
+	// Parse the JSON input into a map[string]interface{}
+	var inputMap map[string]interface{}
+	if err := json.Unmarshal(jsonInput, &inputMap); err != nil {
+		return nil, err
+	}
+
+	// Execute the Bloblang mapping
+	res, err := exe.Query(inputMap)
+	if err != nil {
+		return nil, err
+	}
+
+	// Convert the result back into a JSON string
+	jsonResult, err := json.Marshal(res)
+	if err != nil {
+		return nil, err
+	}
+
+	return jsonResult, nil
+}
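To see what the remove_null_empty mapping actually does to a record, here is a self-contained sketch that runs the same mapping over an invented sample object; the input values and the expected output in the comments are illustrative:

```go
package main

import (
	"fmt"
	"log"

	"github.com/redpanda-data/benthos/v4/public/bloblang"
)

func main() {
	mapping := `map remove_null_empty {
  root = match {
    (this.type() == "object" && this.length() == 0) => deleted()
    this.type() == "object" => this.map_each(i -> i.value.apply("remove_null_empty"))
    (this.type() == "array" && this.length() == 0) => deleted()
    this.type() == "array" => this.map_each(v -> v.apply("remove_null_empty"))
    this.type() == "null" => deleted()
    this.type() == "string" && this.length() == 0 => deleted()
  }
}
root = this.apply("remove_null_empty")`

	exe, err := bloblang.Parse(mapping)
	if err != nil {
		log.Fatal(err)
	}

	// Invented sample line: the null, empty string, empty array and empty
	// object members should all be dropped; non-empty values survive.
	in := map[string]any{
		"a": 1,
		"b": nil,
		"c": "",
		"d": []any{},
		"e": map[string]any{},
		"f": map[string]any{"g": "keep", "h": ""},
	}
	out, err := exe.Query(in)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("%v\n", out) // expected: map[a:1 f:map[g:keep]]
}
```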