Skip to content

Commit 9e5259c

Browse files
committed
Cancel fiber
1 parent ff11a7a commit 9e5259c

File tree

4 files changed

+118
-3
lines changed

4 files changed

+118
-3
lines changed

composer.json

+2-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@
3131
"react/promise": "^2.8 || ^1.2.1"
3232
},
3333
"require-dev": {
34-
"phpunit/phpunit": "^9.3"
34+
"phpunit/phpunit": "^9.3",
35+
"react/promise-timer": "^1.8"
3536
},
3637
"autoload": {
3738
"psr-4": {

src/FiberMap.php

+39
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
<?php
2+
3+
namespace React\Async;
4+
5+
use Fiber;
6+
use React\Promise\PromiseInterface;
7+
8+
/**
9+
* @internal
10+
*/
11+
final class FiberMap
12+
{
13+
private array $map = [];
14+
15+
public function register(Fiber $fiber): void
16+
{
17+
$this->map[spl_object_hash($fiber)] = [];
18+
}
19+
20+
public function attachPromise(Fiber $fiber, PromiseInterface $promise): void
21+
{
22+
$this->map[spl_object_hash($fiber)][] = $promise;
23+
}
24+
25+
public function has(Fiber $fiber): bool
26+
{
27+
return array_key_exists(spl_object_hash($fiber), $this->map);
28+
}
29+
30+
public function getPromises(Fiber $fiber): array
31+
{
32+
return $this->map[spl_object_hash($fiber)];
33+
}
34+
35+
public function unregister(Fiber $fiber): void
36+
{
37+
unset($this->map[spl_object_hash($fiber)]);
38+
}
39+
}

src/functions.php

+41-2
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
namespace React\Async;
44

5+
use Fiber;
56
use React\EventLoop\Loop;
67
use React\Promise\CancellablePromiseInterface;
78
use React\Promise\Deferred;
@@ -20,16 +21,36 @@
2021
*/
2122
function async(callable $function): callable
2223
{
23-
return static fn (mixed ...$args): PromiseInterface => new Promise(function (callable $resolve, callable $reject) use ($function, $args): void {
24-
$fiber = new \Fiber(function () use ($resolve, $reject, $function, $args): void {
24+
$fiber = null;
25+
return static fn (mixed ...$args): PromiseInterface => new Promise(function (callable $resolve, callable $reject) use ($function, $args, &$fiber): void {
26+
$fiber = new \Fiber(function () use ($resolve, $reject, $function, $args, &$fiber): void {
27+
if (!fiberMap()->has($fiber)) {
28+
$reject(new \Exception('Fiber has been cancelled before execution started'));
29+
return;
30+
}
31+
2532
try {
2633
$resolve($function(...$args));
2734
} catch (\Throwable $exception) {
2835
$reject($exception);
36+
} finally {
37+
fiberMap()->unregister($fiber);
2938
}
3039
});
3140

41+
fiberMap()->register($fiber);
42+
3243
Loop::futureTick(static fn() => $fiber->start());
44+
}, function () use (&$fiber): void {
45+
if ($fiber instanceof Fiber) {
46+
$promises = fiberMap()->getPromises($fiber);
47+
fiberMap()->unregister($fiber);
48+
foreach ($promises as $promise) {
49+
if (method_exists($promise, 'cancel')) {
50+
$promise->cancel();
51+
}
52+
}
53+
}
3354
});
3455
}
3556

@@ -82,6 +103,10 @@ function await(PromiseInterface $promise): mixed
82103
$rejected = false;
83104
$resolvedValue = null;
84105
$rejectedThrowable = null;
106+
$lowLevelFiber = \Fiber::getCurrent();
107+
if ($lowLevelFiber !== null) {
108+
fiberMap()->attachPromise($lowLevelFiber, $promise);
109+
}
85110

86111
$promise->then(
87112
function (mixed $value) use (&$resolved, &$resolvedValue, &$fiber): void {
@@ -433,3 +458,17 @@ function waterfall(array $tasks): PromiseInterface
433458

434459
return $deferred->promise();
435460
}
461+
462+
/**
463+
* @internal
464+
*/
465+
function fiberMap(): FiberMap
466+
{
467+
static $wm = null;
468+
469+
if ($wm === null) {
470+
$wm = new FiberMap();
471+
}
472+
473+
return $wm;
474+
}

tests/AsyncTest.php

+36
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
use function React\Async\async;
99
use function React\Async\await;
1010
use function React\Promise\all;
11+
use function React\Promise\Timer\sleep;
1112

1213
class AsyncTest extends TestCase
1314
{
@@ -84,4 +85,39 @@ public function testAsyncReturnsPromiseThatFulfillsWithValueWhenCallbackReturnsA
8485
$this->assertGreaterThan(0.1, $time);
8586
$this->assertLessThan(0.12, $time);
8687
}
88+
89+
public function testCancel()
90+
{
91+
self::expectOutputString('a');
92+
93+
$promise = async(function (): int {
94+
echo 'a';
95+
await(sleep(2));
96+
echo 'b';
97+
98+
return time();
99+
})();
100+
Loop::addTimer(0.5, function () use ($promise) {
101+
$promise->cancel();
102+
});
103+
104+
Loop::run();
105+
}
106+
107+
public function testCancelNeverStartedFiber()
108+
{
109+
self::expectOutputString('');
110+
$this->expectException(\Exception::class);
111+
$this->expectExceptionMessage('Fiber has been cancelled before execution started');
112+
113+
$promise = async(function (): int {
114+
echo 'a';
115+
await(sleep(2));
116+
echo 'b';
117+
118+
return time();
119+
})();
120+
$promise->cancel();
121+
await($promise);
122+
}
87123
}

0 commit comments

Comments
 (0)