Skip to content

Commit 6224eb7

Browse files
authored
Merge pull request #20 from WyriHaximus-labs/cancel-fiber
Improve `async()` by making its promises cancellable
2 parents 4cadacc + 262ef59 commit 6224eb7

File tree

6 files changed

+293
-11
lines changed

6 files changed

+293
-11
lines changed

README.md

+23
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,29 @@ $promise->then(function (int $bytes) {
204204
});
205205
```
206206

207+
Promises returned by `async()` can be cancelled, and when done any currently and future awaited promise inside that and
208+
any nested fibers with their awaited promises will also be cancelled. As such the following example will only output
209+
`ab` as the [`sleep()`](https://reactphp.org/promise-timer/#sleep) between `a` and `b` is cancelled throwing a timeout
210+
exception that bubbles up through the fibers ultimately to the end user through the [`await()`](#await) on the last line
211+
of the example.
212+
213+
```php
214+
$promise = async(static function (): int {
215+
echo 'a';
216+
await(async(static function(): void {
217+
echo 'b';
218+
await(sleep(2));
219+
echo 'c';
220+
})());
221+
echo 'd';
222+
223+
return time();
224+
})();
225+
226+
$promise->cancel();
227+
await($promise);
228+
```
229+
207230
### await()
208231

209232
The `await(PromiseInterface $promise): mixed` function can be used to

composer.json

+2-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@
3131
"react/promise": "^2.8 || ^1.2.1"
3232
},
3333
"require-dev": {
34-
"phpunit/phpunit": "^9.3"
34+
"phpunit/phpunit": "^9.3",
35+
"react/promise-timer": "^1.8"
3536
},
3637
"autoload": {
3738
"psr-4": {

src/FiberMap.php

+55
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
<?php
2+
3+
namespace React\Async;
4+
5+
use React\Promise\PromiseInterface;
6+
7+
/**
8+
* @internal
9+
*/
10+
final class FiberMap
11+
{
12+
private static array $status = [];
13+
private static array $map = [];
14+
15+
public static function register(\Fiber $fiber): void
16+
{
17+
self::$status[\spl_object_id($fiber)] = false;
18+
self::$map[\spl_object_id($fiber)] = [];
19+
}
20+
21+
public static function cancel(\Fiber $fiber): void
22+
{
23+
self::$status[\spl_object_id($fiber)] = true;
24+
}
25+
26+
public static function isCancelled(\Fiber $fiber): bool
27+
{
28+
return self::$status[\spl_object_id($fiber)];
29+
}
30+
31+
public static function setPromise(\Fiber $fiber, PromiseInterface $promise): void
32+
{
33+
self::$map[\spl_object_id($fiber)] = $promise;
34+
}
35+
36+
public static function unsetPromise(\Fiber $fiber, PromiseInterface $promise): void
37+
{
38+
unset(self::$map[\spl_object_id($fiber)]);
39+
}
40+
41+
public static function has(\Fiber $fiber): bool
42+
{
43+
return array_key_exists(\spl_object_id($fiber), self::$map);
44+
}
45+
46+
public static function getPromise(\Fiber $fiber): ?PromiseInterface
47+
{
48+
return self::$map[\spl_object_id($fiber)] ?? null;
49+
}
50+
51+
public static function unregister(\Fiber $fiber): void
52+
{
53+
unset(self::$status[\spl_object_id($fiber)], self::$map[\spl_object_id($fiber)]);
54+
}
55+
}

src/functions.php

+72-10
Original file line numberDiff line numberDiff line change
@@ -148,24 +148,69 @@
148148
* });
149149
* ```
150150
*
151+
* Promises returned by `async()` can be cancelled, and when done any currently
152+
* and future awaited promise inside that and any nested fibers with their
153+
* awaited promises will also be cancelled. As such the following example will
154+
* only output `ab` as the [`sleep()`](https://reactphp.org/promise-timer/#sleep)
155+
* between `a` and `b` is cancelled throwing a timeout exception that bubbles up
156+
* through the fibers ultimately to the end user through the [`await()`](#await)
157+
* on the last line of the example.
158+
*
159+
* ```php
160+
* $promise = async(static function (): int {
161+
* echo 'a';
162+
* await(async(static function(): void {
163+
* echo 'b';
164+
* await(sleep(2));
165+
* echo 'c';
166+
* })());
167+
* echo 'd';
168+
*
169+
* return time();
170+
* })();
171+
*
172+
* $promise->cancel();
173+
* await($promise);
174+
* ```
175+
*
151176
* @param callable(mixed ...$args):mixed $function
152177
* @return callable(): PromiseInterface<mixed>
153178
* @since 4.0.0
154179
* @see coroutine()
155180
*/
156181
function async(callable $function): callable
157182
{
158-
return static fn (mixed ...$args): PromiseInterface => new Promise(function (callable $resolve, callable $reject) use ($function, $args): void {
159-
$fiber = new \Fiber(function () use ($resolve, $reject, $function, $args): void {
160-
try {
161-
$resolve($function(...$args));
162-
} catch (\Throwable $exception) {
163-
$reject($exception);
183+
return static function (mixed ...$args) use ($function): PromiseInterface {
184+
$fiber = null;
185+
$promise = new Promise(function (callable $resolve, callable $reject) use ($function, $args, &$fiber): void {
186+
$fiber = new \Fiber(function () use ($resolve, $reject, $function, $args, &$fiber): void {
187+
try {
188+
$resolve($function(...$args));
189+
} catch (\Throwable $exception) {
190+
$reject($exception);
191+
} finally {
192+
FiberMap::unregister($fiber);
193+
}
194+
});
195+
196+
FiberMap::register($fiber);
197+
198+
$fiber->start();
199+
}, function () use (&$fiber): void {
200+
FiberMap::cancel($fiber);
201+
$promise = FiberMap::getPromise($fiber);
202+
if ($promise instanceof CancellablePromiseInterface) {
203+
$promise->cancel();
164204
}
165205
});
166206

167-
$fiber->start();
168-
});
207+
$lowLevelFiber = \Fiber::getCurrent();
208+
if ($lowLevelFiber !== null) {
209+
FiberMap::setPromise($lowLevelFiber, $promise);
210+
}
211+
212+
return $promise;
213+
};
169214
}
170215

171216

@@ -230,9 +275,18 @@ function await(PromiseInterface $promise): mixed
230275
$rejected = false;
231276
$resolvedValue = null;
232277
$rejectedThrowable = null;
278+
$lowLevelFiber = \Fiber::getCurrent();
279+
280+
if ($lowLevelFiber !== null && FiberMap::isCancelled($lowLevelFiber) && $promise instanceof CancellablePromiseInterface) {
281+
$promise->cancel();
282+
}
233283

234284
$promise->then(
235-
function (mixed $value) use (&$resolved, &$resolvedValue, &$fiber): void {
285+
function (mixed $value) use (&$resolved, &$resolvedValue, &$fiber, $lowLevelFiber, $promise): void {
286+
if ($lowLevelFiber !== null) {
287+
FiberMap::unsetPromise($lowLevelFiber, $promise);
288+
}
289+
236290
if ($fiber === null) {
237291
$resolved = true;
238292
$resolvedValue = $value;
@@ -241,7 +295,11 @@ function (mixed $value) use (&$resolved, &$resolvedValue, &$fiber): void {
241295

242296
$fiber->resume($value);
243297
},
244-
function (mixed $throwable) use (&$rejected, &$rejectedThrowable, &$fiber): void {
298+
function (mixed $throwable) use (&$rejected, &$rejectedThrowable, &$fiber, $lowLevelFiber, $promise): void {
299+
if ($lowLevelFiber !== null) {
300+
FiberMap::unsetPromise($lowLevelFiber, $promise);
301+
}
302+
245303
if (!$throwable instanceof \Throwable) {
246304
$throwable = new \UnexpectedValueException(
247305
'Promise rejected with unexpected value of type ' . (is_object($throwable) ? get_class($throwable) : gettype($throwable))
@@ -285,6 +343,10 @@ function (mixed $throwable) use (&$rejected, &$rejectedThrowable, &$fiber): void
285343
throw $rejectedThrowable;
286344
}
287345

346+
if ($lowLevelFiber !== null) {
347+
FiberMap::setPromise($lowLevelFiber, $promise);
348+
}
349+
288350
$fiber = FiberFactory::create();
289351

290352
return $fiber->suspend();

tests/AsyncTest.php

+105
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
use function React\Promise\all;
1212
use function React\Promise\reject;
1313
use function React\Promise\resolve;
14+
use function React\Promise\Timer\sleep;
1415

1516
class AsyncTest extends TestCase
1617
{
@@ -185,4 +186,108 @@ public function testAsyncReturnsPromiseThatFulfillsWithValueWhenCallbackReturnsA
185186
$this->assertGreaterThan(0.1, $time);
186187
$this->assertLessThan(0.12, $time);
187188
}
189+
190+
public function testCancel()
191+
{
192+
self::expectOutputString('a');
193+
$this->expectException(\Exception::class);
194+
$this->expectExceptionMessage('Timer cancelled');
195+
196+
$promise = async(static function (): int {
197+
echo 'a';
198+
await(sleep(2));
199+
echo 'b';
200+
201+
return time();
202+
})();
203+
204+
$promise->cancel();
205+
await($promise);
206+
}
207+
208+
public function testCancelTryCatch()
209+
{
210+
self::expectOutputString('ab');
211+
212+
$promise = async(static function (): int {
213+
echo 'a';
214+
try {
215+
await(sleep(2));
216+
} catch (\Throwable) {
217+
// No-Op
218+
}
219+
echo 'b';
220+
221+
return time();
222+
})();
223+
224+
$promise->cancel();
225+
await($promise);
226+
}
227+
228+
public function testNestedCancel()
229+
{
230+
self::expectOutputString('abc');
231+
$this->expectException(\Exception::class);
232+
$this->expectExceptionMessage('Timer cancelled');
233+
234+
$promise = async(static function (): int {
235+
echo 'a';
236+
await(async(static function(): void {
237+
echo 'b';
238+
await(async(static function(): void {
239+
echo 'c';
240+
await(sleep(2));
241+
echo 'd';
242+
})());
243+
echo 'e';
244+
})());
245+
echo 'f';
246+
247+
return time();
248+
})();
249+
250+
$promise->cancel();
251+
await($promise);
252+
}
253+
254+
public function testCancelFiberThatCatchesExceptions()
255+
{
256+
self::expectOutputString('ab');
257+
$this->expectException(\Exception::class);
258+
$this->expectExceptionMessage('Timer cancelled');
259+
260+
$promise = async(static function (): int {
261+
echo 'a';
262+
try {
263+
await(sleep(2));
264+
} catch (\Throwable) {
265+
// No-Op
266+
}
267+
echo 'b';
268+
await(sleep(0.1));
269+
echo 'c';
270+
271+
return time();
272+
})();
273+
274+
$promise->cancel();
275+
await($promise);
276+
}
277+
278+
public function testNotAwaitedPromiseWillNotBeCanceled()
279+
{
280+
self::expectOutputString('acb');
281+
282+
async(static function (): int {
283+
echo 'a';
284+
sleep(0.001)->then(static function (): void {
285+
echo 'b';
286+
});
287+
echo 'c';
288+
289+
return time();
290+
})()->cancel();
291+
Loop::run();
292+
}
188293
}

tests/AwaitTest.php

+36
Original file line numberDiff line numberDiff line change
@@ -332,6 +332,42 @@ public function testNestedAwaits(callable $await)
332332
})));
333333
}
334334

335+
/**
336+
* @dataProvider provideAwaiters
337+
*/
338+
public function testResolvedPromisesShouldBeDetached(callable $await)
339+
{
340+
$await(async(function () use ($await): int {
341+
$fiber = \Fiber::getCurrent();
342+
$await(React\Promise\Timer\sleep(0.01));
343+
$this->assertNull(React\Async\FiberMap::getPromise($fiber));
344+
345+
return time();
346+
})());
347+
}
348+
349+
/**
350+
* @dataProvider provideAwaiters
351+
*/
352+
public function testRejectedPromisesShouldBeDetached(callable $await)
353+
{
354+
$this->expectException(\Exception::class);
355+
$this->expectExceptionMessage('Boom!');
356+
357+
$await(async(function () use ($await): int {
358+
$fiber = \Fiber::getCurrent();
359+
try {
360+
$await(React\Promise\reject(new \Exception('Boom!')));
361+
} catch (\Throwable $throwable) {
362+
throw $throwable;
363+
} finally {
364+
$this->assertNull(React\Async\FiberMap::getPromise($fiber));
365+
}
366+
367+
return time();
368+
})());
369+
}
370+
335371
public function provideAwaiters(): iterable
336372
{
337373
yield 'await' => [static fn (React\Promise\PromiseInterface $promise): mixed => React\Async\await($promise)];

0 commit comments

Comments
 (0)