Skip to content

Commit

Permalink
minor tweaks
Browse files Browse the repository at this point in the history
  • Loading branch information
miku committed Dec 22, 2021
1 parent bb57ad3 commit 7583202
Showing 1 changed file with 12 additions and 20 deletions.
32 changes: 12 additions & 20 deletions parallel/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,13 +116,11 @@ func (p *Processor) RunWorkers(numWorkers int) error {

// Run starts the workers, crunching through the input.
func (p *Processor) Run() error {

// wErr signals a worker or writer error. If an error occurs, the items in
// the queue are still process, just no items are added to the queue. There
// is only one way to toggle this, from false to true, so we don't care
// about synchronisation.
var wErr error

// The worker fetches items from a queue, executes f and sends the result to the out channel.
worker := func(queue chan []Record, out chan []byte, f TransformerFunc, wg *sync.WaitGroup) {
defer wg.Done()
Expand All @@ -136,7 +134,6 @@ func (p *Processor) Run() error {
}
}
}

// The writer collects and buffers writes.
writer := func(w io.Writer, bc chan []byte, done chan bool) {
bw := bufio.NewWriter(w)
Expand All @@ -150,25 +147,23 @@ func (p *Processor) Run() error {
}
done <- true
}

queue := make(chan []Record)
out := make(chan []byte)
done := make(chan bool)

var wg sync.WaitGroup

var (
queue = make(chan []Record)
out = make(chan []byte)
done = make(chan bool)
wg sync.WaitGroup
)
go writer(p.w, out, done)

for i := 0; i < p.NumWorkers; i++ {
wg.Add(1)
go worker(queue, out, p.f, &wg)
}

batch := NewBytesBatchCapacity(p.BatchSize)
br := bufio.NewReader(p.r)
var i int64
var batchBytes int64

var (
batch = NewBytesBatchCapacity(p.BatchSize)
br = bufio.NewReader(p.r)
i int64
batchBytes int64
)
for {
b, err := br.ReadBytes(p.RecordSeparator)
if err == io.EOF {
Expand Down Expand Up @@ -196,14 +191,11 @@ func (p *Processor) Run() error {
}
i++
}

queue <- batch.Slice()
batch.Reset()

close(queue)
wg.Wait()
close(out)
<-done

return wErr
}

0 comments on commit 7583202

Please sign in to comment.