forked from Reactive-Extensions/RxJS
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
0e10e23
commit 3e4f09d
Showing
9 changed files
with
648 additions
and
10 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,5 +11,6 @@ | |
"boss": true, | ||
"eqnull": true, | ||
"node": true, | ||
"-W030": true | ||
"-W030": true, | ||
"predef": [ "Promise" ] | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
'use strict'; | ||
|
||
var FlatMapObservable = require('./flatmapobservable'); | ||
var switchLatest = require('./switch'); | ||
|
||
module.exports = function flatMapLatest (source, selector, resultSelector, thisArg) { | ||
return switchLatest(new FlatMapObservable(source, selector, resultSelector, thisArg)); | ||
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,68 @@ | ||
'use strict'; | ||
|
||
var ObservableBase = require('./observablebase'); | ||
var AbstractObserver = require('../observer/abstractobserver'); | ||
var BinaryDisposable = require('../binarydisposable'); | ||
var SerialDisposable = require('../serialdisposable'); | ||
var SingleAssignmentDisposable = require('../singleassignmentdisposable'); | ||
var fromPromise = require('./frompromise'); | ||
var isPromise = require('../helpers/ispromise'); | ||
var inherits = require('util').inherits; | ||
|
||
function InnerObserver(p, id) { | ||
this._p = p; | ||
this._id = id; | ||
AbstractObserver.call(this); | ||
} | ||
|
||
inherits(InnerObserver, AbstractObserver); | ||
|
||
InnerObserver.prototype.next = function (x) { this._p._latest === this._id && this._p._o.onNext(x); }; | ||
InnerObserver.prototype.error = function (e) { this._p._latest === this._id && this._p._o.onError(e); }; | ||
InnerObserver.prototype.completed = function () { | ||
if (this._p._latest === this._id) { | ||
this._p._hasLatest = false; | ||
this._p._stopped && this._p._o.onCompleted(); | ||
} | ||
}; | ||
|
||
function SwitchObserver(o, inner) { | ||
this._o = o; | ||
this._inner = inner; | ||
this._stopped = false; | ||
this._latest = 0; | ||
this._hasLatest = false; | ||
AbstractObserver.call(this); | ||
} | ||
|
||
inherits(SwitchObserver, AbstractObserver); | ||
|
||
SwitchObserver.prototype.next = function (innerSource) { | ||
var d = new SingleAssignmentDisposable(), id = ++this._latest; | ||
this._hasLatest = true; | ||
this._inner.setDisposable(d); | ||
isPromise(innerSource) && (innerSource = fromPromise(innerSource)); | ||
d.setDisposable(innerSource.subscribe(new InnerObserver(this, id))); | ||
}; | ||
SwitchObserver.prototype.error = function (e) { this._o.onError(e); }; | ||
SwitchObserver.prototype.completed = function () { this._stopped = true; !this._hasLatest && this._o.onCompleted(); }; | ||
|
||
function SwitchObservable(source) { | ||
this.source = source; | ||
ObservableBase.call(this); | ||
} | ||
|
||
inherits(SwitchObservable, ObservableBase); | ||
|
||
SwitchObservable.prototype.subscribeCore = function (o) { | ||
var inner = new SerialDisposable(), s = this.source.subscribe(new SwitchObserver(o, inner)); | ||
return new BinaryDisposable(s, inner); | ||
}; | ||
|
||
/** | ||
* Transforms an observable sequence of observable sequences into an observable sequence producing values only from the most recent observable sequence. | ||
* @returns {Observable} The observable sequence that at any point in time produces the elements of the most recent inner observable sequence that has been received. | ||
*/ | ||
module.exports = function switch_(source) { | ||
return new SwitchObservable(source); | ||
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,82 @@ | ||
'use strict'; | ||
|
||
var ObservableBase = require('./observablebase'); | ||
var AbstractObserver = require('../observer/abstractobserver'); | ||
var CompositeDisposable = require('../compositedisposable'); | ||
var SingleAssignmentDisposable = require('../singleassignmentdisposable'); | ||
var fromPromise = require('./frompromise'); | ||
var isPromise = require('../helpers/ispromise'); | ||
var inherits = require('util').inherits; | ||
|
||
function InnerObserver(state, inner) { | ||
this._s = state; | ||
this._i = inner; | ||
AbstractObserver.call(this); | ||
} | ||
|
||
inherits(InnerObserver, AbstractObserver); | ||
|
||
InnerObserver.prototype.next = function (x) { this._s.o.onNext(x); }; | ||
InnerObserver.prototype.error = function (e) { this._s.o.onError(e); }; | ||
InnerObserver.prototype.completed = function () { | ||
this._s.g.remove(this._i); | ||
this._s.hasCurrent = false; | ||
this._s.isStopped && this._s.g.length === 1 && this._s.o.onCompleted(); | ||
}; | ||
|
||
function SwitchFirstObserver(state) { | ||
this._s = state; | ||
AbstractObserver.call(this); | ||
} | ||
|
||
inherits(SwitchFirstObserver, AbstractObserver); | ||
|
||
SwitchFirstObserver.prototype.next = function (x) { | ||
if (!this._s.hasCurrent) { | ||
this._s.hasCurrent = true; | ||
isPromise(x) && (x = fromPromise(x)); | ||
var inner = new SingleAssignmentDisposable(); | ||
this._s.g.add(inner); | ||
inner.setDisposable(x.subscribe(new InnerObserver(this._s, inner))); | ||
} | ||
}; | ||
|
||
SwitchFirstObserver.prototype.error = function (e) { | ||
this._s.o.onError(e); | ||
}; | ||
|
||
SwitchFirstObserver.prototype.completed = function () { | ||
this._s.isStopped = true; | ||
!this._s.hasCurrent && this._s.g.length === 1 && this._s.o.onCompleted(); | ||
}; | ||
|
||
function SwitchFirstObservable(source) { | ||
this.source = source; | ||
ObservableBase.call(this); | ||
} | ||
|
||
inherits(SwitchFirstObservable, ObservableBase); | ||
|
||
SwitchFirstObservable.prototype.subscribeCore = function (o) { | ||
var m = new SingleAssignmentDisposable(), | ||
g = new CompositeDisposable(), | ||
state = { | ||
hasCurrent: false, | ||
isStopped: false, | ||
o: o, | ||
g: g | ||
}; | ||
|
||
g.add(m); | ||
m.setDisposable(this.source.subscribe(new SwitchFirstObserver(state))); | ||
return g; | ||
}; | ||
|
||
/** | ||
* Performs a exclusive waiting for the first to finish before subscribing to another observable. | ||
* Observables that come in between subscriptions will be dropped on the floor. | ||
* @returns {Observable} A exclusive observable with only the results that happen when subscribed. | ||
*/ | ||
module.exports = function switchFirst (source) { | ||
return new SwitchFirstObservable(source); | ||
}; |
Oops, something went wrong.