Skip to content

Commit

Permalink
export/import schema to file
Browse files Browse the repository at this point in the history
  • Loading branch information
loicalleyne committed Nov 5, 2024
1 parent 8f2a6e7 commit fb3efc3
Show file tree
Hide file tree
Showing 6 changed files with 120 additions and 7 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,4 @@ go.work.sum

avro/
map.go
*.schema
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ The goal is to provide a useful toolkit to make it easier to use Arrow, and by e
- Automatically evolves the Arrow schema with new fields when providing new inputs
- Converts schema field types when unifying schemas to accept evolving input data
- Tracks changes to the schema
- Export/import a schema to/from a file to persist schema definition

## 🚀 Install

Expand Down
98 changes: 98 additions & 0 deletions bodkin.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,20 @@ import (
"bytes"
"errors"
"fmt"
"os"
"slices"
"strings"

"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/apache/arrow-go/v18/parquet"
"github.com/apache/arrow-go/v18/parquet/compress"
"github.com/apache/arrow-go/v18/parquet/file"
"github.com/apache/arrow-go/v18/parquet/pqarrow"
"github.com/go-viper/mapstructure/v2"
json "github.com/goccy/go-json"
"github.com/loicalleyne/bodkin/pq"
omap "github.com/wk8/go-ordered-map/v2"
)

Expand Down Expand Up @@ -178,6 +186,96 @@ func (u *Bodkin) Paths() []Field {
return paths
}

// ExportSchema persists the Bodkin's current Arrow schema by writing a single
// record of placeholder values to a Parquet file at exportPath. The Arrow
// schema is embedded in the file metadata (store_schema) and can be recovered
// with ImportSchema. Returns an error if the schema is not yet available or
// the file cannot be written.
func (u *Bodkin) ExportSchema(exportPath string) error {
	schema, err := u.Schema()
	if err != nil {
		return err
	}
	prp := parquet.NewWriterProperties(
		parquet.WithDictionaryDefault(true),
		parquet.WithVersion(parquet.V2_LATEST),
		parquet.WithCompression(compress.Codecs.Zstd),
		parquet.WithStats(true),
		parquet.WithRootName("bodkin"),
	)

	// NewParquetWriter creates (or truncates) the destination file itself;
	// a prior os.Open would fail for a not-yet-existing export path and
	// would yield a read-only handle anyway.
	pw, _, err := pq.NewParquetWriter(schema, prp, exportPath)
	if err != nil {
		return err
	}

	rb := array.NewRecordBuilder(memory.DefaultAllocator, schema)
	defer rb.Release()
	// Populate one row of placeholder values so the writer emits a valid
	// file; only the embedded schema matters, not the data. Values are
	// chosen to be unlikely to conflict with a real detected schema.
	m := make(map[string]any)
	for _, c := range u.old.children {
		switch c.arrowType {
		case arrow.STRING:
			if c.field.Type.ID() != arrow.LIST {
				m[c.name] = "nevergonnagiveyouup"
			}
		case arrow.FLOAT64:
			m[c.name] = 1.2345
		case arrow.INT8, arrow.INT16, arrow.INT32, arrow.INT64, arrow.UINT8, arrow.UINT16, arrow.UINT32, arrow.UINT64:
			m[c.name] = 1234
		}
	}
	data, err := json.Marshal(m)
	if err != nil {
		pw.Close()
		return err
	}
	// rb.UnmarshalJSON reads a single JSON object and appends one row to each
	// field builder; missing fields become null and unexpected keys are
	// ignored. A single row is sufficient since only the schema is needed.
	if err := rb.UnmarshalJSON(data); err != nil {
		pw.Close()
		return err
	}

	rec := rb.NewRecord()
	defer rec.Release()
	if err := pw.WriteRecord(rec); err != nil {
		pw.Close()
		return err
	}

	// Close finalizes the Parquet footer; its error must be surfaced or the
	// exported file may be unreadable.
	return pw.Close()
}

