Skip to content

Commit 24e5b93

Browse files
committed
Improve async() by making its promises cancelable
Since `async()` returns a promise and those are normally cancelable, implementing this puts them in line with the rest of our ecosystem. As such the following example will throw a timeout exception from the canceled `sleep()` call. ```php $promise = async(static function (): int { echo 'a'; await(sleep(2)); echo 'b'; return time(); })(); $promise->cancel(); await($promise); ```` This builds on top of #15, #18, #19, #26, #28, #30, and #32.
1 parent 4cadacc commit 24e5b93

File tree

5 files changed

+253
-10
lines changed

5 files changed

+253
-10
lines changed

README.md

+23
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,29 @@ $promise->then(function (int $bytes) {
204204
});
205205
```
206206

207+
Promises returned by `async()` can be cancelled, and when done any currently and future awaited promise inside that and
208+
any nested fibers with their awaited promises will also be cancelled. As such the following example will only output
209+
`ab` as the [`sleep()`](https://reactphp.org/promise-timer/#sleep) between `a` and `b` is cancelled throwing a timeout
210+
exception that bubbles up through the fibers ultimately to the end user through the [`await()`](#await) on the last line
211+
of the example.
212+
213+
```php
214+
$promise = async(static function (): int {
215+
echo 'a';
216+
await(async(static function(): void {
217+
echo 'b';
218+
await(sleep(2));
219+
echo 'c';
220+
})());
221+
echo 'd';
222+
223+
return time();
224+
})();
225+
226+
$promise->cancel();
227+
await($promise);
228+
```
229+
207230
### await()
208231

209232
The `await(PromiseInterface $promise): mixed` function can be used to

composer.json

+2-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@
3131
"react/promise": "^2.8 || ^1.2.1"
3232
},
3333
"require-dev": {
34-
"phpunit/phpunit": "^9.3"
34+
"phpunit/phpunit": "^9.3",
35+
"react/promise-timer": "^1.8"
3536
},
3637
"autoload": {
3738
"psr-4": {

src/FiberMap.php

+55
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
<?php
2+
3+
namespace React\Async;
4+
5+
use React\Promise\PromiseInterface;
6+
7+
/**
8+
* @internal
9+
*/
10+
final class FiberMap
11+
{
12+
private static array $status = [];
13+
private static array $map = [];
14+
15+
public static function register(\Fiber $fiber): void
16+
{
17+
self::$status[\spl_object_hash($fiber)] = false;
18+
self::$map[\spl_object_hash($fiber)] = [];
19+
}
20+
21+
public static function cancel(\Fiber $fiber): void
22+
{
23+
self::$status[\spl_object_hash($fiber)] = true;
24+
}
25+
26+
public static function isCancelled(\Fiber $fiber): bool
27+
{
28+
return self::$status[\spl_object_hash($fiber)];
29+
}
30+
31+
public static function attachPromise(\Fiber $fiber, PromiseInterface $promise): void
32+
{
33+
self::$map[\spl_object_hash($fiber)][\spl_object_hash($promise)] = $promise;
34+
}
35+
36+
public static function detachPromise(\Fiber $fiber, PromiseInterface $promise): void
37+
{
38+
unset(self::$map[\spl_object_hash($fiber)][\spl_object_hash($promise)]);
39+
}
40+
41+
public static function has(\Fiber $fiber): bool
42+
{
43+
return array_key_exists(\spl_object_hash($fiber), self::$map);
44+
}
45+
46+
public static function getPromises(\Fiber $fiber): array
47+
{
48+
return self::$map[\spl_object_hash($fiber)] ?? [];
49+
}
50+
51+
public static function unregister(\Fiber $fiber): void
52+
{
53+
unset(self::$status[\spl_object_hash($fiber)], self::$map[\spl_object_hash($fiber)]);
54+
}
55+
}

src/functions.php

+68-9
Original file line numberDiff line numberDiff line change
@@ -148,24 +148,70 @@
148148
* });
149149
* ```
150150
*
151+
* Promises returned by `async()` can be cancelled, and when done any currently
152+
* and future awaited promise inside that and any nested fibers with their
153+
* awaited promises will also be cancelled. As such the following example will
154+
* only output `ab` as the [`sleep()`](https://reactphp.org/promise-timer/#sleep)
155+
* between `a` and `b` is cancelled throwing a timeout exception that bubbles up
156+
* through the fibers ultimately to the end user through the [`await()`](#await)
157+
* on the last line of the example.
158+
*
159+
* ```php
160+
* $promise = async(static function (): int {
161+
* echo 'a';
162+
* await(async(static function(): void {
163+
* echo 'b';
164+
* await(sleep(2));
165+
* echo 'c';
166+
* })());
167+
* echo 'd';
168+
*
169+
* return time();
170+
* })();
171+
*
172+
* $promise->cancel();
173+
* await($promise);
174+
* ```
175+
*
151176
* @param callable(mixed ...$args):mixed $function
152177
* @return callable(): PromiseInterface<mixed>
153178
* @since 4.0.0
154179
* @see coroutine()
155180
*/
156181
function async(callable $function): callable
157182
{
158-
return static fn (mixed ...$args): PromiseInterface => new Promise(function (callable $resolve, callable $reject) use ($function, $args): void {
159-
$fiber = new \Fiber(function () use ($resolve, $reject, $function, $args): void {
160-
try {
161-
$resolve($function(...$args));
162-
} catch (\Throwable $exception) {
163-
$reject($exception);
183+
return static function (mixed ...$args) use ($function): PromiseInterface {
184+
$fiber = null;
185+
$promise = new Promise(function (callable $resolve, callable $reject) use ($function, $args, &$fiber): void {
186+
$fiber = new \Fiber(function () use ($resolve, $reject, $function, $args, &$fiber): void {
187+
try {
188+
$resolve($function(...$args));
189+
} catch (\Throwable $exception) {
190+
$reject($exception);
191+
} finally {
192+
FiberMap::unregister($fiber);
193+
}
194+
});
195+
196+
FiberMap::register($fiber);
197+
198+
$fiber->start();
199+
}, function () use (&$fiber): void {
200+
FiberMap::cancel($fiber);
201+
foreach (FiberMap::getPromises($fiber) as $promise) {
202+
if ($promise instanceof CancellablePromiseInterface) {
203+
$promise->cancel();
204+
}
164205
}
165206
});
166207

167-
$fiber->start();
168-
});
208+
$lowLevelFiber = \Fiber::getCurrent();
209+
if ($lowLevelFiber !== null) {
210+
FiberMap::attachPromise($lowLevelFiber, $promise);
211+
}
212+
213+
return $promise;
214+
};
169215
}
170216

171217

@@ -230,9 +276,18 @@ function await(PromiseInterface $promise): mixed
230276
$rejected = false;
231277
$resolvedValue = null;
232278
$rejectedThrowable = null;
279+
$lowLevelFiber = \Fiber::getCurrent();
280+
281+
if ($lowLevelFiber !== null && FiberMap::isCancelled($lowLevelFiber) && $promise instanceof CancellablePromiseInterface) {
282+
$promise->cancel();
283+
}
233284

234285
$promise->then(
235-
function (mixed $value) use (&$resolved, &$resolvedValue, &$fiber): void {
286+
function (mixed $value) use (&$resolved, &$resolvedValue, &$fiber, $lowLevelFiber, $promise): void {
287+
if ($lowLevelFiber !== null) {
288+
FiberMap::detachPromise($lowLevelFiber, $promise);
289+
}
290+
236291
if ($fiber === null) {
237292
$resolved = true;
238293
$resolvedValue = $value;
@@ -285,6 +340,10 @@ function (mixed $throwable) use (&$rejected, &$rejectedThrowable, &$fiber): void
285340
throw $rejectedThrowable;
286341
}
287342

343+
if ($lowLevelFiber !== null) {
344+
FiberMap::attachPromise($lowLevelFiber, $promise);
345+
}
346+
288347
$fiber = FiberFactory::create();
289348

290349
return $fiber->suspend();

tests/AsyncTest.php

+105
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
use function React\Promise\all;
1212
use function React\Promise\reject;
1313
use function React\Promise\resolve;
14+
use function React\Promise\Timer\sleep;
1415

1516
class AsyncTest extends TestCase
1617
{
@@ -185,4 +186,108 @@ public function testAsyncReturnsPromiseThatFulfillsWithValueWhenCallbackReturnsA
185186
$this->assertGreaterThan(0.1, $time);
186187
$this->assertLessThan(0.12, $time);
187188
}
189+
190+
public function testCancel()
191+
{
192+
self::expectOutputString('a');
193+
$this->expectException(\Exception::class);
194+
$this->expectExceptionMessage('Timer cancelled');
195+
196+
$promise = async(static function (): int {
197+
echo 'a';
198+
await(sleep(2));
199+
echo 'b';
200+
201+
return time();
202+
})();
203+
204+
$promise->cancel();
205+
await($promise);
206+
}
207+
208+
public function testCancelTryCatch()
209+
{
210+
self::expectOutputString('ab');
211+
212+
$promise = async(static function (): int {
213+
echo 'a';
214+
try {
215+
await(sleep(2));
216+
} catch (\Throwable) {
217+
// No-Op
218+
}
219+
echo 'b';
220+
221+
return time();
222+
})();
223+
224+
$promise->cancel();
225+
await($promise);
226+
}
227+
228+
public function testNestedCancel()
229+
{
230+
self::expectOutputString('abc');
231+
$this->expectException(\Exception::class);
232+
$this->expectExceptionMessage('Timer cancelled');
233+
234+
$promise = async(static function (): int {
235+
echo 'a';
236+
await(async(static function(): void {
237+
echo 'b';
238+
await(async(static function(): void {
239+
echo 'c';
240+
await(sleep(2));
241+
echo 'd';
242+
})());
243+
echo 'e';
244+
})());
245+
echo 'f';
246+
247+
return time();
248+
})();
249+
250+
$promise->cancel();
251+
await($promise);
252+
}
253+
254+
public function testCancelFiberThatCatchesExceptions()
255+
{
256+
self::expectOutputString('ab');
257+
$this->expectException(\Exception::class);
258+
$this->expectExceptionMessage('Timer cancelled');
259+
260+
$promise = async(static function (): int {
261+
echo 'a';
262+
try {
263+
await(sleep(2));
264+
} catch (\Throwable) {
265+
// No-Op
266+
}
267+
echo 'b';
268+
await(sleep(0.1));
269+
echo 'c';
270+
271+
return time();
272+
})();
273+
274+
$promise->cancel();
275+
await($promise);
276+
}
277+
278+
public function testNotAwaitedPromiseWillNotBeCanceled()
279+
{
280+
self::expectOutputString('acb');
281+
282+
async(static function (): int {
283+
echo 'a';
284+
sleep(0.001)->then(static function (): void {
285+
echo 'b';
286+
});
287+
echo 'c';
288+
289+
return time();
290+
})()->cancel();
291+
Loop::run();
292+
}
188293
}

0 commit comments

Comments
 (0)