Skip to content

Commit 1d85a39

Browse files
authored
Feature/subscribe scheduler (ReactiveX#222)
Pass in scheduler using subscribe
1 parent d3195df commit 1d85a39

File tree

203 files changed

+2054
-2041
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

203 files changed

+2054
-2041
lines changed

.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
dist
1010
build
1111
eggs
12+
.eggs
1213
parts
1314
bin
1415
var

.travis.yml

+5-4
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
11
language: python
22
python:
3-
- "3.5"
3+
- "3.6"
44
# command to install dependencies, e.g. pip install -r requirements.txt --use-mirrors
55
install:
66
- python setup.py install
77
- pip install coveralls
88
- pip install coverage
9-
- pip install nose
9+
- pip install pytest>=3.0.2 pytest-asyncio pytest-cov --upgrade
1010
- pip install tornado
1111
# command to run tests, e.g. python setup.py test
12-
script: coverage run --source=rx setup.py test
12+
script:
13+
- coverage run --source=rx setup.py test
1314
after_success:
14-
coveralls
15+
- coveralls

examples/ironpython/meetdotnet.py

-38
This file was deleted.

pytest.ini

+2
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
[pytest]
2+
testpaths = tests

rx/backpressure/controlledobservable.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from rx.core import Observable, Observable
1+
from rx.core import Observable
22
from rx.internal import extensionmethod
33

44
from .controlledsubject import ControlledSubject
@@ -12,8 +12,8 @@ def __init__(self, source, enable_queue, scheduler=None):
1212
self.subject = ControlledSubject(enable_queue, scheduler)
1313
self.source = source.multicast(self.subject).ref_count()
1414

15-
def _subscribe_core(self, observer):
16-
return self.source.subscribe(observer)
15+
def _subscribe_core(self, observer, scheduler=None):
16+
return self.source.subscribe(observer, scheduler)
1717

1818
def request(self, number_of_items):
1919
if number_of_items is None:

rx/backpressure/controlledsubject.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@ def __init__(self, enable_queue=True, scheduler=None):
1919
self.has_completed = False
2020
self.scheduler = scheduler or current_thread_scheduler
2121

22-
def _subscribe_core(self, observer):
23-
return self.subject.subscribe(observer)
22+
def _subscribe_core(self, observer, scheduler=None):
23+
return self.subject.subscribe(observer, scheduler)
2424

2525
def close(self):
2626
self.has_completed = True

rx/backpressure/pausable.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ def __init__(self, source, pauser=None):
1717

1818
super(PausableObservable, self).__init__()
1919

20-
def _subscribe_core(self, observer):
20+
def _subscribe_core(self, observer, scheduler=None):
2121
conn = self.source.publish()
2222
subscription = conn.subscribe(observer)
2323
connection = [Disposable.empty()]
@@ -29,7 +29,7 @@ def send(b):
2929
connection[0].dispose()
3030
connection[0] = Disposable.empty()
3131

32-
pausable = self.pauser.distinct_until_changed().subscribe_callbacks(send)
32+
pausable = self.pauser.distinct_until_changed().subscribe_callbacks(send, scheduler=scheduler)
3333
return CompositeDisposable(subscription, connection[0], pausable)
3434

3535
def pause(self):

rx/backpressure/pausablebuffered.py

+5-5
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66

77
def combine_latest_source(source, subject, result_selector):
8-
def subscribe(observer):
8+
def subscribe(observer, scheduler=None):
99
has_value = [False, False]
1010
has_value_all = [False]
1111
values = [None, None]
@@ -47,8 +47,8 @@ def close_subject():
4747
next(True, 1)
4848

4949
return CompositeDisposable(
50-
source.subscribe_callbacks(lambda x: next(x, 0), throw_source, close_source),
51-
subject.subscribe_callbacks(lambda x: next(x, 1), observer.throw, close_subject)
50+
source.subscribe_callbacks(lambda x: next(x, 0), throw_source, close_source, scheduler),
51+
subject.subscribe_callbacks(lambda x: next(x, 1), observer.throw, close_subject, scheduler)
5252
)
5353
return AnonymousObservable(subscribe)
5454

@@ -66,7 +66,7 @@ def __init__(self, source, pauser=None):
6666

6767
super(PausableBufferedObservable, self).__init__()
6868

69-
def _subscribe_core(self, observer):
69+
def _subscribe_core(self, observer, scheduler=None):
7070
previous_should_fire = [None]
7171
queue = []
7272

@@ -106,7 +106,7 @@ def close():
106106
self.source,
107107
self.pauser.distinct_until_changed().start_with(False),
108108
result_selector
109-
).subscribe_callbacks(send, throw, close)
109+
).subscribe_callbacks(send, throw, close, scheduler)
110110

111111
return subscription
112112

rx/backpressure/stopandwaitobservable.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,9 @@ def __init__(self, source, scheduler=None):
5353
self.source = source
5454
self.subscription = None
5555

56-
def _subscribe_core(self, observer):
56+
def _subscribe_core(self, observer, scheduler=None):
5757
observer = StopAndWaitObserver(observer, self, self.subscription, self.scheduler)
58-
self.subscription = self.source.subscribe(observer)
58+
self.subscription = self.source.subscribe(observer, scheduler)
5959

6060
def action(scheduler, state=None):
6161
self.source.request(1)

