bunyan = require 'bunyan'
{isEmpty, last, pick} = require 'ramda'
through2 = require 'through2'
get_initial_state = require './parser/get_initial_state'
parse_file_data = require './parser/main'
# Fields of the parser state that are kept when a `state` value is logged.
logged_state_fields = [
  'buffer', 'group_number', 'group_stop_byte'
  'record_number', 'record_stop_byte', 'stop_byte'
]

# Module logger. The `state` serializer is ramda's `pick` partially applied to
# the field list above, so a logged `state` is reduced to those properties.
log = bunyan.createLogger({
  name: 'transform_stream'
  serializers: {state: pick(logged_state_fields)}
})
# Build an object-mode transform stream that feeds incoming Buffer chunks to
# the parser and pushes every parse result downstream.
#
# Each call obtains its own state from `get_initial_state`, so the state is
# private to the stream that is returned.
#
# Returns the stream created by `through2.obj`.
module.exports = ->
  state = get_initial_state()

  # Note that `through2.obj(fn)` is a convenience wrapper around
  # `through2({objectMode: true}, fn)`.
  through2.obj (chunk, encoding, done) ->
    if not Buffer.isBuffer(chunk)
      # Best effort: the chunk is reported and skipped; the stream keeps going.
      log.error('Chunk is not a Buffer.')
    else
      # Append the new bytes to whatever is already buffered in the state.
      state.buffer = Buffer.concat([state.buffer, chunk])
      results = parse_file_data(state)
      if not isEmpty(results)
        # The last result becomes the state used for the next chunk.
        state = last(results)
        # Loop over the results returned and push each out to the Stream
        # consumer. The fat arrow keeps `@` bound to the stream.
        results.forEach (item) =>
          @push item
    # Acknowledge the chunk on every path so the stream never stalls.
    done()