From 9c55d3b4cd47231223488c97e221ff81d0a58c8d Mon Sep 17 00:00:00 2001 From: Chet Corcos Date: Sun, 29 Mar 2015 13:41:16 -0700 Subject: [PATCH 1/3] takeUntil but test isnt working --- lib/index.js | 55 ++++++++++++++++++++++++++++++++++++++++++++++++++++ test/test.js | 14 +++++++++++++ 2 files changed, 69 insertions(+) diff --git a/lib/index.js b/lib/index.js index 28328ce..48e716f 100755 --- a/lib/index.js +++ b/lib/index.js @@ -3554,6 +3554,61 @@ Stream.prototype.debounce = function (ms) { }; exposeMethod('debounce'); + +/** + * Ends a source stream when a given stream ends or emits a value. + * + * @id takeUntil + * @section Higher-order Streams + * @name Stream.takeUntil(stream) + * @param {Stream} stream - another stream which stops the source + * @api public + * + * a = _() + * b = _() + * a.takeUntil(b) + * a.each(_.log) + * a.write(1) + * // => 1 + * a.write(2) + * // => 2 + * b.write(1) + * // nothing + * a.write(3) + * // nothing + */ + +Stream.prototype.takeUntil = function (stream) { + var first = true, + done = false; + + return this.consume(function (err, x, push, next) { + if (first) { + stream.pull(function() { + if (!done) { + push(null, _.nil); + done = true; + } + }); + first = false; + } + if (err) { + push(err); + next(); + } + else if (x === _.nil) { + done = true; + push(null, x); + } + else if (!done) { + push(null, x); + next(); + } + }); +}; +exposeMethod('takeUntil'); + + /** * Creates a new Stream, which when read from, only returns the last * seen value from the source. The source stream does not experience diff --git a/test/test.js b/test/test.js index c48bdf7..cb081eb 100755 --- a/test/test.js +++ b/test/test.js @@ -1545,6 +1545,20 @@ exports['wrap EventEmitter (or jQuery) on handler with args wrapping by array'] }); }; +exports['takeUntil'] = function (test) { + a = _(); + b = _(); + c = a.takeUntil(b); + a.write(1); + a.write(2); + b.write(1); + a.write(3); + c.toArray(function(x) { + test.same(x, [1,2]); + }) + test.done(); +}; + exports['sequence'] = function (test) { _.sequence([[1,2], [3], [[4],5]]).toArray(function (xs) { test.same(xs, [1,2,3,[4],5]); From d2041601a92e38c9fa6f15ad22626fba5b0e44bb Mon Sep 17 00:00:00 2001 From: Chet Corcos Date: Sun, 29 Mar 2015 13:44:24 -0700 Subject: [PATCH 2/3] takeUntil test works --- test/test.js | 2 ++ 1 file changed, 2 insertions(+) diff --git a/test/test.js b/test/test.js index cb081eb..8debd2b 100755 --- a/test/test.js +++ b/test/test.js @@ -1548,6 +1548,8 @@ exports['wrap EventEmitter (or jQuery) on handler with args wrapping by array'] exports['takeUntil'] = function (test) { a = _(); b = _(); + a.resume(); + b.resume(); c = a.takeUntil(b); a.write(1); a.write(2); From b1b1c5a9478fd545be6f1e7fc822ee2977d7b3ad Mon Sep 17 00:00:00 2001 From: Chet Corcos Date: Sun, 29 Mar 2015 17:16:06 -0700 Subject: [PATCH 3/3] takeUntil better tests --- lib/index.js | 3 ++ test/test.js | 138 +++++++++++++++++++++++++++++++++++++++++++++------ 2 files changed, 126 insertions(+), 15 deletions(-) diff --git a/lib/index.js b/lib/index.js index 48e716f..281b974 100755 --- a/lib/index.js +++ b/lib/index.js @@ -3579,6 +3579,9 @@ exposeMethod('debounce'); */ Stream.prototype.takeUntil = function (stream) { + if (!_.isStream(stream)) { + throw new Error('Invalid stream to takeUntil:', stream); + } var first = true, done = false; diff --git a/test/test.js b/test/test.js index 8debd2b..19cfe4a 100755 --- a/test/test.js +++ b/test/test.js @@ -1545,21 +1545,129 @@ exports['wrap EventEmitter (or jQuery) on handler with args wrapping by array'] }); }; -exports['takeUntil'] = function (test) { - a = _(); - b = _(); - a.resume(); - b.resume(); - c = a.takeUntil(b); - a.write(1); - a.write(2); - b.write(1); - a.write(3); - c.toArray(function(x) { - test.same(x, [1,2]); - }) - test.done(); -}; + +exports['takeUntil'] = { + setUp: function (callback) { + this.clock = sinon.useFakeTimers(); + callback(); + }, + tearDown: function (callback) { + this.clock.restore(); + callback(); + }, + 'invalid stream': function (test) { + test.throws(function () { + _([1,2,3]).takeUntil(10); + }); + test.done(); + }, + 'async generator': function (test) { + function delay(push, ms, x) { + setTimeout(function () { + push(null, x); + }, ms); + } + var source = _(function (push, next) { + delay(push, 10, 1); + delay(push, 20, 2); + delay(push, 30, 3); + // should be stopped + delay(push, 40, 4); + delay(push, 50, 5); + delay(push, 60, _.nil); + }) + var stopStream = _(function (push, next) { + delay(push, 25, 1); + delay(push, 35, _.nil); + }) + var results = []; + source.takeUntil(stopStream).each(function (x) { + results.push(x); + }); + this.clock.tick(10); + test.same(results, [1]); + this.clock.tick(10); + test.same(results, [1, 2]); + this.clock.tick(10); + test.same(results, [1, 2, 3]); + this.clock.tick(10); + test.same(results, [1, 2, 3]); + this.clock.tick(20); + test.same(results, [1, 2, 3]); + test.done(); + }, + 'toplevel - async generator': function (test) { + function delay(push, ms, x) { + setTimeout(function () { + push(null, x); + }, ms); + } + var source = _(function (push, next) { + delay(push, 10, 1); + delay(push, 20, 2); + delay(push, 30, 3); + // should be stopped + delay(push, 40, 4); + delay(push, 50, 5); + delay(push, 60, _.nil); + }) + var stopStream = _(function (push, next) { + delay(push, 25, 1); + delay(push, 35, _.nil); + }) + var results = []; + _.takeUntil(stopStream, source).each(function (x) { + results.push(x); + }); + this.clock.tick(10); + test.same(results, [1]); + this.clock.tick(10); + test.same(results, [1, 2]); + this.clock.tick(10); + test.same(results, [1, 2, 3]); + this.clock.tick(10); + test.same(results, [1, 2, 3]); + this.clock.tick(20); + test.same(results, [1, 2, 3]); + test.done(); + }, + 'toplevel - partial application, async generator': function (test) { + function delay(push, ms, x) { + setTimeout(function () { + push(null, x); + }, ms); + } + var source = _(function (push, next) { + delay(push, 10, 1); + delay(push, 20, 2); + delay(push, 30, 3); + // should be stopped + delay(push, 40, 4); + delay(push, 50, 5); + delay(push, 60, _.nil); + }) + var stopStream = _(function (push, next) { + delay(push, 25, 1); + delay(push, 35, _.nil); + }) + var results = []; + _.takeUntil(stopStream)(source).each(function (x) { + results.push(x); + }); + this.clock.tick(10); + test.same(results, [1]); + this.clock.tick(10); + test.same(results, [1, 2]); + this.clock.tick(10); + test.same(results, [1, 2, 3]); + this.clock.tick(10); + test.same(results, [1, 2, 3]); + this.clock.tick(20); + test.same(results, [1, 2, 3]); + test.done(); + }, + 'noValueOnError': noValueOnErrorTest(_.takeUntil(_())) +}; exports['sequence'] = function (test) { _.sequence([[1,2], [3], [[4],5]]).toArray(function (xs) {