Skip to content

Commit 794c7d8

Browse files
committed
Cancel fiber
1 parent 4cadacc commit 794c7d8

File tree

5 files changed

+283
-9
lines changed

5 files changed

+283
-9
lines changed

README.md

+82
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,88 @@ $promise->then(function (int $bytes) {
204204
});
205205
```
206206

207+
Promises returned by the `async` function can be cancelled and when done they will cancel any recursive `async` call
208+
and any currently awaited promise using the `await` function. In the following example `echo 'b';` will never
209+
be reached, and the `await` function at the bottom will also throw an exception with the following message
210+
`Timer cancelled`.
211+
212+
```php
213+
$promise = async(static function (): int {
214+
echo 'a';
215+
await(sleep(2));
216+
echo 'b';
217+
218+
return time();
219+
})();
220+
221+
$promise->cancel();
222+
await($promise);
223+
````
224+
225+
If you however decide to try and catch that `await` you will reach `echo 'b';`. The exception you caught however
226+
isn't thrown by the bottom await function:
227+
228+
```php
229+
$promise = async(static function (): int {
230+
echo 'a';
231+
try {
232+
await(sleep(2));
233+
} catch (\Throwable) {
234+
// No-Op
235+
}
236+
echo 'b';
237+
238+
return time();
239+
})();
240+
241+
$promise->cancel();
242+
await($promise);
243+
```
244+
245+
The second promise coming from the `sleep` function will be canceled the moment it's passed into the `await` call.
246+
247+
```php
248+
$promise = async(static function (): int {
249+
echo 'a';
250+
try {
251+
await(sleep(2));
252+
} catch (\Throwable) {
253+
// No-Op
254+
}
255+
echo 'b';
256+
await(sleep(0.1));
257+
echo 'c';
258+
259+
return time();
260+
})();
261+
262+
$promise->cancel();
263+
await($promise);
264+
```
265+
266+
Any nested `async` and `await` calls are also canceled. The following example will output `abc`:
267+
268+
```php
269+
$promise = async(static function (): int {
270+
echo 'a';
271+
await(async(static function(): void {
272+
echo 'b';
273+
await(async(static function(): void {
274+
echo 'c';
275+
await(sleep(2));
276+
echo 'd';
277+
})());
278+
echo 'e';
279+
})());
280+
echo 'f';
281+
282+
return time();
283+
})();
284+
285+
$promise->cancel();
286+
await($promise);
287+
```
288+
207289
### await()
208290

209291
The `await(PromiseInterface $promise): mixed` function can be used to

composer.json

