Skip to content

Commit 1dd9138

Browse files
committed
Improve async() by making its promises cancelable
Since `async()` returns a promise and those are normally cancelable, implementing this puts them in line with the rest of our ecosystem. As such the following example will throw a timeout exception from the canceled `sleep()` call. ```php $promise = async(static function (): int { echo 'a'; await(sleep(2)); echo 'b'; return time(); })(); $promise->cancel(); await($promise); ```` This builds on top of #15, #18, #19, #26, #28, #30, and #32.
1 parent 4cadacc commit 1dd9138

File tree

5 files changed

+319
-9
lines changed

5 files changed

+319
-9
lines changed

README.md

+102
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,108 @@ $promise->then(function (int $bytes) {
204204
});
205205
```
206206

207+
Promises returned by the `async` function can be cancelled and when done they will cancel any recursive `async` call
208+
and any currently awaited promise using the `await` function. In the following example `echo 'b';` will never
209+
be reached, and the `await` function at the bottom will also throw an exception with the following message
210+
`Timer cancelled`.
211+
212+
```php
213+
$promise = async(static function (): int {
214+
echo 'a';
215+
await(sleep(2));
216+
echo 'b';
217+
218+
return time();
219+
})();
220+
221+
$promise->cancel();
222+
await($promise);
223+
````
224+
225+
If you however decide to try and catch that `await` you will reach `echo 'b';`. The exception you caught however
226+
isn't thrown by the bottom await function. Just as with synchronous code catching it lets you ignore the exception or
227+
error that is thrown.
228+
229+
```php
230+
$promise = async(static function (): int {
231+
echo 'a';
232+
try {
233+
await(sleep(2));
234+
} catch (\Throwable) {
235+
// No-Op
236+
}
237+
echo 'b';
238+
239+
return time();
240+
})();
241+
242+
$promise->cancel();
243+
await($promise);
244+
```
245+
246+
When a fiber is cancelled, all currently pending and future awaited promises will be cancelled. As such the following
247+
example will never output `c` and a timeout exception will be thrown.
248+
249+
```php
250+
$promise = async(static function (): int {
251+
echo 'a';
252+
try {
253+
await(sleep(2));
254+
} catch (\Throwable) {
255+
// No-Op
256+
}
257+
echo 'b';
258+
await(sleep(0.1));
259+
echo 'c';
260+
261+
return time();
262+
})();
263+
264+
$promise->cancel();
265+
await($promise);
266+
```
267+
268+
Any nested `async` and `await` calls are also canceled. You can nest this as deep as you want. As long as you await
269+
every promise yielding function you call. The following example will output `abc`.
270+
271+
```php
272+
$promise = async(static function (): int {
273+
echo 'a';
274+
await(async(static function(): void {
275+
echo 'b';
276+
await(async(static function(): void {
277+
echo 'c';
278+
await(sleep(2));
279+
echo 'd';
280+
})());
281+
echo 'e';
282+
})());
283+
echo 'f';
284+
285+
return time();
286+
})();
287+
288+
$promise->cancel();
289+
await($promise);
290+
```
291+
292+
Be very much aware that if you call a promise yielding function and not await it, it will not be cancelled. The
293+
following example will output `acb`.
294+
295+
```php
296+
$promise = async(static function (): int {
297+
echo 'a';
298+
sleep(0.001)->then(static function (): void {
299+
echo 'b';
300+
});
301+
echo 'c';
302+
303+
return time();
304+
})();
305+
306+
$promise->cancel();
307+
```
308+
207309
### await()
208310

209311
The `await(PromiseInterface $promise): mixed` function can be used to

composer.json

+2-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@
3131
"react/promise": "^2.8 || ^1.2.1"
3232
},
3333
"require-dev": {
34-
"phpunit/phpunit": "^9.3"
34+
"phpunit/phpunit": "^9.3",
35+
"react/promise-timer": "^1.8"
3536
},
3637
"autoload": {
3738
"psr-4": {

src/FiberMap.php

+51
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
<?php
2+
3+
namespace React\Async;
4+
5+
use Fiber;
6+
use React\Promise\PromiseInterface;
7+
8+
/**
9+
* @internal
10+
*/
11+
final class FiberMap
12+
{
13+
private array $status = [];
14+
private array $map = [];
15+
16+
public function register(Fiber $fiber): void
17+
{
18+
$this->status[spl_object_hash($fiber)] = false;
19+
$this->map[spl_object_hash($fiber)] = [];
20+
}
21+
22+
public function cancel(Fiber $fiber): void
23+
{
24+
$this->status[spl_object_hash($fiber)] = true;
25+
}
26+
27+
public function isCanceled(Fiber $fiber): bool
28+
{
29+
return $this->status[spl_object_hash($fiber)];
30+
}
31+
32+
public function attachPromise(Fiber $fiber, PromiseInterface $promise): void
33+
{
34+
$this->map[spl_object_hash($fiber)][spl_object_hash($promise)] = $promise;
35+
}
36+
37+
public function has(Fiber $fiber): bool
38+
{
39+
return array_key_exists(spl_object_hash($fiber), $this->map);
40+
}
41+
42+
public function getPromises(Fiber $fiber): array
43+
{
44+
return $this->map[spl_object_hash($fiber)];
45+
}
46+
47+
public function unregister(Fiber $fiber): void
48+
{
49+
unset($this->status[spl_object_hash($fiber)], $this->map[spl_object_hash($fiber)]);
50+
}
51+
}

src/functions.php

+57-8
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
namespace React\Async;
44

5+
use Fiber;
56
use React\EventLoop\Loop;
67
use React\Promise\CancellablePromiseInterface;
78
use React\Promise\Deferred;
@@ -155,17 +156,40 @@
155156
*/
156157
function async(callable $function): callable
157158
{
158-
return static fn (mixed ...$args): PromiseInterface => new Promise(function (callable $resolve, callable $reject) use ($function, $args): void {
159-
$fiber = new \Fiber(function () use ($resolve, $reject, $function, $args): void {
160-
try {
161-
$resolve($function(...$args));
162-
} catch (\Throwable $exception) {
163-
$reject($exception);
159+
return static function (mixed ...$args) use ($function): PromiseInterface {
160+
$fiber = null;
161+
$promise = new Promise(function (callable $resolve, callable $reject) use ($function, $args, &$fiber): void {
162+
$fiber = new \Fiber(function () use ($resolve, $reject, $function, $args, &$fiber): void {
163+
try {
164+
$resolve($function(...$args));
165+
} catch (\Throwable $exception) {
166+
$reject($exception);
167+
} finally {
168+
fiberMap()->unregister($fiber);
169+
}
170+
});
171+
172+
fiberMap()->register($fiber);
173+
174+
$fiber->start();
175+
}, function () use (&$fiber): void {
176+
if ($fiber instanceof Fiber) {
177+
fiberMap()->cancel($fiber);
178+
foreach (fiberMap()->getPromises($fiber) as $promise) {
179+
if (method_exists($promise, 'cancel')) {
180+
$promise->cancel();
181+
}
182+
}
164183
}
165184
});
166185

167-
$fiber->start();
168-
});
186+
$lowLevelFiber = \Fiber::getCurrent();
187+
if ($lowLevelFiber !== null) {
188+
fiberMap()->attachPromise($lowLevelFiber, $promise);
189+
}
190+
191+
return $promise;
192+
};
169193
}
170194

171195

@@ -230,6 +254,13 @@ function await(PromiseInterface $promise): mixed
230254
$rejected = false;
231255
$resolvedValue = null;
232256
$rejectedThrowable = null;
257+
$lowLevelFiber = \Fiber::getCurrent();
258+
259+
if ($lowLevelFiber !== null) {
260+
if (fiberMap()->isCanceled($lowLevelFiber) && $promise instanceof CancellablePromiseInterface) {
261+
$promise->cancel();
262+
}
263+
}
233264

234265
$promise->then(
235266
function (mixed $value) use (&$resolved, &$resolvedValue, &$fiber): void {
@@ -285,6 +316,10 @@ function (mixed $throwable) use (&$rejected, &$rejectedThrowable, &$fiber): void
285316
throw $rejectedThrowable;
286317
}
287318

319+
if ($lowLevelFiber !== null) {
320+
fiberMap()->attachPromise($lowLevelFiber, $promise);
321+
}
322+
288323
$fiber = FiberFactory::create();
289324

290325
return $fiber->suspend();
@@ -601,3 +636,17 @@ function waterfall(array $tasks): PromiseInterface
601636

602637
return $deferred->promise();
603638
}
639+
640+
/**
641+
* @internal
642+
*/
643+
function fiberMap(): FiberMap
644+
{
645+
static $wm = null;
646+
647+
if ($wm === null) {
648+
$wm = new FiberMap();
649+
}
650+
651+
return $wm;
652+
}

tests/AsyncTest.php

+107
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
use function React\Promise\all;
1212
use function React\Promise\reject;
1313
use function React\Promise\resolve;
14+
use function React\Promise\Timer\sleep;
1415

1516
class AsyncTest extends TestCase
1617
{
@@ -185,4 +186,110 @@ public function testAsyncReturnsPromiseThatFulfillsWithValueWhenCallbackReturnsA
185186
$this->assertGreaterThan(0.1, $time);
186187
$this->assertLessThan(0.12, $time);
187188
}
189+
190+
public function testCancel()
191+
{
192+
self::expectOutputString('a');
193+
$this->expectException(\Exception::class);
194+
$this->expectExceptionMessage('Timer cancelled');
195+
196+
$promise = async(static function (): int {
197+
echo 'a';
198+
await(sleep(2));
199+
echo 'b';
200+
201+
return time();
202+
})();
203+
204+
$promise->cancel();
205+
await($promise);
206+
}
207+
208+
public function testCancelTryCatch()
209+
{
210+
self::expectOutputString('ab');
211+
// $this->expectException(\Exception::class);
212+
// $this->expectExceptionMessage('Timer cancelled');
213+
214+
$promise = async(static function (): int {
215+
echo 'a';
216+
try {
217+
await(sleep(2));
218+
} catch (\Throwable) {
219+
// No-Op
220+
}
221+
echo 'b';
222+
223+
return time();
224+
})();
225+
226+
$promise->cancel();
227+
await($promise);
228+
}
229+
230+
public function testNestedCancel()
231+
{
232+
self::expectOutputString('abc');
233+
$this->expectException(\Exception::class);
234+
$this->expectExceptionMessage('Timer cancelled');
235+
236+
$promise = async(static function (): int {
237+
echo 'a';
238+
await(async(static function(): void {
239+
echo 'b';
240+
await(async(static function(): void {
241+
echo 'c';
242+
await(sleep(2));
243+
echo 'd';
244+
})());
245+
echo 'e';
246+
})());
247+
echo 'f';
248+
249+
return time();
250+
})();
251+
252+
$promise->cancel();
253+
await($promise);
254+
}
255+
256+
public function testCancelFiberThatCatchesExceptions()
257+
{
258+
self::expectOutputString('ab');
259+
$this->expectException(\Exception::class);
260+
$this->expectExceptionMessage('Timer cancelled');
261+
262+
$promise = async(static function (): int {
263+
echo 'a';
264+
try {
265+
await(sleep(2));
266+
} catch (\Throwable) {
267+
// No-Op
268+
}
269+
echo 'b';
270+
await(sleep(0.1));
271+
echo 'c';
272+
273+
return time();
274+
})();
275+
276+
$promise->cancel();
277+
await($promise);
278+
}
279+
280+
public function testNotAwaitedPromiseWillNotBeCanceled()
281+
{
282+
self::expectOutputString('acb');
283+
284+
async(static function (): int {
285+
echo 'a';
286+
sleep(0.001)->then(static function (): void {
287+
echo 'b';
288+
});
289+
echo 'c';
290+
291+
return time();
292+
})()->cancel();
293+
Loop::run();
294+
}
188295
}

0 commit comments

Comments
 (0)