// ImportSchema reads an Arrow schema back from a Parquet file previously
// written by ExportSchema (the schema is recovered from the file's embedded
// Arrow schema metadata). Returns an error if the file cannot be opened or
// no schema can be derived from it.
func (u *Bodkin) ImportSchema(importPath string) (*arrow.Schema, error) {
	// OpenParquetFile opens the path itself; no separate os.Open is needed.
	pqr, err := file.OpenParquetFile(importPath, true)
	if err != nil {
		return nil, err
	}
	// Close the reader so the underlying file handle is released.
	defer pqr.Close()

	fr, err := pqarrow.NewFileReader(pqr, pqarrow.ArrowReadProperties{}, memory.DefaultAllocator)
	if err != nil {
		return nil, err
	}
	// Check the error as well as the schema: a non-nil error must not be
	// masked by a partially-derived schema.
	schema, err := fr.Schema()
	if err != nil || schema == nil {
		return nil, fmt.Errorf("could not import schema from %s : %v", importPath, err)
	}
	return schema, nil
}

// Unify merges structured input's column definition with the previously input's schema.
// Any unpopulated fields, empty objects or empty slices in JSON input are skipped.
func (u *Bodkin) Unify(a any) {
Expand Down
12 changes: 12 additions & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,18 @@ func main() {
fmt.Printf("%v : [%s]\n", e.Issue, e.Dotpath)
}
fmt.Println(u.Changes())
err = u.ExportSchema("./test.schema")
if err != nil {
fmt.Println(err)
} else {
imp, err := u.ImportSchema("./test.schema")
if err != nil {
fmt.Println(err)
} else {
fmt.Printf("imported %v\n", imp.String())
}

}

}

Expand Down
5 changes: 3 additions & 2 deletions json2parquet/json2parquet.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/parquet"
"github.com/loicalleyne/bodkin"
"github.com/loicalleyne/bodkin/pq"
)

func FromReader(r io.Reader, opts ...bodkin.Option) (*arrow.Schema, int, error) {
Expand Down Expand Up @@ -62,11 +63,11 @@ func RecordsFromFile(inputFile, outputFile string, schema *arrow.Schema, munger
}
}()
defer f.Close()
var prp *parquet.WriterProperties = defaultWrtp
var prp *parquet.WriterProperties = pq.DefaultWrtp
if len(opts) != 0 {
prp = parquet.NewWriterProperties(opts...)
}
pw, _, err := NewParquetWriter(schema, prp, outputFile)
pw, _, err := pq.NewParquetWriter(schema, prp, outputFile)
if err != nil {
return 0, err
}
Expand Down
10 changes: 5 additions & 5 deletions json2parquet/parquet_writer.go → pq/parquet_writer.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package json2parquet
package pq

import (
"fmt"
Expand All @@ -18,12 +18,12 @@ const (
)

var (
defaultWrtp = parquet.NewWriterProperties(
DefaultWrtp = parquet.NewWriterProperties(
parquet.WithDictionaryDefault(true),
parquet.WithVersion(parquet.V2_LATEST),
parquet.WithCompression(compress.Codecs.Zstd),
parquet.WithStats(true),
parquet.WithRootName("json2parquet"),
parquet.WithRootName("bodkin"),
)
)

Expand Down Expand Up @@ -63,8 +63,8 @@ func NewParquetWriter(sc *arrow.Schema, wrtp *parquet.WriterProperties, path str
if err != nil {
return nil, nil, fmt.Errorf("failed to create destination file: %w", err)
}

pqwrt, err := pqarrow.NewFileWriter(sc, destFile, wrtp, pqarrow.DefaultWriterProps())
artp := pqarrow.NewArrowWriterProperties(pqarrow.WithStoreSchema())
pqwrt, err := pqarrow.NewFileWriter(sc, destFile, wrtp, artp)
if err != nil {
return nil, nil, fmt.Errorf("failed to create parquet writer: %w", err)
}
Expand Down

0 comments on commit fb3efc3

Please sign in to comment.