+2-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@
3131
"react/promise": "^2.8 || ^1.2.1"
3232
},
3333
"require-dev": {
34-
"phpunit/phpunit": "^9.3"
34+
"phpunit/phpunit": "^9.3",
35+
"react/promise-timer": "^1.8"
3536
},
3637
"autoload": {
3738
"psr-4": {

src/FiberMap.php

+51
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
<?php
2+
3+
namespace React\Async;
4+
5+
use Fiber;
6+
use React\Promise\PromiseInterface;
7+
8+
/**
9+
* @internal
10+
*/
11+
final class FiberMap
12+
{
13+
private array $status = [];
14+
private array $map = [];
15+
16+
public function register(Fiber $fiber): void
17+
{
18+
$this->status[spl_object_hash($fiber)] = false;
19+
$this->map[spl_object_hash($fiber)] = [];
20+
}
21+
22+
public function cancel(Fiber $fiber): void
23+
{
24+
$this->status[spl_object_hash($fiber)] = true;
25+
}
26+
27+
public function isCanceled(Fiber $fiber): bool
28+
{
29+
return $this->status[spl_object_hash($fiber)];
30+
}
31+
32+
public function attachPromise(Fiber $fiber, PromiseInterface $promise): void
33+
{
34+
$this->map[spl_object_hash($fiber)][spl_object_hash($promise)] = $promise;
35+
}
36+
37+
public function has(Fiber $fiber): bool
38+
{
39+
return array_key_exists(spl_object_hash($fiber), $this->map);
40+
}
41+
42+
public function getPromises(Fiber $fiber): array
43+
{
44+
return $this->map[spl_object_hash($fiber)];
45+
}
46+
47+
public function unregister(Fiber $fiber): void
48+
{
49+
unset($this->status[spl_object_hash($fiber)], $this->map[spl_object_hash($fiber)]);
50+
}
51+
}

src/functions.php

+57-8
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
namespace React\Async;
44

5+
use Fiber;
56
use React\EventLoop\Loop;
67
use React\Promise\CancellablePromiseInterface;
78
use React\Promise\Deferred;
@@ -155,17 +156,40 @@
155156
*/
156157
function async(callable $function): callable
157158
{
158-
return static fn (mixed ...$args): PromiseInterface => new Promise(function (callable $resolve, callable $reject) use ($function, $args): void {
159-
$fiber = new \Fiber(function () use ($resolve, $reject, $function, $args): void {
160-
try {
161-
$resolve($function(...$args));
162-
} catch (\Throwable $exception) {
163-
$reject($exception);
159+
return static function (mixed ...$args) use ($function): PromiseInterface {
160+
$fiber = null;
161+
$promise = new Promise(function (callable $resolve, callable $reject) use ($function, $args, &$fiber): void {
162+
$fiber = new \Fiber(function () use ($resolve, $reject, $function, $args, &$fiber): void {
163+
try {
164+
$resolve($function(...$args));
165+
} catch (\Throwable $exception) {
166+
$reject($exception);
167+
} finally {
168+
fiberMap()->unregister($fiber);
169+
}
170+
});
171+
172+
fiberMap()->register($fiber);
173+
174+
$fiber->start();
175+
}, function () use (&$fiber): void {
176+
if ($fiber instanceof Fiber) {
177+
fiberMap()->cancel($fiber);
178+
foreach (fiberMap()->getPromises($fiber) as $promise) {
179+
if (method_exists($promise, 'cancel')) {
180+
$promise->cancel();
181+
}
182+
}
164183
}
165184
});
166185

167-
$fiber->start();
168-
});
186+
$lowLevelFiber = \Fiber::getCurrent();
187+
if ($lowLevelFiber !== null) {
188+
fiberMap()->attachPromise($lowLevelFiber, $promise);
189+
}
190+
191+
return $promise;
192+
};
169193
}
170194

171195

@@ -230,6 +254,13 @@ function await(PromiseInterface $promise): mixed
230254
$rejected = false;
231255
$resolvedValue = null;
232256
$rejectedThrowable = null;
257+
$lowLevelFiber = \Fiber::getCurrent();
258+
259+
if ($lowLevelFiber !== null) {
260+
if (fiberMap()->isCanceled($lowLevelFiber) && $promise instanceof CancellablePromiseInterface) {
261+
$promise->cancel();
262+
}
263+
}
233264

234265
$promise->then(
235266
function (mixed $value) use (&$resolved, &$resolvedValue, &$fiber): void {
@@ -285,6 +316,10 @@ function (mixed $throwable) use (&$rejected, &$rejectedThrowable, &$fiber): void
285316
throw $rejectedThrowable;
286317
}
287318

319+
if ($lowLevelFiber !== null) {
320+
fiberMap()->attachPromise($lowLevelFiber, $promise);
321+
}
322+
288323
$fiber = FiberFactory::create();
289324

290325
return $fiber->suspend();
@@ -601,3 +636,17 @@ function waterfall(array $tasks): PromiseInterface
601636

602637
return $deferred->promise();
603638
}
639+
640+
/**
641+
* @internal
642+
*/
643+
function fiberMap(): FiberMap
644+
{
645+
static $wm = null;
646+
647+
if ($wm === null) {
648+
$wm = new FiberMap();
649+
}
650+
651+
return $wm;
652+
}

tests/AsyncTest.php

+91
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
use function React\Promise\all;
1212
use function React\Promise\reject;
1313
use function React\Promise\resolve;
14+
use function React\Promise\Timer\sleep;
1415

1516
class AsyncTest extends TestCase
1617
{
@@ -185,4 +186,94 @@ public function testAsyncReturnsPromiseThatFulfillsWithValueWhenCallbackReturnsA
185186
$this->assertGreaterThan(0.1, $time);
186187
$this->assertLessThan(0.12, $time);
187188
}
189+
190+
public function testCancel()
191+
{
192+
self::expectOutputString('a');
193+
$this->expectException(\Exception::class);
194+
$this->expectExceptionMessage('Timer cancelled');
195+
196+
$promise = async(static function (): int {
197+
echo 'a';
198+
await(sleep(2));
199+
echo 'b';
200+
201+
return time();
202+
})();
203+
204+
$promise->cancel();
205+
await($promise);
206+
}
207+
208+
public function testCancelTryCatch()
209+
{
210+
self::expectOutputString('ab');
211+
// $this->expectException(\Exception::class);
212+
// $this->expectExceptionMessage('Timer cancelled');
213+
214+
$promise = async(static function (): int {
215+
echo 'a';
216+
try {
217+
await(sleep(2));
218+
} catch (\Throwable) {
219+
// No-Op
220+
}
221+
echo 'b';
222+
223+
return time();
224+
})();
225+
226+
$promise->cancel();
227+
await($promise);
228+
}
229+
230+
public function testNestedCancel()
231+
{
232+
self::expectOutputString('abc');
233+
$this->expectException(\Exception::class);
234+
$this->expectExceptionMessage('Timer cancelled');
235+
236+
$promise = async(static function (): int {
237+
echo 'a';
238+
await(async(static function(): void {
239+
echo 'b';
240+
await(async(static function(): void {
241+
echo 'c';
242+
await(sleep(2));
243+
echo 'd';
244+
})());
245+
echo 'e';
246+
})());
247+
echo 'f';
248+
249+
return time();
250+
})();
251+
252+
$promise->cancel();
253+
await($promise);
254+
}
255+
256+
public function testCancelFiberThatCatchesExceptions()
257+
{
258+
self::expectOutputString('ab');
259+
$this->expectException(\Exception::class);
260+
$this->expectExceptionMessage('Timer cancelled');
261+
262+
$promise = async(static function (): int {
263+
echo 'a';
264+
try {
265+
await(sleep(2));
266+
} catch (\Throwable) {
267+
// No-Op
268+
}
269+
echo 'b';
270+
await(sleep(0.1));
271+
echo 'c';
272+
273+
return time();
274+
})();
275+
276+
$promise->cancel();
277+
await($promise);
278+
}
188279
}

0 commit comments

Comments
 (0)