rx/backpressure/windowedobservable.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,9 @@ def __init__(self, source, window_size, scheduler=None):
5353
self.scheduler = scheduler or current_thread_scheduler
5454
self.subscription = None
5555

56-
def _subscribe_core(self, observer):
56+
def _subscribe_core(self, observer, scheduler=None):
5757
observer = WindowedObserver(observer, self, self.subscription, self.scheduler)
58-
self.subscription = self.source.subscribe(observer)
58+
self.subscription = self.source.subscribe(observer, scheduler)
5959

6060
def action(scheduler, state):
6161
self.source.request(self.window_size)

rx/concurrency/currentthreadscheduler.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ def __init__(self):
4646
def schedule(self, action, state=None):
4747
"""Schedules an action to be executed."""
4848

49-
log.debug("CurrentThreadScheduler.schedule(state=%s)", state)
49+
#log.debug("CurrentThreadScheduler.schedule(state=%s)", state)
5050
return self.schedule_relative(timedelta(0), action, state)
5151

5252
def schedule_relative(self, duetime, action, state=None):

rx/concurrency/virtualtimescheduler.py

+27-16
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
log = logging.getLogger("Rx")
1010

11+
MAX_SPINNING = 100
1112

1213
class VirtualTimeScheduler(SchedulerBase):
1314
"""Virtual Scheduler. This scheduler should work with either
@@ -70,13 +71,23 @@ def start(self):
7071
return
7172

7273
self.is_enabled = True
74+
75+
spinning = 0
7376
while self.is_enabled:
74-
next = self.get_next()
75-
if not next:
77+
item = self.get_next()
78+
if not item:
7679
break
77-
if next.duetime > self.clock:
78-
self.clock = next.duetime
79-
next.invoke()
80+
81+
if item.duetime > self.clock:
82+
self.clock = item.duetime
83+
spinning = 0
84+
85+
if spinning > MAX_SPINNING:
86+
self.clock += 1
87+
spinning = 0
88+
89+
item.invoke()
90+
spinning += 1
8091

8192
self.is_enabled = False
8293

@@ -104,18 +115,18 @@ def advance_to(self, time):
104115
self.is_enabled = True
105116

106117
while self.is_enabled:
107-
next = self.get_next()
108-
if not next:
118+
item = self.get_next()
119+
if not item:
109120
break
110121

111-
if next.duetime > time:
112-
self.queue.enqueue(next)
122+
if item.duetime > time:
123+
self.queue.enqueue(item)
113124
break
114125

115-
if next.duetime > self.clock:
116-
self.clock = next.duetime
126+
if item.duetime > self.clock:
127+
self.clock = item.duetime
117128

118-
next.invoke()
129+
item.invoke()
119130

120131
self.is_enabled = False
121132
self.clock = time
@@ -150,10 +161,10 @@ def sleep(self, time):
150161
def get_next(self):
151162
"""Returns the next scheduled item to be executed."""
152163

153-
while len(self.queue):
154-
next = self.queue.dequeue()
155-
if not next.is_cancelled():
156-
return next
164+
while self.queue:
165+
item = self.queue.dequeue()
166+
if not item.is_cancelled():
167+
return item
157168

158169
return None
159170

rx/core/anonymousobservable.py

+4-3
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
1+
from typing import Callable
12
from .observable import Observable
23

34

45
class AnonymousObservable(Observable):
56
"""Class to create an Observable instance from a delegate-based
67
implementation of the Subscribe method."""
78

8-
def __init__(self, subscribe):
9+
def __init__(self, subscribe: Callable) -> None:
910
"""Creates an observable sequence object from the specified
1011
subscription function.
1112
@@ -16,5 +17,5 @@ def __init__(self, subscribe):
1617
self._subscribe = subscribe
1718
super(AnonymousObservable, self).__init__()
1819

19-
def _subscribe_core(self, observer):
20-
return self._subscribe(observer)
20+
def _subscribe_core(self, observer, scheduler=None):
21+
return self._subscribe(observer, scheduler)

rx/core/anonymousobserver.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ def __init__(self, send=None, throw=None, close=None):
88

99
self._next = send or noop
1010
self._error = throw or default_error
11-
self._completed = close or noop
11+
self._close = close or noop
1212

1313
def _send_core(self, value):
1414
self._next(value)
@@ -17,4 +17,4 @@ def _throw_core(self, error):
1717
self._error(error)
1818

1919
def _close_core(self):
20-
self._completed()
20+
self._close()

rx/core/bases/observable.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,5 +3,5 @@
33

44
class Observable(metaclass=ABCMeta):
55
@abstractmethod
6-
def subscribe(self, observer):
6+
def subscribe(self, observer=None, scheduler=None):
77
raise NotImplementedError

rx/core/blockingobservable.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -15,5 +15,5 @@ def __init__(self, observable=None):
1515
self.observable = observable
1616
super(BlockingObservable, self).__init__()
1717

18-
def _subscribe_core(self, observer):
19-
return self.observable.subscribe(observer)
18+
def _subscribe_core(self, observer, scheduler=None):
19+
return self.observable.subscribe(observer, scheduler)

rx/core/notification.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ def to_observable(self, scheduler=None):
4545
notification = self
4646
scheduler = scheduler or immediate_scheduler
4747

48-
def subscribe(observer):
48+
def subscribe(observer, scheduler=None):
4949
def action(scheduler, state):
5050
notification._accept_observable(observer)
5151
if notification.kind == 'N':

0 commit comments

Comments
 (0)