diff --git a/parallel/processor.go b/parallel/processor.go index 2511763a..daa842b5 100644 --- a/parallel/processor.go +++ b/parallel/processor.go @@ -116,13 +116,11 @@ func (p *Processor) RunWorkers(numWorkers int) error { // Run starts the workers, crunching through the input. func (p *Processor) Run() error { - // wErr signals a worker or writer error. If an error occurs, the items in // the queue are still process, just no items are added to the queue. There // is only one way to toggle this, from false to true, so we don't care // about synchronisation. var wErr error - // The worker fetches items from a queue, executes f and sends the result to the out channel. worker := func(queue chan []Record, out chan []byte, f TransformerFunc, wg *sync.WaitGroup) { defer wg.Done() @@ -136,7 +134,6 @@ func (p *Processor) Run() error { } } } - // The writer collects and buffers writes. writer := func(w io.Writer, bc chan []byte, done chan bool) { bw := bufio.NewWriter(w) @@ -150,25 +147,23 @@ func (p *Processor) Run() error { } done <- true } - - queue := make(chan []Record) - out := make(chan []byte) - done := make(chan bool) - - var wg sync.WaitGroup - + var ( + queue = make(chan []Record) + out = make(chan []byte) + done = make(chan bool) + wg sync.WaitGroup + ) go writer(p.w, out, done) - for i := 0; i < p.NumWorkers; i++ { wg.Add(1) go worker(queue, out, p.f, &wg) } - - batch := NewBytesBatchCapacity(p.BatchSize) - br := bufio.NewReader(p.r) - var i int64 - var batchBytes int64 - + var ( + batch = NewBytesBatchCapacity(p.BatchSize) + br = bufio.NewReader(p.r) + i int64 + batchBytes int64 + ) for { b, err := br.ReadBytes(p.RecordSeparator) if err == io.EOF { @@ -196,14 +191,11 @@ func (p *Processor) Run() error { } i++ } - queue <- batch.Slice() batch.Reset() - close(queue) wg.Wait() close(out) <-done - return wErr }