Add an optional cap on the number of Unify evaluations (WithMaxCount).

The evaluation counter is widened to int64, MaxCount/ResetCount/ResetMaxCount
accessors are added, Unify now returns an error, and json2parquet.FromReader
stops at the configured cap instead of the hard-coded 10000.

Review notes:
- MaxCount and ResetMaxCount return the cap rather than the running count.
- The cap is inclusive: with a cap of N exactly N inputs are evaluated.
- newBodkin only applies the "no cap" default when no cap is configured, so a
  value set through WithMaxCount is no longer overwritten (this assumes the
  options are applied earlier in newBodkin, outside the lines shown here).
- Indentation of context lines was reconstructed in gofmt style; if a hunk is
  rejected for whitespace, retry with `git apply --ignore-whitespace`.
- The index hashes are carried over unchanged and do not describe the
  corrected post-image.

diff --git a/bodkin.go b/bodkin.go
index c1de3ec..45ca3bc 100644
--- a/bodkin.go
+++ b/bodkin.go
@@ -7,6 +7,7 @@ import (
 	"bytes"
 	"errors"
 	"fmt"
+	"math"
 	"os"
 	"slices"
 	"strings"
@@ -52,7 +53,8 @@ type Bodkin struct {
 	new                    *fieldPos
 	knownFields            *omap.OrderedMap[string, *fieldPos]
 	untypedFields          *omap.OrderedMap[string, *fieldPos]
-	unificationCount       int
+	unificationCount       int64
+	maxCount               int64
 	inferTimeUnits         bool
 	quotedValuesAreStrings bool
 	typeConversion         bool
@@ -90,7 +92,10 @@ func newBodkin(m map[string]any, opts ...Option) (*Bodkin, error) {
 	f := newFieldPos(b)
 	mapToArrow(f, m)
 	b.old = f
-
+	// Keep a cap that is already configured; default to "no cap" otherwise.
+	if b.maxCount <= 0 {
+		b.maxCount = math.MaxInt64
+	}
 	return b, err
 }
 
@@ -165,7 +170,24 @@ func (u *Bodkin) Err() []Field {
 func (u *Bodkin) Changes() error { return u.changes }
 
 // Count returns the number of datum evaluated for schema to date.
-func (u *Bodkin) Count() int { return u.unificationCount }
+func (u *Bodkin) Count() int64 { return u.unificationCount }
+
+// MaxCount returns the maximum number of datum to be evaluated for schema.
+func (u *Bodkin) MaxCount() int64 { return u.maxCount }
+
+// ResetCount resets the count of datum evaluated for schema to date and
+// returns the new count, which is always zero.
+func (u *Bodkin) ResetCount() int64 {
+	u.unificationCount = 0
+	return u.unificationCount
+}
+
+// ResetMaxCount resets the maximum number of datum to be evaluated for schema
+// to math.MaxInt64 and returns the new maximum.
+func (u *Bodkin) ResetMaxCount() int64 {
+	u.maxCount = math.MaxInt64
+	return u.maxCount
+}
 
 // Paths returns a slice of dotpaths of fields successfully evaluated to date.
 func (u *Bodkin) Paths() []Field {
@@ -274,11 +296,15 @@ func (u *Bodkin) ImportSchema(importPath string) (*arrow.Schema, error) {
 
 // Unify merges structured input's column definition with the previously input's schema.
-// Any uppopulated fields, empty objects or empty slices in JSON input are skipped.
-func (u *Bodkin) Unify(a any) {
+// Any unpopulated fields, empty objects or empty slices in JSON input are skipped.
+// It returns an error when the maximum count has been reached or the input is invalid.
+func (u *Bodkin) Unify(a any) error {
+	if u.unificationCount >= u.maxCount {
+		return errors.New("maxcount exceeded")
+	}
 	m, err := InputMap(a)
 	if err != nil {
 		u.err = fmt.Errorf("%v : %v", ErrInvalidInput, err)
-		return
+		return u.err
 	}
 
 	f := newFieldPos(u)
@@ -288,6 +314,7 @@ func (u *Bodkin) Unify(a any) {
 		u.merge(field, nil)
 	}
 	u.unificationCount++
+	return nil
 }
 
 // Unify merges structured input's column definition with the previously input's schema,
@@ -295,6 +322,9 @@ func (u *Bodkin) Unify(a any) {
 // not found.
-// Any uppopulated fields, empty objects or empty slices in JSON input are skipped.
+// Any unpopulated fields, empty objects or empty slices in JSON input are skipped.
 func (u *Bodkin) UnifyAtPath(a any, mergeAt string) error {
+	if u.unificationCount >= u.maxCount {
+		return errors.New("maxcount exceeded")
+	}
 	mergePath := make([]string, 0)
 	if !(len(mergeAt) == 0 || mergeAt == "$") {
 		mergePath = strings.Split(strings.TrimPrefix(mergeAt, "$"), ".")
diff --git a/cmd/main.go b/cmd/main.go
index 5a9b83c..483882e 100644
--- a/cmd/main.go
+++ b/cmd/main.go
@@ -179,7 +179,6 @@ func main() {
 		} else {
 			fmt.Printf("imported %v\n", imp.String())
 		}
-
 	}
 }
 
diff --git a/json2parquet/json2parquet.go b/json2parquet/json2parquet.go
index d5d4394..1c00c71 100644
--- a/json2parquet/json2parquet.go
+++ b/json2parquet/json2parquet.go
@@ -14,7 +14,7 @@ import (
 	"github.com/loicalleyne/bodkin/pq"
 )
 
-func FromReader(r io.Reader, opts ...bodkin.Option) (*arrow.Schema, int, error) {
+func FromReader(r io.Reader, opts ...bodkin.Option) (*arrow.Schema, int64, error) {
 	var err error
 	s := bufio.NewScanner(r)
 	var u *bodkin.Bodkin
@@ -28,7 +28,8 @@ func FromReader(r io.Reader, opts ...bodkin.Option) (*arrow.Schema, int, error) {
 	}
 	for s.Scan() {
-		u.Unify(s.Bytes())
-		if u.Count() > 10000 {
+		// Lines that fail to unify are skipped; the cap is enforced just below.
+		_ = u.Unify(s.Bytes())
+		if u.Count() >= u.MaxCount() {
 			break
 		}
 	}
diff --git a/option.go b/option.go
index f252c95..e4137e5 100644
--- a/option.go
+++ b/option.go
@@ -32,3 +32,12 @@ func WithQuotedValuesAreStrings() Option {
 		cfg.quotedValuesAreStrings = true
 	}
 }
+
+// WithMaxCount caps the number of Unify evaluations at i. Once the cap is
+// reached, further calls to Unify and UnifyAtPath return an error. A value of
+// zero or less leaves the number of evaluations uncapped.
+func WithMaxCount(i int64) Option {
+	return func(cfg config) {
+		cfg.maxCount = i
+	}
+}