Skip to content

Commit

Permalink
NextBatch
Browse files Browse the repository at this point in the history
  • Loading branch information
loic-alleyne committed Nov 14, 2024
1 parent f35fec3 commit e61a9bc
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 10 deletions.
25 changes: 18 additions & 7 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,25 @@ func main() {
log.Printf("elapsed: %v\n", time.Since(start))

i := 0
for r.Next() {
rec := r.Record()
_, err := rec.MarshalJSON()
if err != nil {
fmt.Printf("error marshaling record: %v\n", err)
// for r.Next() {
// rec := r.Record()
// _, err := rec.MarshalJSON()
// if err != nil {
// fmt.Printf("error marshaling record: %v\n", err)
// }
// // fmt.Printf("\nmarshaled record :\n%v\n", string(rj))
// i++
// }
for r.NextBatch(1024) {
recs := r.RecordBatch()
for _, rec := range recs {
_, err := rec.MarshalJSON()
if err != nil {
fmt.Printf("error marshaling record: %v\n", err)
}
// fmt.Printf("\nmarshaled record :\n%v\n", string(rj))
i++
}
// fmt.Printf("\nmarshaled record :\n%v\n", string(rj))
i++
}
log.Println("records", r.Count(), i)
}
Expand Down
58 changes: 55 additions & 3 deletions reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type DataReader struct {
bldMap *fieldPos
ldr *dataLoader
cur arrow.Record
curBatch []arrow.Record
readerCtx context.Context
readCancel func()
err error
Expand Down Expand Up @@ -111,6 +112,8 @@ func NewReader(schema *arrow.Schema, source DataSource, opts ...Option) (*DataRe
return r, nil
}

// ReadToRecord decodes a datum directly to an arrow.Record. The record
// should be released by the user when done with it.
func (r *DataReader) ReadToRecord(a any) (arrow.Record, error) {
var err error
defer func() {
Expand Down Expand Up @@ -147,6 +150,52 @@ func (r *DataReader) ReadToRecord(a any) (arrow.Record, error) {
return r.bld.NewRecord(), nil
}

// NextBatch advances the reader to the next batch of at most batchSize records
// taken from the converted record queue and reports whether at least one
// record is available through RecordBatch. A batchSize below 1 is treated as 1.
//
// Records belonging to the previous batch are released before new ones are
// read, so a batch is only valid until the next call to NextBatch. If the
// queue channel is closed before batchSize records were collected, the shorter
// final batch is still reported with a true result.
//
// The user should check Err() after a call to NextBatch that returned false to
// check if an error took place.
func (r *DataReader) NextBatch(batchSize int) bool {
	if batchSize < 1 {
		batchSize = 1
	}
	if len(r.curBatch) != 0 {
		// The previous batch is owned by the reader; release it before refilling.
		for _, rec := range r.curBatch {
			rec.Release()
		}
		r.curBatch = []arrow.Record{}
	}
	r.wg.Wait()

collect:
	// Strictly less-than: the batch must never hold more than batchSize records.
	for len(r.curBatch) < batchSize {
		select {
		case rec, ok := <-r.recChan:
			if !ok && rec == nil {
				// Queue closed: stop collecting and hand back whatever was
				// gathered so far (possibly nothing).
				break collect
			}
			if rec != nil {
				r.curBatch = append(r.curBatch, rec)
			}
		case <-r.bldDone:
			// Drain a still-queued record into the batch itself. Storing it in
			// r.cur would drop it from the batch and overwrite any record held
			// there without releasing it.
			// NOTE(review): when the queue is empty the loop keeps waiting on
			// the other cases, as before; confirm recChan is closed once the
			// builder is done, otherwise this can spin if bldDone is closed.
			if len(r.recChan) > 0 {
				if rec := <-r.recChan; rec != nil {
					r.curBatch = append(r.curBatch, rec)
				}
			}
		case <-r.readerCtx.Done():
			return false
		}
	}

	if r.err != nil {
		return false
	}

	return len(r.curBatch) > 0
}

// Next returns whether a Record can be received from the converted record queue.
// The user should check Err() after a call to Next that returned false to check
// if an error took place.
Expand All @@ -156,7 +205,6 @@ func (r *DataReader) Next() bool {
r.cur.Release()
r.cur = nil
}

r.wg.Wait()
select {
case r.cur, ok = <-r.recChan:
Expand Down Expand Up @@ -195,8 +243,12 @@ func (r *DataReader) Opts() []Option { return r.opts }

// Record returns the current Arrow record.
// It is valid until the next call to Next.
func (r *DataReader) Record() arrow.Record { return r.cur }
func (r *DataReader) Schema() *arrow.Schema { return r.schema }
func (r *DataReader) Record() arrow.Record { return r.cur }

// RecordBatch returns the current batch of Arrow records.
// It is valid until the next call to NextBatch.
func (r *DataReader) RecordBatch() []arrow.Record { return r.curBatch }
// Schema returns the Arrow schema held by the reader.
func (r *DataReader) Schema() *arrow.Schema { return r.schema }

// Err reports the most recent error stored on the reader while data was
// being read.
func (r *DataReader) Err() error {
	return r.err
}
Expand Down

0 comments on commit e61a9bc

Please sign in to comment.