Skip to content

[4.x] Consistent cancellation semantics for async() and coroutine() #55

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -205,18 +205,20 @@ $promise->then(function (int $bytes) {
});
```

Promises returned by `async()` can be cancelled, and when done any currently and future awaited promise inside that and
any nested fibers with their awaited promises will also be cancelled. As such the following example will only output
`ab` as the [`sleep()`](https://reactphp.org/promise-timer/#sleep) between `a` and `b` is cancelled throwing a timeout
exception that bubbles up through the fibers ultimately to the end user through the [`await()`](#await) on the last line
of the example.
The returned promise is implemented in such a way that it can be cancelled
when it is still pending. Cancelling a pending promise will cancel any awaited
promises inside that fiber or any nested fibers. As such, the following example
will only output `ab` and cancel the pending [`sleep()`](https://reactphp.org/promise-timer/#sleep).
The [`await()`](#await) calls in this example would throw a `RuntimeException`
from the cancelled [`sleep()`](https://reactphp.org/promise-timer/#sleep) call
that bubbles up through the fibers.

```php
$promise = async(static function (): int {
echo 'a';
await(async(static function(): void {
echo 'b';
await(sleep(2));
await(React\Promise\Timer\sleep(2));
echo 'c';
})());
echo 'd';
Expand Down
3 changes: 1 addition & 2 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@
"react/promise": "^2.8 || ^1.2.1"
},
"require-dev": {
"phpunit/phpunit": "^9.3",
"react/promise-timer": "^1.8"
"phpunit/phpunit": "^9.3"
},
"autoload": {
"psr-4": {
Expand Down
10 changes: 0 additions & 10 deletions src/FiberMap.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,6 @@ public static function cancel(\Fiber $fiber): void
self::$status[\spl_object_id($fiber)] = true;
}

public static function isCancelled(\Fiber $fiber): bool
{
return self::$status[\spl_object_id($fiber)] ?? false;
}

public static function setPromise(\Fiber $fiber, PromiseInterface $promise): void
{
self::$map[\spl_object_id($fiber)] = $promise;
Expand All @@ -38,11 +33,6 @@ public static function unsetPromise(\Fiber $fiber, PromiseInterface $promise): v
unset(self::$map[\spl_object_id($fiber)]);
}

public static function has(\Fiber $fiber): bool
{
return array_key_exists(\spl_object_id($fiber), self::$map);
}

public static function getPromise(\Fiber $fiber): ?PromiseInterface
{
return self::$map[\spl_object_id($fiber)] ?? null;
Expand Down
28 changes: 11 additions & 17 deletions src/functions.php
Original file line number Diff line number Diff line change
Expand Up @@ -148,20 +148,20 @@
* });
* ```
*
* Promises returned by `async()` can be cancelled, and when done any currently
* and future awaited promise inside that and any nested fibers with their
* awaited promises will also be cancelled. As such the following example will
* only output `ab` as the [`sleep()`](https://reactphp.org/promise-timer/#sleep)
* between `a` and `b` is cancelled throwing a timeout exception that bubbles up
* through the fibers ultimately to the end user through the [`await()`](#await)
* on the last line of the example.
* The returned promise is implemented in such a way that it can be cancelled
* when it is still pending. Cancelling a pending promise will cancel any awaited
* promises inside that fiber or any nested fibers. As such, the following example
* will only output `ab` and cancel the pending [`sleep()`](https://reactphp.org/promise-timer/#sleep).
* The [`await()`](#await) calls in this example would throw a `RuntimeException`
* from the cancelled [`sleep()`](https://reactphp.org/promise-timer/#sleep) call
* that bubbles up through the fibers.
*
* ```php
* $promise = async(static function (): int {
* echo 'a';
* await(async(static function(): void {
* echo 'b';
* await(sleep(2));
* await(React\Promise\Timer\sleep(2));
* echo 'c';
* })());
* echo 'd';
Expand Down Expand Up @@ -277,10 +277,6 @@ function await(PromiseInterface $promise): mixed
$rejectedThrowable = null;
$lowLevelFiber = \Fiber::getCurrent();

if ($lowLevelFiber !== null && FiberMap::isCancelled($lowLevelFiber) && $promise instanceof CancellablePromiseInterface) {
$promise->cancel();
}

$promise->then(
function (mixed $value) use (&$resolved, &$resolvedValue, &$fiber, $lowLevelFiber, $promise): void {
if ($lowLevelFiber !== null) {
Expand Down Expand Up @@ -485,12 +481,10 @@ function coroutine(callable $function, mixed ...$args): PromiseInterface

$promise = null;
$deferred = new Deferred(function () use (&$promise) {
// cancel pending promise(s) as long as generator function keeps yielding
while ($promise instanceof CancellablePromiseInterface) {
$temp = $promise;
$promise = null;
$temp->cancel();
if ($promise instanceof PromiseInterface && \method_exists($promise, 'cancel')) {
$promise->cancel();
}
$promise = null;
});

/** @var callable $next */
Expand Down
106 changes: 39 additions & 67 deletions tests/AsyncTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
use function React\Promise\all;
use function React\Promise\reject;
use function React\Promise\resolve;
use function React\Promise\Timer\sleep;

class AsyncTest extends TestCase
{
Expand Down Expand Up @@ -187,57 +186,70 @@ public function testAsyncReturnsPromiseThatFulfillsWithValueWhenCallbackReturnsA
$this->assertLessThan(0.12, $time);
}

public function testCancel()
public function testCancelAsyncWillReturnRejectedPromiseWhenCancellingPendingPromiseRejects()
{
self::expectOutputString('a');
$this->expectException(\Exception::class);
$this->expectExceptionMessage('Timer cancelled');
$promise = async(function () {
await(new Promise(function () { }, function () {
throw new \RuntimeException('Operation cancelled');
}));
})();

$promise = async(static function (): int {
echo 'a';
await(sleep(2));
echo 'b';
$promise->cancel();

return time();
$promise->then(null, $this->expectCallableOnceWith(new \RuntimeException('Operation cancelled')));
}

public function testCancelAsyncWillReturnFulfilledPromiseWhenCancellingPendingPromiseRejectsInsideCatchThatReturnsValue()
{
$promise = async(function () {
try {
await(new Promise(function () { }, function () {
throw new \RuntimeException('Operation cancelled');
}));
} catch (\RuntimeException $e) {
return 42;
}
})();

$promise->cancel();
await($promise);

$promise->then($this->expectCallableOnceWith(42));
}

public function testCancelTryCatch()
public function testCancelAsycWillReturnPendigPromiseWhenCancellingFirstPromiseRejectsInsideCatchThatAwaitsSecondPromise()
{
self::expectOutputString('ab');

$promise = async(static function (): int {
echo 'a';
$promise = async(function () {
try {
await(sleep(2));
} catch (\Throwable) {
// No-Op
await(new Promise(function () { }, function () {
throw new \RuntimeException('First operation cancelled');
}));
} catch (\RuntimeException $e) {
await(new Promise(function () { }, function () {
throw new \RuntimeException('Second operation never cancelled');
}));
}
echo 'b';

return time();
})();

$promise->cancel();
await($promise);

$promise->then($this->expectCallableNever(), $this->expectCallableNever());
}

public function testNestedCancel()
public function testCancelAsyncWillCancelNestedAwait()
{
self::expectOutputString('abc');
$this->expectException(\Exception::class);
$this->expectExceptionMessage('Timer cancelled');
$this->expectException(\RuntimeException::class);
$this->expectExceptionMessage('Operation cancelled');

$promise = async(static function (): int {
echo 'a';
await(async(static function(): void {
echo 'b';
await(async(static function(): void {
echo 'c';
await(sleep(2));
await(new Promise(function () { }, function () {
throw new \RuntimeException('Operation cancelled');
}));
echo 'd';
})());
echo 'e';
Expand All @@ -250,44 +262,4 @@ public function testNestedCancel()
$promise->cancel();
await($promise);
}

public function testCancelFiberThatCatchesExceptions()
{
self::expectOutputString('ab');
$this->expectException(\Exception::class);
$this->expectExceptionMessage('Timer cancelled');

$promise = async(static function (): int {
echo 'a';
try {
await(sleep(2));
} catch (\Throwable) {
// No-Op
}
echo 'b';
await(sleep(0.1));
echo 'c';

return time();
})();

$promise->cancel();
await($promise);
}

public function testNotAwaitedPromiseWillNotBeCanceled()
{
self::expectOutputString('acb');

async(static function (): int {
echo 'a';
sleep(0.001)->then(static function (): void {
echo 'b';
});
echo 'c';

return time();
})()->cancel();
Loop::run();
}
}
4 changes: 3 additions & 1 deletion tests/AwaitTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,9 @@ public function testResolvedPromisesShouldBeDetached(callable $await)
{
$await(async(function () use ($await): int {
$fiber = \Fiber::getCurrent();
$await(React\Promise\Timer\sleep(0.01));
$await(new Promise(function ($resolve) {
Loop::addTimer(0.01, fn() => $resolve(null));
}));
$this->assertNull(React\Async\FiberMap::getPromise($fiber));

return time();
Expand Down
47 changes: 29 additions & 18 deletions tests/CoroutineTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -106,42 +106,53 @@ public function testCoroutineReturnsRejectedPromiseIfFunctionYieldsInvalidValue(
$promise->then(null, $this->expectCallableOnceWith(new \UnexpectedValueException('Expected coroutine to yield React\Promise\PromiseInterface, but got integer')));
}


public function testCoroutineWillCancelPendingPromiseWhenCallingCancelOnResultingPromise()
public function testCancelCoroutineWillReturnRejectedPromiseWhenCancellingPendingPromiseRejects()
{
$cancelled = 0;
$promise = coroutine(function () use (&$cancelled) {
yield new Promise(function () use (&$cancelled) {
++$cancelled;
$promise = coroutine(function () {
yield new Promise(function () { }, function () {
throw new \RuntimeException('Operation cancelled');
});
});

$promise->cancel();

$this->assertEquals(1, $cancelled);
$promise->then(null, $this->expectCallableOnceWith(new \RuntimeException('Operation cancelled')));
}

public function testCoroutineWillCancelAllPendingPromisesWhenFunctionContinuesToYieldWhenCallingCancelOnResultingPromise()
public function testCancelCoroutineWillReturnFulfilledPromiseWhenCancellingPendingPromiseRejectsInsideCatchThatReturnsValue()
{
$promise = coroutine(function () {
$promise = new Promise(function () { }, function () {
throw new \RuntimeException('Frist operation cancelled', 21);
});

try {
yield $promise;
yield new Promise(function () { }, function () {
throw new \RuntimeException('Operation cancelled');
});
} catch (\RuntimeException $e) {
// ignore exception and continue
return 42;
}
});

yield new Promise(function () { }, function () {
throw new \RuntimeException('Second operation cancelled', 42);
});
$promise->cancel();

$promise->then($this->expectCallableOnceWith(42));
}

public function testCancelCoroutineWillReturnPendigPromiseWhenCancellingFirstPromiseRejectsInsideCatchThatYieldsSecondPromise()
{
$promise = coroutine(function () {
try {
yield new Promise(function () { }, function () {
throw new \RuntimeException('First operation cancelled');
});
} catch (\RuntimeException $e) {
yield new Promise(function () { }, function () {
throw new \RuntimeException('Second operation never cancelled');
});
}
});

$promise->cancel();

$promise->then(null, $this->expectCallableOnceWith(new \RuntimeException('Second operation cancelled', 42)));
$promise->then($this->expectCallableNever(), $this->expectCallableNever());
}

public function testCoroutineShouldNotCreateAnyGarbageReferencesWhenGeneratorReturns()
Expand Down