From 2fd37d2acdedae7095ba6ed1ace537ee95dd197e Mon Sep 17 00:00:00 2001 From: loicalleyne Date: Sun, 3 Nov 2024 22:01:31 -0500 Subject: [PATCH] json2parquet --- .gitignore | 3 + bodkin.go | 5 +- go.mod | 30 +++++++ go.sum | 54 ++++++++++++ json2parquet/.gitignore | 3 + json2parquet/json2parquet.go | 117 ++++++++++++++++++++++++++ json2parquet/parquet_writer.go | 147 +++++++++++++++++++++++++++++++++ schema.go | 17 +++- 8 files changed, 372 insertions(+), 4 deletions(-) create mode 100644 json2parquet/.gitignore create mode 100644 json2parquet/json2parquet.go create mode 100644 json2parquet/parquet_writer.go diff --git a/.gitignore b/.gitignore index 6f72f89..cbd132c 100644 --- a/.gitignore +++ b/.gitignore @@ -23,3 +23,6 @@ go.work.sum # env file .env + +avro/ +map.go diff --git a/bodkin.go b/bodkin.go index c37a2e5..4f85edc 100644 --- a/bodkin.go +++ b/bodkin.go @@ -74,7 +74,7 @@ func newBodkin(m map[string]any, opts ...Option) (*Bodkin, error) { return b, errWrap(f) } -// Err returns the last error encountered during the unification of input schemas. +// Err returns the last errors encountered during the unification of input schemas. func (u *Bodkin) Err() error { return u.err } // Changes returns a list of field additions and field type conversions done @@ -157,6 +157,7 @@ func (u *Bodkin) Unify(a any) { // Schema returns the original Arrow schema generated from the structure/types of // the initial input, and wrapped errors indicating which fields could not be evaluated. +// Make sure to check that returned schema != nil. func (u *Bodkin) OriginSchema() (*arrow.Schema, error) { var fields []arrow.Field for _, c := range u.original.children { @@ -168,6 +169,7 @@ func (u *Bodkin) OriginSchema() (*arrow.Schema, error) { // Schema returns the current merged Arrow schema generated from the structure/types of // the input(s), and wrapped errors indicating which fields could not be evaluated. +// Make sure to check that returned schema != nil. 
func (u *Bodkin) Schema() (*arrow.Schema, error) { var fields []arrow.Field for _, c := range u.old.children { @@ -180,6 +182,7 @@ func (u *Bodkin) Schema() (*arrow.Schema, error) { // LastSchema returns the Arrow schema generated from the structure/types of // the most recent input. Any uppopulated fields, empty objects or empty slices are skipped. // ErrNoLatestSchema if Unify() has never been called. +// Make sure to check that returned schema != nil. func (u *Bodkin) LastSchema() (*arrow.Schema, error) { if u.new == nil { return nil, ErrNoLatestSchema diff --git a/go.mod b/go.mod index ef3e5bd..2fcda43 100644 --- a/go.mod +++ b/go.mod @@ -9,13 +9,43 @@ require ( ) require ( + github.com/Jeffail/gabs/v2 v2.7.0 // indirect + github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c // indirect + github.com/OneOfOne/xxhash v1.2.8 // indirect + github.com/andybalholm/brotli v1.1.1 // indirect + github.com/apache/thrift v0.21.0 // indirect + github.com/go-logr/logr v1.4.2 // indirect + github.com/go-logr/stdr v1.2.2 // indirect + github.com/gofrs/uuid v4.4.0+incompatible // indirect + github.com/golang/snappy v0.0.4 // indirect github.com/google/flatbuffers v24.3.25+incompatible // indirect + github.com/klauspost/asmfmt v1.3.2 // indirect + github.com/klauspost/compress v1.17.11 // indirect github.com/klauspost/cpuid/v2 v2.2.8 // indirect + github.com/matoous/go-nanoid/v2 v2.1.0 // indirect + github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect + github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect + github.com/pierrec/lz4/v4 v4.1.21 // indirect + github.com/redpanda-data/benthos/v4 v4.40.0 // indirect + github.com/segmentio/ksuid v1.0.4 // indirect + github.com/tilinna/z85 v1.0.0 // indirect + github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb // indirect + github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect + github.com/xeipuuv/gojsonschema v1.2.0 // indirect 
github.com/zeebo/xxh3 v1.0.2 // indirect + go.opentelemetry.io/otel v1.28.0 // indirect + go.opentelemetry.io/otel/metric v1.28.0 // indirect + go.opentelemetry.io/otel/trace v1.28.0 // indirect golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 // indirect golang.org/x/mod v0.21.0 // indirect + golang.org/x/net v0.30.0 // indirect golang.org/x/sync v0.8.0 // indirect golang.org/x/sys v0.26.0 // indirect + golang.org/x/text v0.19.0 // indirect golang.org/x/tools v0.26.0 // indirect golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect + google.golang.org/grpc v1.67.1 // indirect + google.golang.org/protobuf v1.35.1 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 76b015e..9b85c6f 100644 --- a/go.sum +++ b/go.sum @@ -1,19 +1,35 @@ +github.com/Jeffail/gabs/v2 v2.7.0 h1:Y2edYaTcE8ZpRsR2AtmPu5xQdFDIthFG0jYhu5PY8kg= +github.com/Jeffail/gabs/v2 v2.7.0/go.mod h1:dp5ocw1FvBBQYssgHsG7I1WYsiLRtkUaB1FEtSwvNUw= +github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c h1:RGWPOewvKIROun94nF7v2cua9qP+thov/7M50KEoeSU= +github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c/go.mod h1:X0CRv0ky0k6m906ixxpzmDRLvX58TFUKS2eePweuyxk= +github.com/OneOfOne/xxhash v1.2.8 h1:31czK/TI9sNkxIKfaUfGlU47BAxQ0ztGgd9vPyqimf8= +github.com/OneOfOne/xxhash v1.2.8/go.mod h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q= github.com/andybalholm/brotli v1.1.1 h1:PR2pgnyFznKEugtsUo0xLdDop5SKXd5Qf5ysW+7XdTA= github.com/andybalholm/brotli v1.1.1/go.mod h1:05ib4cKhjx3OQYUY22hTVd34Bc8upXjOLL2rKwwZBoA= github.com/apache/arrow-go/v18 v18.0.0 h1:1dBDaSbH3LtulTyOVYaBCHO3yVRwjV+TZaqn3g6V7ZM= github.com/apache/arrow-go/v18 v18.0.0/go.mod h1:t6+cWRSmKgdQ6HsxisQjok+jBpKGhRDiqcf3p0p/F+A= github.com/apache/thrift v0.21.0 h1:tdPmh/ptjE1IJnhbhrcl2++TauVjy242rkV/UzJChnE= github.com/apache/thrift v0.21.0/go.mod h1:W1H8aR/QRtYNvrPeFXBtobyRkd0/YVhTc6i07XIAgDw= 
+github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-viper/mapstructure/v2 v2.2.1 h1:ZAaOCxANMuZx5RCeg0mBdEZk7DZasvvZIxtHqx8aGss= github.com/go-viper/mapstructure/v2 v2.2.1/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM= github.com/goccy/go-json v0.10.3 h1:KZ5WoDbxAIgm2HNbYckL0se1fHD6rz5j4ywS6ebzDqA= github.com/goccy/go-json v0.10.3/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M= +github.com/gofrs/uuid v4.4.0+incompatible h1:3qXRTX8/NbyulANqlc0lchS1gqAVxRgsuW1YrTJupqA= +github.com/gofrs/uuid v4.4.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/flatbuffers v24.3.25+incompatible h1:CX395cjN9Kke9mmalRoL3d81AtFUxJM+yDthflgJGkI= github.com/google/flatbuffers v24.3.25+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/klauspost/asmfmt v1.3.2 
h1:4Ri7ox3EwapiOjCki+hw14RyKk201CN4rzyCJRFLpK4= @@ -22,6 +38,8 @@ github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IX github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= github.com/klauspost/cpuid/v2 v2.2.8 h1:+StwCXwm9PdpiEkPyzBXIy+M9KUb4ODm0Zarf1kS5BM= github.com/klauspost/cpuid/v2 v2.2.8/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= +github.com/matoous/go-nanoid/v2 v2.1.0 h1:P64+dmq21hhWdtvZfEAofnvJULaRR1Yib0+PnU669bE= +github.com/matoous/go-nanoid/v2 v2.1.0/go.mod h1:KlbGNQ+FhrUNIHUxZdL63t7tl4LaPkZNpUULS8H4uVM= github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs= github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8/go.mod h1:mC1jAcsrzbxHt8iiaC+zU4b1ylILSosueou12R++wfY= github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 h1:+n/aFZefKZp7spd8DFdX7uMikMLXX4oubIzJF4kv/wI= @@ -30,26 +48,62 @@ github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/redpanda-data/benthos/v4 v4.40.0 h1:/6h8BYALrqzvAKo3RsVdscIdypj08NwfAMgcvX5/+uY= +github.com/redpanda-data/benthos/v4 v4.40.0/go.mod h1:A5izknIGyzs16rCU0qliFVgdCLn2yyvLM4Hltx+s+TI= +github.com/segmentio/ksuid v1.0.4 h1:sBo2BdShXjmcugAMwjugoGUdUV0pcxY5mW4xKRn3v4c= +github.com/segmentio/ksuid v1.0.4/go.mod h1:/XUiZBD3kVx5SmUOl55voK5yeAbBNNIed+2O73XgrPE= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= +github.com/stretchr/testify v1.3.0/go.mod 
h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/tilinna/z85 v1.0.0 h1:uqFnJBlD01dosSeo5sK1G1YGbPuwqVHqR+12OJDRjUw= +github.com/tilinna/z85 v1.0.0/go.mod h1:EfpFU/DUY4ddEy6CRvk2l+UQNEzHbh+bqBQS+04Nkxs= +github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU= +github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb h1:zGWFAtiMcyryUHoUjUJX0/lt1H2+i2Ka2n+D3DImSNo= +github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU= +github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 h1:EzJWgHovont7NscjpAxXsDA8S8BMYve8Y5+7cuRE7R0= +github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ= +github.com/xeipuuv/gojsonschema v1.2.0 h1:LhYJRs+L4fBtjZUfuSZIKGeVu0QRy8e5Xi7D17UxZ74= +github.com/xeipuuv/gojsonschema v1.2.0/go.mod h1:anYRn/JVcOK2ZgGU+IjEV4nwlhoK5sQluxsYJ78Id3Y= +github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZqKjWU= +github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E= github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ= github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0= github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0= github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA= +go.opentelemetry.io/otel v1.28.0 h1:/SqNcYk+idO0CxKEUOtKQClMK/MimZihKYMruSMViUo= +go.opentelemetry.io/otel v1.28.0/go.mod h1:q68ijF8Fc8CnMHKyzqL6akLO46ePnjkgfIMIjUIX9z4= +go.opentelemetry.io/otel/metric v1.28.0 h1:f0HGvSl1KRAU1DLgLGFjrwVyismPlnuU6JD6bOeuA5Q= +go.opentelemetry.io/otel/metric v1.28.0/go.mod 
h1:Fb1eVBFZmLVTMb6PPohq3TO9IIhUisDsbJoL/+uQW4s= +go.opentelemetry.io/otel/trace v1.28.0 h1:GhQ9cUuQGmNDd5BTCP2dAvv75RdMxEfTmYejp+lkx9g= +go.opentelemetry.io/otel/trace v1.28.0/go.mod h1:jPyXzNPg6da9+38HEwElrQiHlVMTnVfM3/yv2OlIHaI= golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 h1:e66Fs6Z+fZTbFBAxKfP3PALWBtpfqks2bwGcexMxgtk= golang.org/x/exp v0.0.0-20240909161429-701f63a606c0/go.mod h1:2TbTHSBQa924w8M6Xs1QcRcFwyucIwBGpK1p2f1YFFY= golang.org/x/mod v0.21.0 h1:vvrHzRwRfVKSiLrG+d4FMl/Qi4ukBCE6kZlTUkDYRT0= golang.org/x/mod v0.21.0/go.mod h1:6SkKJ3Xj0I0BrPOZoBy3bdMptDDU9oJrpohJ3eWZ1fY= +golang.org/x/net v0.30.0 h1:AcW1SDZMkb8IpzCdQUaIq2sP4sZ4zw+55h6ynffypl4= +golang.org/x/net v0.30.0/go.mod h1:2wGyMJ5iFasEhkwi13ChkO/t1ECNC4X4eBKkVFyYFlU= golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo= golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.19.0 h1:kTxAhCbGbxhK0IwgSKiMO5awPoDQ0RpfiVYBfK860YM= +golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= golang.org/x/tools v0.26.0 h1:v/60pFQmzmT9ExmjDv2gGIfi3OqfKoEP6I5+umXlbnQ= golang.org/x/tools v0.26.0/go.mod h1:TPVVj70c7JJ3WCazhD8OdXcZg/og+b9+tH/KxylGwH0= golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 h1:+cNy6SZtPcJQH3LJVLOSmiC7MMxXNOb3PU/VUEz+EhU= golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028/go.mod h1:NDW/Ps6MPRej6fsCIbMTohpP40sJ/P/vI1MoTEGwX90= gonum.org/v1/gonum v0.15.1 h1:FNy7N6OUZVUaWG9pTiD+jlhdQ3lMP+/LcTpJ6+a8sQ0= gonum.org/v1/gonum v0.15.1/go.mod h1:eZTZuRFrzu5pcyjN5wJhcIhnUdNijYxX1T2IcrOGY0o= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 h1:pPJltXNxVzT4pK9yD8vR9X75DaWYYmLGMsEvBfFQZzQ= +google.golang.org/genproto/googleapis/rpc 
v0.0.0-20240903143218-8af14fe29dc1/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU= +google.golang.org/grpc v1.67.1 h1:zWnc1Vrcno+lHZCOofnIMvycFcc0QRGIzm9dhnDX68E= +google.golang.org/grpc v1.67.1/go.mod h1:1gLDyUQU7CTLJI90u3nXZ9ekeghjeM7pTDZlqFNg2AA= +google.golang.org/protobuf v1.35.1 h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFytA= +google.golang.org/protobuf v1.35.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/json2parquet/.gitignore b/json2parquet/.gitignore new file mode 100644 index 0000000..4721a81 --- /dev/null +++ b/json2parquet/.gitignore @@ -0,0 +1,3 @@ +cmd/* +cmd/*.json +cmd/*.parquet \ No newline at end of file
diff --git a/json2parquet/json2parquet.go b/json2parquet/json2parquet.go
new file mode 100644
index 0000000..5e1e3d8
--- /dev/null
+++ b/json2parquet/json2parquet.go
@@ -0,0 +1,146 @@
+package json2parquet
+
+import (
+	"bufio"
+	"errors"
+	"fmt"
+	"io"
+	"os"
+
+	"github.com/apache/arrow-go/v18/arrow"
+	"github.com/apache/arrow-go/v18/arrow/array"
+	"github.com/apache/arrow-go/v18/parquet"
+	"github.com/loicalleyne/bodkin"
+)
+
+// FromReader infers an Arrow schema from newline-delimited JSON read from r.
+//
+// The first line seeds a bodkin.Bodkin and every following line is unified
+// into it; sampling stops once more than 10000 lines have been read.
+//
+// It returns the unified schema, the number of lines read and the errors
+// collected along the way. A non-nil schema may come with a non-nil error.
+// bodkin.ErrInvalidInput is returned when r yields no lines at all.
+func FromReader(r io.Reader, opts ...bodkin.Option) (*arrow.Schema, int, error) {
+	var err, errBundle error
+	s := bufio.NewScanner(r)
+	var u *bodkin.Bodkin
+	var i int
+	if !s.Scan() {
+		if serr := s.Err(); serr != nil {
+			return nil, i, errors.Join(bodkin.ErrInvalidInput, serr)
+		}
+		return nil, i, bodkin.ErrInvalidInput
+	}
+	u, err = bodkin.NewBodkin(s.Bytes(), opts...)
+	if err != nil {
+		errBundle = errors.Join(errBundle, err)
+	}
+	i++
+	if u == nil {
+		// NewBodkin gave us nothing to unify into or read a schema from.
+		return nil, i, errBundle
+	}
+	for s.Scan() {
+		u.Unify(s.Bytes())
+		i++
+		if i > 10000 {
+			break
+		}
+	}
+	// Scan also returns false on read errors and over-long lines.
+	if serr := s.Err(); serr != nil {
+		errBundle = errors.Join(errBundle, serr)
+	}
+	schema, err := u.Schema()
+	if schema == nil {
+		return nil, i, errors.Join(errBundle, err)
+	}
+	return schema, i, errBundle
+}
+
+// SchemaFromFile opens inputFile and infers an Arrow schema from its content.
+// See FromReader for the meaning of the returned values.
+func SchemaFromFile(inputFile string, opts ...bodkin.Option) (*arrow.Schema, int, error) {
+	f, err := os.Open(inputFile)
+	if err != nil {
+		return nil, 0, err
+	}
+	defer f.Close()
+
+	r := bufio.NewReaderSize(f, 1024*32)
+	return FromReader(r, opts...)
+}
+
+// RecordsFromFile decodes the JSON records in inputFile using schema and
+// writes them to a new Parquet file at outputFile.
+//
+// munger, when non-nil, runs in its own goroutine and rewrites the raw input
+// before it is decoded: it is handed the file and a writer for the
+// transformed JSON. An error it returns is passed on to the JSON reader.
+//
+// opts, when given, replace the package's default Parquet writer properties.
+//
+// It returns the number of rows written and the first error encountered. A
+// panic raised during the conversion is recovered and reported as an error.
+func RecordsFromFile(inputFile, outputFile string, schema *arrow.Schema, munger func(io.Reader, io.Writer) error, opts ...parquet.WriterProperty) (n int, err error) {
+	// Registered first so it runs last: report a panic as an error rather
+	// than printing it and returning a nil error.
+	defer func() {
+		if r := recover(); r != nil {
+			err = errors.Join(err, fmt.Errorf("panic after %d rows: %v", n, r))
+		}
+	}()
+
+	f, err := os.Open(inputFile)
+	if err != nil {
+		return 0, err
+	}
+	defer f.Close()
+
+	prp := defaultWrtp
+	if len(opts) != 0 {
+		prp = parquet.NewWriterProperties(opts...)
+	}
+	pw, _, err := NewParquetWriter(schema, prp, outputFile)
+	if err != nil {
+		return 0, err
+	}
+	// Close exactly once on every path and keep the error it reports.
+	defer func() {
+		if cerr := pw.Close(); cerr != nil {
+			err = errors.Join(err, cerr)
+		}
+	}()
+
+	const chunk = 1024
+	in := bufio.NewReaderSize(f, 1024*1024*128)
+	var src io.Reader = in
+	if munger != nil {
+		pr, pwr := io.Pipe()
+		// Closing the read side fails pending writes, so the munger
+		// goroutine cannot outlive an early return.
+		defer pr.Close()
+		go func() {
+			// A nil error closes the pipe normally (the reader sees EOF);
+			// a non-nil one is delivered to the reader instead.
+			pwr.CloseWithError(munger(in, pwr))
+		}()
+		src = pr
+	}
+	rdr := array.NewJSONReader(src, schema, array.WithChunk(chunk))
+	defer rdr.Release()
+
+	for rdr.Next() {
+		rec := rdr.Record()
+		if werr := pw.WriteRecord(rec); werr != nil {
+			return n, fmt.Errorf("failed to write parquet record: %w", werr)
+		}
+		// Count the rows actually decoded; a chunk may hold fewer than 1024.
+		n += int(rec.NumRows())
+	}
+	if rerr := rdr.Err(); rerr != nil {
+		return n, rerr
+	}
+	return n, nil
+}
diff --git a/json2parquet/parquet_writer.go b/json2parquet/parquet_writer.go
new file mode 100644
index 0000000..5ab35a7
--- /dev/null
+++ b/json2parquet/parquet_writer.go
@@ -0,0 +1,147 @@
+package json2parquet
+
+import (
+	"fmt"
+	"os"
+
+	"github.com/apache/arrow-go/v18/arrow"
+	"github.com/apache/arrow-go/v18/arrow/array"
+	"github.com/apache/arrow-go/v18/arrow/memory"
+	"github.com/apache/arrow-go/v18/parquet"
+	"github.com/apache/arrow-go/v18/parquet/compress"
+	"github.com/apache/arrow-go/v18/parquet/pqarrow"
+	"github.com/apache/arrow-go/v18/parquet/schema"
+)
+
+const (
+	defaultRowGroupByteLimit = 10 * 1024 * 1024
+)
+
+var (
+	defaultWrtp = parquet.NewWriterProperties(
+		parquet.WithDictionaryDefault(true),
+		parquet.WithVersion(parquet.V2_LATEST),
+		parquet.WithCompression(compress.Codecs.Zstd),
+		parquet.WithStats(true),
+		parquet.WithRootName("json2parquet"),
+	)
+)
+
+type ParquetWriter struct {
+	destFile *os.File
+	pqwrt    *pqarrow.FileWriter
+	sc       *arrow.Schema
+	count    int
+}
+
+// NewParquetWriter creates a new ParquetWriter.
+//
+// sc is the Arrow schema to use for writing records.
+// wrtp are the Parquet writer properties to use.
+// path is the destination file; os.Create creates or truncates it.
+//
+// Returns the ParquetWriter, the Parquet schema derived from sc, and an
+// error. The error will be non-nil if:
+//   - Failed to get the Parquet schema from the Arrow schema.
+//   - Failed to create the destination file.
+//   - Failed to create the Parquet file writer.
+//
+// Example:
+//
+//	pw, _, err := NewParquetWriter(sc, parquet.NewWriterProperties(), "out.parquet")
+//	if err != nil {
+//		log.Fatal(err)
+//	}
+func NewParquetWriter(sc *arrow.Schema, wrtp *parquet.WriterProperties, path string) (*ParquetWriter, *schema.Schema, error) {
+	pqschema, err := pqarrow.ToParquet(sc, wrtp, pqarrow.DefaultWriterProps())
+	if err != nil {
+		return nil, nil, fmt.Errorf("failed to get parquet schema: %w", err)
+	}
+
+	destFile, err := os.Create(path)
+	if err != nil {
+		return nil, nil, fmt.Errorf("failed to create destination file: %w", err)
+	}
+
+	pqwrt, err := pqarrow.NewFileWriter(sc, destFile, wrtp, pqarrow.DefaultWriterProps())
+	if err != nil {
+		// Do not leak the handle os.Create just opened.
+		destFile.Close()
+		return nil, nil, fmt.Errorf("failed to create parquet writer: %w", err)
+	}
+
+	return &ParquetWriter{destFile: destFile, pqwrt: pqwrt, sc: sc}, pqschema, nil
+}
+
+// Write writes a single record to the Parquet file.
+//
+// jsonData is the JSON encoded record data.
+//
+// Returns an error if:
+//   - Failed to unmarshal the JSON data.
+//   - Failed to write the record to Parquet.
+//
+// Increments the record count and creates a new row group if the current
+// row group exceeds the default row group byte limit.
+//
+// Example:
+//
+//	err := pw.Write([]byte(`{"id":1,"name":"foo"}`))
+//	if err != nil {
+//		log.Fatal(err)
+//	}
+func (pw *ParquetWriter) Write(jsonData []byte) error {
+	recbld := array.NewRecordBuilder(memory.DefaultAllocator, pw.sc)
+	defer recbld.Release()
+
+	if err := recbld.UnmarshalJSON(jsonData); err != nil {
+		return fmt.Errorf("failed to unmarshal JSON: %w", err)
+	}
+
+	rec := recbld.NewRecord()
+	defer rec.Release()
+
+	// Writing, row-group rollover and counting are shared with WriteRecord.
+	return pw.WriteRecord(rec)
+}
+
+// WriteRecord writes an Arrow record to the current buffered row group.
+//
+// Returns an error if the record could not be written to Parquet.
+//
+// Starts a new buffered row group once the current one has reached
+// defaultRowGroupByteLimit, and adds one to the count reported by
+// RecordCount per call, whatever the number of rows in rec.
+//
+// WriteRecord does not release rec; that stays the caller's responsibility.
+func (pw *ParquetWriter) WriteRecord(rec arrow.Record) error {
+	err := pw.pqwrt.WriteBuffered(rec)
+	if err != nil {
+		return fmt.Errorf("failed to write to parquet: %w", err)
+	}
+
+	if pw.pqwrt.RowGroupTotalBytesWritten() >= defaultRowGroupByteLimit {
+		pw.pqwrt.NewBufferedRowGroup()
+	}
+	pw.count++
+
+	return nil
+}
+
+// RecordCount returns the number of successful Write and WriteRecord calls.
+func (pw *ParquetWriter) RecordCount() int {
+	return pw.count
+}
+
+// Close closes the Parquet writer.
+//
+// Returns an error if failed to close the Parquet file writer.
+func (pw *ParquetWriter) Close() error {
+	// NOTE(review): destFile is not closed here; presumably the pqarrow writer
+	// closes its sink as part of Close — confirm.
+	if err := pw.pqwrt.Close(); err != nil {
+		return fmt.Errorf("failed to close parquet writer: %w", err)
+	}
+
+	return nil
+}
diff --git a/schema.go b/schema.go index 109a29c..50cae5c 100644 --- a/schema.go +++ b/schema.go @@ -7,17 +7,26 @@ import ( "slices" "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/array" ) type fieldPos struct { root *fieldPos parent *fieldPos owner *Bodkin + builder array.Builder name string path []string + isList bool + isItem bool + isStruct bool + isMap bool + typeName string field arrow.Field children []*fieldPos childmap map[string]*fieldPos + appendFunc func(val interface{}) error + metadatas arrow.Metadata index, depth int32 err error } @@ -31,6 +40,8 @@ var ( ErrUndefinedArrayElementType = errors.New("could not determine element type of empty array") ErrNotAnUpgradableType = errors.New("is not an upgradable type") ErrPathNotFound = errors.New("path not found") + ErrFieldTypeChanged = errors.New("changed") + ErrFieldAdded = errors.New("added") ) // UpgradableTypes are scalar types that can be upgraded to a more flexible type. @@ -184,7 +195,7 @@ func (f *fieldPos) graft(n *fieldPos) { graft.children = append(graft.children, n.children...)
graft.mapChildren() f.assignChild(graft) - f.owner.changes = errors.Join(f.owner.changes, fmt.Errorf("added %v : %v", graft.dotPath(), graft.field.Type.String())) + f.owner.changes = errors.Join(f.owner.changes, fmt.Errorf("%w %v : %v", ErrFieldAdded, graft.dotPath(), graft.field.Type.String())) if f.field.Type.ID() == arrow.STRUCT { gf := f.field.Type.(*arrow.StructType) var nf []arrow.Field @@ -221,7 +232,7 @@ func (o *fieldPos) upgradeType(n *fieldPos, t arrow.Type) error { } o.parent.field = arrow.Field{Name: o.parent.name, Type: arrow.StructOf(fields...), Nullable: true} } - o.owner.changes = errors.Join(o.owner.changes, fmt.Errorf("changed %v : from %v to %v", o.dotPath(), oldType, o.field.Type.String())) + o.owner.changes = errors.Join(o.owner.changes, fmt.Errorf("%w %v : from %v to %v", ErrFieldTypeChanged, o.dotPath(), oldType, o.field.Type.String())) return nil } @@ -353,7 +364,7 @@ func goType2Arrow(f *fieldPos, gt any) arrow.DataType { if f.owner.inferTimeUnits { for _, r := range timestampMatchers { if r.MatchString(t) { - return arrow.FixedWidthTypes.Timestamp_ms + return arrow.FixedWidthTypes.Timestamp_us } } if dateMatcher.MatchString(t) {