Skip to content

Commit

Permalink
add maxcount and reset funcs
Browse files Browse the repository at this point in the history
  • Loading branch information
loicalleyne committed Nov 6, 2024
1 parent e7452fb commit d6107ac
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 8 deletions.
36 changes: 31 additions & 5 deletions bodkin.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"bytes"
"errors"
"fmt"
"math"
"os"
"slices"
"strings"
Expand Down Expand Up @@ -52,7 +53,8 @@ type Bodkin struct {
new *fieldPos
knownFields *omap.OrderedMap[string, *fieldPos]
untypedFields *omap.OrderedMap[string, *fieldPos]
unificationCount int
unificationCount int64
maxCount int64
inferTimeUnits bool
quotedValuesAreStrings bool
typeConversion bool
Expand Down Expand Up @@ -90,7 +92,7 @@ func newBodkin(m map[string]any, opts ...Option) (*Bodkin, error) {
f := newFieldPos(b)
mapToArrow(f, m)
b.old = f

b.maxCount = int64(math.MaxInt64)
return b, err
}

Expand Down Expand Up @@ -165,7 +167,24 @@ func (u *Bodkin) Err() []Field {
func (u *Bodkin) Changes() error { return u.changes }

// Count returns the number of datum evaluated for schema to date.
func (u *Bodkin) Count() int { return u.unificationCount }
func (u *Bodkin) Count() int64 { return u.unificationCount }

// MaxCount returns the maximum number of datum to be evaluated for schema.
func (u *Bodkin) MaxCount() int64 { return u.unificationCount }

// ResetCount resets the count of datum evaluated for schema to date.
func (u *Bodkin) ResetCount() int64 {
u.unificationCount = 0
return u.unificationCount
}

// ResetMaxCount resets the maximum number of datam to be evaluated for schema
// to maxInt64.
// ResetCount resets the count of datum evaluated for schema to date.
func (u *Bodkin) ResetMaxCount() int64 {
u.maxCount = int64(math.MaxInt64)
return u.unificationCount
}

// Paths returns a slice of dotpaths of fields successfully evaluated to date.
func (u *Bodkin) Paths() []Field {
Expand Down Expand Up @@ -274,11 +293,14 @@ func (u *Bodkin) ImportSchema(importPath string) (*arrow.Schema, error) {

// Unify merges structured input's column definition with the previously input's schema.
// Any uppopulated fields, empty objects or empty slices in JSON input are skipped.
func (u *Bodkin) Unify(a any) {
func (u *Bodkin) Unify(a any) error {
if u.unificationCount > u.maxCount {
return fmt.Errorf("maxcount exceeded")
}
m, err := InputMap(a)
if err != nil {
u.err = fmt.Errorf("%v : %v", ErrInvalidInput, err)
return
return fmt.Errorf("%v : %v", ErrInvalidInput, err)
}

f := newFieldPos(u)
Expand All @@ -288,13 +310,17 @@ func (u *Bodkin) Unify(a any) {
u.merge(field, nil)
}
u.unificationCount++
return nil
}

// Unify merges structured input's column definition with the previously input's schema,
// using a specified valid path as the root. An error is returned if the mergeAt path is
// not found.
// Any uppopulated fields, empty objects or empty slices in JSON input are skipped.
func (u *Bodkin) UnifyAtPath(a any, mergeAt string) error {
if u.unificationCount > u.maxCount {
return fmt.Errorf("maxcount exceeded")
}
mergePath := make([]string, 0)
if !(len(mergeAt) == 0 || mergeAt == "$") {
mergePath = strings.Split(strings.TrimPrefix(mergeAt, "$"), ".")
Expand Down
1 change: 0 additions & 1 deletion cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,6 @@ func main() {
} else {
fmt.Printf("imported %v\n", imp.String())
}

}

}
Expand Down
4 changes: 2 additions & 2 deletions json2parquet/json2parquet.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"github.com/loicalleyne/bodkin/pq"
)

func FromReader(r io.Reader, opts ...bodkin.Option) (*arrow.Schema, int, error) {
func FromReader(r io.Reader, opts ...bodkin.Option) (*arrow.Schema, int64, error) {
var err error
s := bufio.NewScanner(r)
var u *bodkin.Bodkin
Expand All @@ -28,7 +28,7 @@ func FromReader(r io.Reader, opts ...bodkin.Option) (*arrow.Schema, int, error)
}
for s.Scan() {
u.Unify(s.Bytes())
if u.Count() > 10000 {
if u.Count() > u.MaxCount() {
break
}
}
Expand Down
7 changes: 7 additions & 0 deletions option.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,10 @@ func WithQuotedValuesAreStrings() Option {
cfg.quotedValuesAreStrings = true
}
}

// WithMaxCount enables capping the number of Unify evaluations.
func WithMaxCount(i int64) Option {
return func(cfg config) {
cfg.maxCount = i
}
}

0 comments on commit d6107ac

Please sign in to comment.