diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
new file mode 100644
index 0000000..50a44e1
--- /dev/null
+++ b/.github/workflows/ci.yml
@@ -0,0 +1,53 @@
+name: CI
+
+on:
+ push:
+ pull_request:
+ workflow_dispatch:
+
+jobs:
+ tests:
+ name: PHP ${{ matrix.php }}
+ runs-on: ubuntu-latest
+ strategy:
+ matrix:
+ php: ['8.1', '8.2', '8.3']
+
+ steps:
+ - uses: actions/checkout@v3
+
+ - name: Setup PHP
+ uses: shivammathur/setup-php@v2
+ with:
+ php-version: ${{ matrix.php }}
+ coverage: none
+ tools: composer:v2
+
+ - name: Install dependencies
+ run: composer install --prefer-dist --no-progress
+
+ - name: Run tests
+ run: vendor/bin/phpunit
+
+ - name: Static Analysis
+ run: vendor/bin/phpstan analyse
+
+ coding-standards:
+ name: Coding Standards
+ runs-on: ubuntu-latest
+
+ steps:
+ - uses: actions/checkout@v3
+
+ - name: Setup PHP
+ uses: shivammathur/setup-php@v2
+ with:
+ php-version: '8.1'
+ coverage: none
+ tools: composer:v2, php-cs-fixer
+
+ - name: Install dependencies
+ run: composer install --prefer-dist --no-progress
+
+ - name: Check coding standards
+ run: php-cs-fixer fix --dry-run --diff
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..eaea388
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,12 @@
+/vendor/
+/composer.lock
+/.phpunit.cache/
+/.phpunit.result.cache
+/.php-cs-cache
+/phpstan.cache
+.idea/
+.vscode/
+*.swp
+*.swo
+.DS_Store
+.php-cs-fixer.cache
\ No newline at end of file
diff --git a/.gitkeep b/.gitkeep
deleted file mode 100644
index e69de29..0000000
diff --git a/.php-cs-fixer.dist.php b/.php-cs-fixer.dist.php
new file mode 100644
index 0000000..918fbb2
--- /dev/null
+++ b/.php-cs-fixer.dist.php
@@ -0,0 +1,23 @@
+in([
+ __DIR__ . '/src',
+ __DIR__ . '/tests',
+ __DIR__ . '/examples',
+ ])
+ ->name('*.php')
+ ->ignoreDotFiles(true)
+ ->ignoreVCS(true);
+
+return (new PhpCsFixer\Config())
+ ->setRules([
+ '@PSR12' => true,
+ 'array_syntax' => ['syntax' => 'short'],
+ 'ordered_imports' => ['sort_algorithm' => 'alpha'],
+ 'no_unused_imports' => true,
+ 'no_extra_blank_lines' => true,
+ 'single_quote' => true,
+ 'trailing_comma_in_multiline' => true,
+ ])
+ ->setFinder($finder);
\ No newline at end of file
diff --git a/LICENSE.md b/LICENSE.md
new file mode 100644
index 0000000..164725a
--- /dev/null
+++ b/LICENSE.md
@@ -0,0 +1,29 @@
+BSD 3-Clause License
+
+Copyright (c) 2024, ArchiPro
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+
+1. Redistributions of source code must retain the above copyright notice, this
+ list of conditions and the following disclaimer.
+
+2. Redistributions in binary form must reproduce the above copyright notice,
+ this list of conditions and the following disclaimer in the documentation
+ and/or other materials provided with the distribution.
+
+3. Neither the name of the copyright holder nor the names of its
+ contributors may be used to endorse or promote products derived from
+ this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
+FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
\ No newline at end of file
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..bace6a6
--- /dev/null
+++ b/README.md
@@ -0,0 +1,19 @@
+# PSR-14 Async Event Dispatcher (Experimental)
+
+[![CI](https://github.com/archiprocode/revolt-event-dispatcher/actions/workflows/ci.yml/badge.svg)](https://github.com/archiprocode/revolt-event-dispatcher/actions/workflows/ci.yml)
+
+A PSR-14 compatible event dispatcher implementation using Revolt and AMPHP for asynchronous event handling.
+
+## Features
+
+- Full PSR-14 compatibility
+- Asynchronous event dispatching using Revolt's event loop
+- Support for stoppable and non-stoppable events
+- Fire-and-forget event dispatching
+- Type-safe event handling
+
+## Installation
+
+```bash
+composer require archipro/revolt-event-dispatcher
+```
diff --git a/composer.json b/composer.json
new file mode 100644
index 0000000..7ccaa42
--- /dev/null
+++ b/composer.json
@@ -0,0 +1,47 @@
+{
+ "name": "archipro/revolt-event-dispatcher",
+ "description": "PSR-14 Event Dispatcher implementation using Revolt and AMPHP",
+ "authors": [
+ {
+ "name": "ArchiPro",
+ "email": "developers@archipro.co.nz"
+ }
+ ],
+ "type": "library",
+ "require": {
+ "php": "^8.1",
+ "psr/event-dispatcher": "^1.0",
+ "revolt/event-loop": "^1.0",
+ "amphp/amp": "^3.0"
+ },
+ "require-dev": {
+ "phpunit/phpunit": "^10.0",
+ "phpstan/phpstan": "^1.10",
+ "friendsofphp/php-cs-fixer": "^3.14"
+ },
+ "autoload": {
+ "psr-4": {
+ "ArchiPro\\EventDispatcher\\": "src/"
+ }
+ },
+ "autoload-dev": {
+ "psr-4": {
+ "ArchiPro\\EventDispatcher\\Tests\\": "tests/"
+ }
+ },
+ "scripts": {
+ "test": "phpunit",
+ "analyse": "phpstan analyse",
+ "cs-check": "php-cs-fixer fix --dry-run --diff",
+ "cs-fix": "php-cs-fixer fix",
+ "check": [
+ "@cs-check",
+ "@analyse",
+ "@test"
+ ]
+ },
+ "license": "BSD-3-Clause",
+ "provide": {
+ "psr/event-dispatcher-implementation": "1.0"
+ }
+}
diff --git a/examples/basic-usage.php b/examples/basic-usage.php
new file mode 100644
index 0000000..31de479
--- /dev/null
+++ b/examples/basic-usage.php
@@ -0,0 +1,62 @@
+addListener(UserCreatedEvent::class, function (UserCreatedEvent $event) {
+ // Simulate async operation
+ EventLoop::delay(
+ 1,
+ function () use ($event) {
+ echo "Sending welcome email to {$event->email}\n";
+ }
+ );
+});
+
+$listenerProvider->addListener(UserCreatedEvent::class, function (UserCreatedEvent $event) {
+ // Simulate async operation
+ EventLoop::delay(
+ 0.5,
+ function () use ($event) {
+ echo "Logging user creation: {$event->userId}\n";
+ }
+ );
+});
+
+// Create the event dispatcher
+$dispatcher = new AsyncEventDispatcher($listenerProvider);
+
+// Dispatch an event
+$event = new UserCreatedEvent('123', 'user@example.com');
+$dispatcher->dispatch($event);
+
+// Run the event loop to process all events
+EventLoop::run();
+
+// Wait for the event to finish right away
+$event = new UserCreatedEvent('456', 'user@example.com');
+$future = $dispatcher->dispatch($event);
+$updatedEvent = $future->await();
+
+// Make an event cancellable
+$event = new UserCreatedEvent('789', 'user@example.com');
+$future = $dispatcher->dispatch($event, new TimeoutCancellation(30));
+EventLoop::run();
diff --git a/phpstan.neon b/phpstan.neon
new file mode 100644
index 0000000..5c602a2
--- /dev/null
+++ b/phpstan.neon
@@ -0,0 +1,7 @@
+parameters:
+ level: 8
+ paths:
+ - src
+ - tests
+ - examples
+ tmpDir: phpstan.cache
\ No newline at end of file
diff --git a/phpunit.xml.dist b/phpunit.xml.dist
new file mode 100644
index 0000000..00ffdd4
--- /dev/null
+++ b/phpunit.xml.dist
@@ -0,0 +1,24 @@
+
+
+
+
+ tests
+
+
+
+
+
\ No newline at end of file
diff --git a/src/AsyncEventDispatcher.php b/src/AsyncEventDispatcher.php
new file mode 100644
index 0000000..a4d1d54
--- /dev/null
+++ b/src/AsyncEventDispatcher.php
@@ -0,0 +1,133 @@
+ Future that resolves with the dispatched event
+ */
+ public function dispatch(object $event, ?Cancellation $cancellation = null): Future
+ {
+ $listeners = $this->listenerProvider->getListenersForEvent($event);
+
+ if ($event instanceof StoppableEventInterface) {
+ return $this->dispatchStoppableEvent(
+ $event,
+ $listeners,
+ $cancellation ?: new NullCancellation()
+ );
+ }
+
+ return $this->dispatchNonStoppableEvent(
+ $event,
+ $listeners,
+ $cancellation ?: new NullCancellation()
+ );
+ }
+
+ /**
+ * Dispatches a stoppable event to listeners asynchronously.
+ * Uses a queue to handle propagation stopping.
+ *
+ * @template T of StoppableEventInterface
+ * @param T $event
+ * @param iterable $listeners
+ * @return Future
+ */
+ private function dispatchStoppableEvent(
+ StoppableEventInterface $event,
+ iterable $listeners,
+ Cancellation $cancellation
+ ): Future {
+ return async(function () use ($event, $listeners, $cancellation): StoppableEventInterface {
+ // We'll process each listener in sequence so that if one decides to stop propagation,
+ // we have chance to kill the following listeners.
+ foreach ($listeners as $listener) {
+ // We'll wrap our listener in a `async` call. Even if we want to block the next listener in the loop,
+ // that doesn't mean we want to block other listeners outside this loop.
+ $future = async(function () use ($event, $listener) {
+ $listener($event);
+ });
+
+ $future->await($cancellation);
+
+ // If one of our listeners decides to stop propagation, we'll break out of the loop.
+ if ($event->isPropagationStopped()) {
+ break;
+ }
+ }
+ return $event;
+ });
+ }
+
+ /**
+ * Dispatches a non-stoppable event to listeners asynchronously.
+ * Simply queues all listeners in the event loop.
+ *
+ * Because we don't need to worry about stopping propagation, we can simply
+ * queue all listeners in the event loop and let them run whenever in any order.
+ *
+ * @template T of object
+ * @param T $event
+ * @param iterable $listeners
+ * @return Future
+ */
+ private function dispatchNonStoppableEvent(
+ object $event,
+ iterable $listeners,
+ Cancellation $cancellation
+ ): Future {
+ return async(function () use ($event, $listeners, $cancellation): object {
+ $futures = [];
+ foreach ($listeners as $listener) {
+ $futures[] = async(function () use ($event, $listener) {
+ $listener($event);
+ });
+ }
+
+ // Wait for all listeners to complete
+ awaitAll($futures, $cancellation);
+
+ return $event;
+ });
+ }
+
+}
diff --git a/src/Event/AbstractStoppableEvent.php b/src/Event/AbstractStoppableEvent.php
new file mode 100644
index 0000000..b0bfbc7
--- /dev/null
+++ b/src/Event/AbstractStoppableEvent.php
@@ -0,0 +1,36 @@
+propagationStopped;
+ }
+
+ /**
+ * Stops the propagation of the event to subsequent listeners.
+ */
+ public function stopPropagation(): void
+ {
+ $this->propagationStopped = true;
+ }
+}
diff --git a/src/ListenerProvider.php b/src/ListenerProvider.php
new file mode 100644
index 0000000..69863f6
--- /dev/null
+++ b/src/ListenerProvider.php
@@ -0,0 +1,45 @@
+>
+ */
+ private array $listeners = [];
+
+ /**
+ * Registers a listener for a specific event class.
+ *
+ * @template T of object
+ * @param class-string $eventClass The fully qualified class name of the event
+ * @param callable(T): void $listener The listener callback that will handle the event
+ */
+ public function addListener(string $eventClass, callable $listener): void
+ {
+ $this->listeners[$eventClass][] = $listener;
+ }
+
+ /**
+ * Gets all listeners registered for the given event.
+ *
+ * @template T of object
+ * @param T $event The event to get listeners for
+ * @return array The registered listeners
+ */
+ public function getListenersForEvent(object $event): iterable
+ {
+ return $this->listeners[$event::class] ?? [];
+ }
+}
diff --git a/tests/AsyncEventDispatcherTest.php b/tests/AsyncEventDispatcherTest.php
new file mode 100644
index 0000000..6e4b7a8
--- /dev/null
+++ b/tests/AsyncEventDispatcherTest.php
@@ -0,0 +1,211 @@
+listenerProvider = new ListenerProvider();
+ $this->dispatcher = new AsyncEventDispatcher($this->listenerProvider);
+
+ EventLoop::setErrorHandler(function (Throwable $err) {
+ throw $err;
+ });
+
+ }
+
+ /**
+ * Tests that multiple listeners for an event are executed.
+ */
+ public function testDispatchEventToMultipleListeners(): void
+ {
+ $results = [];
+ $completed = false;
+
+ $this->listenerProvider->addListener(TestEvent::class, function (TestEvent $event) use (&$results) {
+ delay(0.1);
+ $results[] = 'listener1: ' . $event->data;
+ });
+
+ $this->listenerProvider->addListener(TestEvent::class, function (TestEvent $event) use (&$results, &$completed) {
+ delay(0.05);
+ $results[] = 'listener2: ' . $event->data;
+ $completed = true;
+ });
+
+ $event = new TestEvent('test data');
+ $futureEvent = $this->dispatcher->dispatch($event);
+
+ // Verify immediate return
+ $this->assertEmpty($results);
+
+ // Run the event loop until listeners complete
+ $futureEvent->await();
+
+ $this->assertTrue($completed);
+ $this->assertCount(2, $results);
+ $this->assertContains('listener1: test data', $results);
+ $this->assertContains('listener2: test data', $results);
+ }
+
+ /**
+ * Tests that event propagation can be stopped synchronously.
+ */
+ public function testSynchronousStoppableEvent(): void
+ {
+ $results = [];
+
+ $this->listenerProvider->addListener(TestEvent::class, function (TestEvent $event) use (&$results) {
+ $results[] = 'listener1';
+ $event->stopPropagation();
+ });
+
+ $this->listenerProvider->addListener(TestEvent::class, function (TestEvent $event) use (&$results) {
+ $results[] = 'listener2';
+ });
+
+ $event = new TestEvent('test data');
+ $this->dispatcher->dispatch($event)->await();
+
+ $this->assertCount(1, $results);
+ $this->assertEquals(['listener1'], $results);
+ }
+
+ /**
+ * Tests handling of events with no registered listeners.
+ */
+ public function testNoListenersForEvent(): void
+ {
+ $event = new TestEvent('test data');
+ $dispatchedEvent = $this->dispatcher->dispatch($event);
+
+ $this->assertSame($event, $dispatchedEvent->await());
+ }
+
+ /**
+ * @test
+ */
+ public function testDispatchesNonStoppableEvents(): void
+ {
+ $event = new class () {
+ public bool $called = false;
+ };
+
+ $listener = function ($event) {
+ $event->called = true;
+ };
+
+ $this->listenerProvider->addListener(get_class($event), $listener);
+ $futureEvent = $this->dispatcher->dispatch($event);
+
+ $this->assertFalse($event->called, 'Listener should not have been called right away');
+
+ $futureEvent->await();
+
+ $this->assertTrue($event->called, 'Listener should have been called for non-stoppable event');
+ }
+
+ public function testDispatchesFailureInOneListenerDoesNotAffectOthers(): void
+ {
+ $event = new class () {
+ public bool $calledOnce = false;
+ public bool $calledTwice = false;
+ };
+
+ $this->listenerProvider->addListener(get_class($event), function ($event) {
+ $event->calledOnce = true;
+ throw new Exception('Test exception');
+ });
+
+ $this->listenerProvider->addListener(get_class($event), function ($event) {
+ $event->calledTwice = true;
+ throw new Exception('Test exception');
+ });
+
+ $futureEvent = $this->dispatcher->dispatch($event);
+
+ $futureEvent = $futureEvent->await();
+
+ $this->assertTrue(
+ $futureEvent->calledOnce,
+ 'The first listener should have been called'
+ );
+ $this->assertTrue(
+ $futureEvent->calledTwice,
+ 'The second listener should have been called despite the failure of the first listener'
+ );
+ }
+
+ public function testCancellationOfStoppableEvent(): void
+ {
+ $event = new class () extends AbstractStoppableEvent {
+ public bool $called = false;
+ };
+
+ $this->listenerProvider->addListener(get_class($event), function ($event) {
+ // Simulate a long-running operation
+ delay(0.1);
+ $event->called = true;
+ });
+
+ $cancellation = new TimeoutCancellation(0.05);
+
+ $this->expectException(CancelledException::class);
+
+ $this->dispatcher->dispatch($event, $cancellation)->await();
+ }
+
+ public function testCancellationOfNonStoppableEvent(): void
+ {
+ $event = new class () {
+ public bool $called = false;
+ };
+
+ $this->listenerProvider->addListener(get_class($event), function ($event) {
+ // Simulate a long-running operation
+ delay(0.1);
+ $event->called = true;
+ });
+
+ $cancellation = new TimeoutCancellation(0.05);
+
+ $this->expectException(CancelledException::class);
+
+ $this->dispatcher->dispatch($event, $cancellation)->await();
+ }
+
+}
diff --git a/tests/Fixture/TestEvent.php b/tests/Fixture/TestEvent.php
new file mode 100644
index 0000000..3ab16cd
--- /dev/null
+++ b/tests/Fixture/TestEvent.php
@@ -0,0 +1,24 @@
+