Skip to content

Commit ce12560

Browse files
authored
Fixed flat_map with mapper returning an asyncio Future (ReactiveX#460)
As of python 3.7 Asyncio Futures implement an __iter__ method, probably for backward compatibility. As a consequence, asyncio futures are also iterable. So we must check if the mapper result is a Future before checking if it is Iterable. Fixes ReactiveX#457
1 parent 5732604 commit ce12560

File tree

2 files changed

+37
-3
lines changed

2 files changed

+37
-3
lines changed

rx/core/operators/flatmap.py

+4-3
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,12 @@
1010
def _flat_map_internal(source, mapper=None, mapper_indexed=None):
1111
def projection(x, i):
1212
mapper_result = mapper(x) if mapper else mapper_indexed(x, i)
13-
if isinstance(mapper_result, collections.abc.Iterable):
13+
if is_future(mapper_result):
14+
result = from_future(mapper_result)
15+
elif isinstance(mapper_result, collections.abc.Iterable):
1416
result = from_(mapper_result)
1517
else:
16-
result = from_future(mapper_result) if is_future(
17-
mapper_result) else mapper_result
18+
result = mapper_result
1819
return result
1920

2021
return source.pipe(
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
import unittest
2+
import asyncio
3+
from rx import operators as ops
4+
from rx.subject import Subject
5+
6+
from rx.scheduler.eventloop import AsyncIOScheduler
7+
8+
9+
class TestFlatMapAsync(unittest.TestCase):
10+
11+
def test_flat_map_async(self):
12+
actual_next = None
13+
loop = asyncio.get_event_loop()
14+
scheduler = AsyncIOScheduler(loop=loop)
15+
16+
def mapper(i):
17+
async def _mapper(i):
18+
return i + 1
19+
20+
return asyncio.ensure_future(_mapper(i))
21+
22+
def on_next(i):
23+
nonlocal actual_next
24+
actual_next = i
25+
26+
async def test_flat_map():
27+
x = Subject()
28+
x.pipe(ops.flat_map(mapper)).subscribe(on_next, scheduler=scheduler)
29+
x.on_next(10)
30+
await asyncio.sleep(0.1)
31+
32+
loop.run_until_complete(test_flat_map())
33+
assert actual_next == 11

0 commit comments

Comments
 (0)