Skip to content

Commit 208a782

Browse files
committed
Cancel fiber
1 parent ff11a7a commit 208a782

File tree

4 files changed

+195
-3
lines changed

4 files changed

+195
-3
lines changed

composer.json

+2-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@
3131
"react/promise": "^2.8 || ^1.2.1"
3232
},
3333
"require-dev": {
34-
"phpunit/phpunit": "^9.3"
34+
"phpunit/phpunit": "^9.3",
35+
"react/promise-timer": "^1.8"
3536
},
3637
"autoload": {
3738
"psr-4": {

src/FiberMap.php

+51
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
<?php
2+
3+
namespace React\Async;
4+
5+
use Fiber;
6+
use React\Promise\PromiseInterface;
7+
8+
/**
9+
* @internal
10+
*/
11+
final class FiberMap
12+
{
13+
private array $status = [];
14+
private array $map = [];
15+
16+
public function register(Fiber $fiber): void
17+
{
18+
$this->status[spl_object_hash($fiber)] = false;
19+
$this->map[spl_object_hash($fiber)] = [];
20+
}
21+
22+
public function cancel(Fiber $fiber): void
23+
{
24+
$this->status[spl_object_hash($fiber)] = true;
25+
}
26+
27+
public function isCanceled(Fiber $fiber): bool
28+
{
29+
return $this->status[spl_object_hash($fiber)];
30+
}
31+
32+
public function attachPromise(Fiber $fiber, PromiseInterface $promise): void
33+
{
34+
$this->map[spl_object_hash($fiber)][] = $promise;
35+
}
36+
37+
public function has(Fiber $fiber): bool
38+
{
39+
return array_key_exists(spl_object_hash($fiber), $this->map);
40+
}
41+
42+
public function getPromises(Fiber $fiber): array
43+
{
44+
return $this->map[spl_object_hash($fiber)];
45+
}
46+
47+
public function unregister(Fiber $fiber): void
48+
{
49+
unset($this->status[spl_object_hash($fiber)], $this->map[spl_object_hash($fiber)]);
50+
}
51+
}

src/functions.php

+49-2
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,11 @@
22

33
namespace React\Async;
44

5+
use Fiber;
56
use React\EventLoop\Loop;
67
use React\Promise\CancellablePromiseInterface;
78
use React\Promise\Deferred;
9+
use React\Promise\ExtendedPromiseInterface;
810
use React\Promise\Promise;
911
use React\Promise\PromiseInterface;
1012
use function React\Promise\reject;
@@ -20,16 +22,36 @@
2022
*/
2123
function async(callable $function): callable
2224
{
23-
return static fn (mixed ...$args): PromiseInterface => new Promise(function (callable $resolve, callable $reject) use ($function, $args): void {
24-
$fiber = new \Fiber(function () use ($resolve, $reject, $function, $args): void {
25+
$fiber = null;
26+
return static fn (mixed ...$args): PromiseInterface => new Promise(function (callable $resolve, callable $reject) use ($function, $args, &$fiber): void {
27+
$fiber = new \Fiber(function () use ($resolve, $reject, $function, $args, &$fiber): void {
28+
if (fiberMap()->isCanceled($fiber)) {
29+
fiberMap()->unregister($fiber);
30+
$reject(new \Exception('Fiber has been cancelled before execution started'));
31+
return;
32+
}
33+
2534
try {
2635
$resolve($function(...$args));
2736
} catch (\Throwable $exception) {
2837
$reject($exception);
38+
} finally {
39+
fiberMap()->unregister($fiber);
2940
}
3041
});
3142

43+
fiberMap()->register($fiber);
44+
3245
Loop::futureTick(static fn() => $fiber->start());
46+
}, function () use (&$fiber): void {
47+
if ($fiber instanceof Fiber) {
48+
fiberMap()->cancel($fiber);
49+
foreach (fiberMap()->getPromises($fiber) as $promise) {
50+
if (method_exists($promise, 'cancel')) {
51+
$promise->cancel();
52+
}
53+
}
54+
}
3355
});
3456
}
3557

@@ -82,6 +104,13 @@ function await(PromiseInterface $promise): mixed
82104
$rejected = false;
83105
$resolvedValue = null;
84106
$rejectedThrowable = null;
107+
$lowLevelFiber = \Fiber::getCurrent();
108+
109+
if ($lowLevelFiber !== null) {
110+
if (fiberMap()->isCanceled($lowLevelFiber) && $promise instanceof CancellablePromiseInterface) {
111+
$promise->cancel();
112+
}
113+
}
85114

86115
$promise->then(
87116
function (mixed $value) use (&$resolved, &$resolvedValue, &$fiber): void {
@@ -118,6 +147,10 @@ function (mixed $throwable) use (&$rejected, &$rejectedThrowable, &$fiber): void
118147
throw $rejectedThrowable;
119148
}
120149

150+
if ($lowLevelFiber !== null) {
151+
fiberMap()->attachPromise($lowLevelFiber, $promise);
152+
}
153+
121154
$fiber = FiberFactory::create();
122155

123156
return $fiber->suspend();
@@ -433,3 +466,17 @@ function waterfall(array $tasks): PromiseInterface
433466

434467
return $deferred->promise();
435468
}
469+
470+
/**
471+
* @internal
472+
*/
473+
function fiberMap(): FiberMap
474+
{
475+
static $wm = null;
476+
477+
if ($wm === null) {
478+
$wm = new FiberMap();
479+
}
480+
481+
return $wm;
482+
}

tests/AsyncTest.php

+93
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
use function React\Async\async;
99
use function React\Async\await;
1010
use function React\Promise\all;
11+
use function React\Promise\Timer\sleep;
1112

1213
class AsyncTest extends TestCase
1314
{
@@ -84,4 +85,96 @@ public function testAsyncReturnsPromiseThatFulfillsWithValueWhenCallbackReturnsA
8485
$this->assertGreaterThan(0.1, $time);
8586
$this->assertLessThan(0.12, $time);
8687
}
88+
89+
public function testCancel()
90+
{
91+
self::expectOutputString('a');
92+
$this->expectException(\Exception::class);
93+
$this->expectExceptionMessage('Timer cancelled');
94+
95+
$promise = async(function (): int {
96+
echo 'a';
97+
await(sleep(2));
98+
echo 'b';
99+
100+
return time();
101+
})();
102+
103+
Loop::addTimer(0.001, function () use ($promise) {
104+
$promise->cancel();
105+
});
106+
await($promise);
107+
}
108+
109+
public function testNestedCancel()
110+
{
111+
self::expectOutputString('abc');
112+
$this->expectException(\Exception::class);
113+
$this->expectExceptionMessage('Timer cancelled');
114+
115+
$promise = async(function (): int {
116+
echo 'a';
117+
await(async(function(): void {
118+
echo 'b';
119+
await(async(function(): void {
120+
echo 'c';
121+
await(sleep(2));
122+
echo 'd';
123+
})());
124+
echo 'e';
125+
})());
126+
echo 'f';
127+
128+
return time();
129+
})();
130+
131+
Loop::addTimer(0.001, function () use ($promise) {
132+
$promise->cancel();
133+
});
134+
await($promise);
135+
}
136+
137+
public function testCancelFiberThatCatchesExceptions()
138+
{
139+
self::expectOutputString('ab');
140+
$this->expectException(\Exception::class);
141+
$this->expectExceptionMessage('Timer cancelled');
142+
143+
$promise = async(function (): int {
144+
echo 'a';
145+
try {
146+
await(sleep(2));
147+
} catch (\Throwable) {
148+
// No-Op
149+
}
150+
echo 'b';
151+
await(sleep(0.1));
152+
echo 'c';
153+
154+
return time();
155+
})();
156+
157+
Loop::addTimer(0.001, function () use ($promise) {
158+
$promise->cancel();
159+
});
160+
await($promise);
161+
}
162+
163+
public function testCancelNeverStartedFiber()
164+
{
165+
self::expectOutputString('');
166+
$this->expectException(\Exception::class);
167+
$this->expectExceptionMessage('Fiber has been cancelled before execution started');
168+
169+
$promise = async(function (): int {
170+
echo 'a';
171+
await(sleep(2));
172+
echo 'b';
173+
174+
return time();
175+
})();
176+
177+
$promise->cancel();
178+
await($promise);
179+
}
87180
}

0 commit comments

Comments
 (0)