Skip to content

Commit

Permalink
stream: simplify Transform
Browse files Browse the repository at this point in the history
  • Loading branch information
ronag committed Dec 7, 2019
1 parent 065a6b2 commit b370028
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 55 deletions.
1 change: 0 additions & 1 deletion lib/_stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,6 @@ function readableAddChunk(stream, chunk, encoding, addToFront) {
}

function addChunk(stream, state, chunk, addToFront) {
console.log('addChunk', state.flowing, state.length, state.sync)
if (state.flowing && state.length === 0 && !state.sync) {
// Use the guard to avoid creating `Set()` repeatedly
// when we have multiple pipes.
Expand Down
49 changes: 31 additions & 18 deletions lib/_stream_transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,7 @@ const { Object } = primordials;
module.exports = Transform;
const {
ERR_METHOD_NOT_IMPLEMENTED,
ERR_MULTIPLE_CALLBACK,
ERR_TRANSFORM_ALREADY_TRANSFORMING,
ERR_TRANSFORM_WITH_LENGTH_0
ERR_MULTIPLE_CALLBACK
} = require('internal/errors').codes;
const Duplex = require('_stream_duplex');
Object.setPrototypeOf(Transform.prototype, Duplex.prototype);
Expand All @@ -90,35 +88,50 @@ function Transform(options) {
this._flush = options.flush;
}

const final = this._final || (cb => cb());
const flush = this._flush || (cb => cb());
this._readableState.sync = false;
this._resume = null;
};

this._final = function (cb) {
final(() => process.nextTick(flush, () => {
cb();
this.push(null);
}));
Transform.prototype._final = function (cb) {
if (this._flush) {
this._flush((err) => {
if (err) {
cb(err);
} else {
this.push(null);
cb();
}
})
} else {
this.push(null);
cb();
}

this._readableState.sync = false;
};

Transform.prototype._read = function (n) {
if (this._resume) {
this._resume();
this._resume = null;
}
}
};

Transform.prototype._write = function (chunk, encoding, callback) {
this._transform.call(this, chunk, encoding, (...args) => {
if (args[0]) {
callback(args[0]);
let called = false;
this._transform.call(this, chunk, encoding, (err, val) => {
if (err) {
callback(err);
return;
}

if (args.length > 1) {
this.push(args[1]);
if (called) {
callback(new ERR_MULTIPLE_CALLBACK());
return;
} else {
called = true;
}

if (val !== undefined) {
this.push(val);
}

const r = this._readableState;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,13 @@ const _transform = common.mustCall((chunk, _, next) => {
next();
});

const _final = common.mustCall((next) => {
next();
});

const _flush = common.mustCall((next) => {
next();
});

const t2 = new Transform({
transform: _transform,
flush: _flush,
final: _final
flush: _flush
});

strictEqual(t2._transform, _transform);
Expand Down
19 changes: 5 additions & 14 deletions test/parallel/test-stream-transform-final-sync.js
Original file line number Diff line number Diff line change
Expand Up @@ -68,36 +68,27 @@ const t = new stream.Transform({
assert.strictEqual(++state, chunk + 2);
process.nextTick(next);
}, 3),
final: common.mustCall(function(done) {
state++;
// finalCallback part 1
assert.strictEqual(state, 10);
state++;
// finalCallback part 2
assert.strictEqual(state, 11);
done();
}, 1),
flush: common.mustCall(function(done) {
state++;
// fluchCallback part 1
assert.strictEqual(state, 12);
assert.strictEqual(state, 10);
process.nextTick(function() {
state++;
// fluchCallback part 2
assert.strictEqual(state, 13);
assert.strictEqual(state, 11);
done();
});
}, 1)
});
t.on('finish', common.mustCall(function() {
state++;
// finishListener
assert.strictEqual(state, 14);
assert.strictEqual(state, 12);
}, 1));
t.on('end', common.mustCall(function() {
state++;
// endEvent
assert.strictEqual(state, 16);
assert.strictEqual(state, 14);
}, 1));
t.on('data', common.mustCall(function(d) {
// dataListener
Expand All @@ -109,5 +100,5 @@ t.write(4);
t.end(7, common.mustCall(function() {
state++;
// endMethodCallback
assert.strictEqual(state, 15);
assert.strictEqual(state, 13);
}, 1));
21 changes: 5 additions & 16 deletions test/parallel/test-stream-transform-final.js
Original file line number Diff line number Diff line change
Expand Up @@ -68,27 +68,16 @@ const t = new stream.Transform({
assert.strictEqual(++state, chunk + 2);
process.nextTick(next);
}, 3),
final: common.mustCall(function(done) {
state++;
// finalCallback part 1
assert.strictEqual(state, 10);
setTimeout(function() {
state++;
// finalCallback part 2
assert.strictEqual(state, 11);
done();
}, 100);
}, 1),
flush: common.mustCall(function(done) {
state++;
// flushCallback part 1
console.log('flushCallback part 1');
assert.strictEqual(state, 12);
assert.strictEqual(state, 10);
process.nextTick(function() {
state++;
// flushCallback part 2
console.log('flushCallback part 2');
assert.strictEqual(state, 13);
assert.strictEqual(state, 11);
done();
});
}, 1)
Expand All @@ -97,12 +86,12 @@ t.on('finish', common.mustCall(function() {
state++;
// finishListener
console.log('finishListener');
assert.strictEqual(state, 14);
assert.strictEqual(state, 12);
}, 1));
t.on('end', common.mustCall(function() {
state++;
// end event
assert.strictEqual(state, 16);
assert.strictEqual(state, 14);
}, 1));
t.on('data', common.mustCall(function(d) {
// dataListener
Expand All @@ -114,5 +103,5 @@ t.write(4);
t.end(7, common.mustCall(function() {
state++;
// endMethodCallback
assert.strictEqual(state, 15);
assert.strictEqual(state, 13);
}, 1));

0 comments on commit b370028

Please sign in to comment.