Skip to content

Commit 3acd81c

Browse files
committed
[async-event] Move async event classes to own package.
1 parent f2c5e7c commit 3acd81c

18 files changed

+598
-69
lines changed

.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,4 @@
44
/phpunit.xml
55
/vendor/
66
/.idea/
7+
Tests/Functional/queues

ProxyEventDispatcher.php renamed to AsyncEventDispatcher.php

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
use Symfony\Component\EventDispatcher\EventDispatcher;
77
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
88

9-
class ProxyEventDispatcher extends EventDispatcher
9+
class AsyncEventDispatcher extends EventDispatcher
1010
{
1111
/**
1212
* @var EventDispatcherInterface

AsyncListener.php

+26-10
Original file line numberDiff line numberDiff line change
@@ -2,35 +2,42 @@
22

33
namespace Enqueue\AsyncEventDispatcher;
44

5-
use Enqueue\Client\Message;
6-
use Enqueue\Client\ProducerInterface;
5+
use Enqueue\Psr\PsrContext;
6+
use Enqueue\Psr\PsrQueue;
77
use Symfony\Component\EventDispatcher\Event;
88

99
class AsyncListener
1010
{
1111
/**
12-
* @var ProducerInterface
12+
* @var PsrContext
1313
*/
14-
private $producer;
14+
private $context;
1515

1616
/**
1717
* @var Registry
1818
*/
1919
private $registry;
2020

21+
/**
22+
* @var PsrQueue
23+
*/
24+
private $eventQueue;
25+
2126
/**
2227
* @var bool
2328
*/
2429
private $syncMode;
2530

2631
/**
27-
* @param ProducerInterface $producer
28-
* @param Registry $registry
32+
* @param PsrContext $context
33+
* @param Registry $registry
34+
* @param PsrQueue|string $eventQueue
2935
*/
30-
public function __construct(ProducerInterface $producer, Registry $registry)
36+
public function __construct(PsrContext $context, Registry $registry, $eventQueue)
3137
{
32-
$this->producer = $producer;
38+
$this->context = $context;
3339
$this->registry = $registry;
40+
$this->eventQueue = $eventQueue instanceof PsrQueue ? $eventQueue : $context->createQueue($eventQueue);
3441
}
3542

3643
public function resetSyncMode()
@@ -46,6 +53,16 @@ public function syncMode($eventName)
4653
$this->syncMode[$eventName] = true;
4754
}
4855

56+
/**
57+
* @param string $eventName
58+
*
59+
* @return bool
60+
*/
61+
public function isSyncMode($eventName)
62+
{
63+
return isset($this->syncMode[$eventName]);
64+
}
65+
4966
/**
5067
* @param Event $event
5168
* @param string $eventName
@@ -56,11 +73,10 @@ public function onEvent(Event $event = null, $eventName)
5673
$transformerName = $this->registry->getTransformerNameForEvent($eventName);
5774

5875
$message = $this->registry->getTransformer($transformerName)->toMessage($eventName, $event);
59-
$message->setScope(Message::SCOPE_APP);
6076
$message->setProperty('event_name', $eventName);
6177
$message->setProperty('transformer_name', $transformerName);
6278

63-
$this->producer->sendEvent('event.'.$eventName, $message);
79+
$this->context->createProducer()->send($this->eventQueue, $message);
6480
}
6581
}
6682
}

AsyncProcessor.php

+18-7
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
use Enqueue\Psr\PsrContext;
77
use Enqueue\Psr\PsrMessage;
88
use Enqueue\Psr\PsrProcessor;
9+
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
910

1011
class AsyncProcessor implements PsrProcessor
1112
{
@@ -15,18 +16,28 @@ class AsyncProcessor implements PsrProcessor
1516
private $registry;
1617

1718
/**
18-
* @var ProxyEventDispatcher
19+
* @var AsyncEventDispatcher|OldAsyncEventDispatcher
1920
*/
20-
private $eventDispatcher;
21+
private $dispatcher;
2122

2223
/**
23-
* @param Registry $registry
24-
* @param ProxyEventDispatcher $eventDispatcher
24+
* @param Registry $registry
25+
* @param EventDispatcherInterface $dispatcher
2526
*/
26-
public function __construct(Registry $registry, ProxyEventDispatcher $eventDispatcher)
27+
public function __construct(Registry $registry, EventDispatcherInterface $dispatcher)
2728
{
2829
$this->registry = $registry;
29-
$this->eventDispatcher = $eventDispatcher;
30+
31+
if (false == ($dispatcher instanceof AsyncEventDispatcher || $dispatcher instanceof OldAsyncEventDispatcher)) {
32+
throw new \InvalidArgumentException(sprintf(
33+
'The dispatcher argument must be either instance of "%s" or "%s" but got "%s"',
34+
AsyncEventDispatcher::class,
35+
OldAsyncEventDispatcher::class,
36+
get_class($dispatcher)
37+
));
38+
}
39+
40+
$this->dispatcher = $dispatcher;
3041
}
3142

3243
/**
@@ -43,7 +54,7 @@ public function process(PsrMessage $message, PsrContext $context)
4354

4455
$event = $this->registry->getTransformer($transformerName)->toEvent($eventName, $message);
4556

46-
$this->eventDispatcher->dispatchAsyncListenersOnly($eventName, $event);
57+
$this->dispatcher->dispatchAsyncListenersOnly($eventName, $event);
4758

4859
return self::ACK;
4960
}

DependencyInjection/AsyncEventDispatcherExtension.php

+6-2
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
namespace Enqueue\AsyncEventDispatcher\DependencyInjection;
44

5-
use Enqueue\AsyncEventDispatcher\OldProxyEventDispatcher;
5+
use Enqueue\AsyncEventDispatcher\OldAsyncEventDispatcher;
66
use Symfony\Component\Config\FileLocator;
77
use Symfony\Component\DependencyInjection\ContainerBuilder;
88
use Symfony\Component\DependencyInjection\Definition;
@@ -18,11 +18,15 @@ class AsyncEventDispatcherExtension extends Extension
1818
*/
1919
public function load(array $configs, ContainerBuilder $container)
2020
{
21+
$config = $this->processConfiguration(new Configuration(), $configs);
22+
23+
$container->setAlias('enqueue.events.context', $config['context_service']);
24+
2125
$loader = new YamlFileLoader($container, new FileLocator(__DIR__.'/../Resources/config'));
2226
$loader->load('services.yml');
2327

2428
if (version_compare(Kernel::VERSION, '3.3', '<')) {
25-
$container->setDefinition('enqueue.events.event_dispatcher', new Definition(OldProxyEventDispatcher::class, [
29+
$container->setDefinition('enqueue.events.event_dispatcher', new Definition(OldAsyncEventDispatcher::class, [
2630
new Reference('service_container'),
2731
new Reference('event_dispatcher'),
2832
new Reference('enqueue.events.async_listener'),

DependencyInjection/Configuration.php

+24
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
<?php
2+
3+
namespace Enqueue\AsyncEventDispatcher\DependencyInjection;
4+
5+
use Symfony\Component\Config\Definition\Builder\TreeBuilder;
6+
use Symfony\Component\Config\Definition\ConfigurationInterface;
7+
8+
class Configuration implements ConfigurationInterface
9+
{
10+
/**
11+
* {@inheritdoc}
12+
*/
13+
public function getConfigTreeBuilder()
14+
{
15+
$tb = new TreeBuilder();
16+
$rootNode = $tb->root('enqueue_async_event_dispatcher');
17+
18+
$rootNode->children()
19+
->scalarNode('context_service')->isRequired()->cannotBeEmpty()->end()
20+
;
21+
22+
return $tb;
23+
}
24+
}

EventTransformer.php

+5-6
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@
22

33
namespace Enqueue\AsyncEventDispatcher;
44

5-
use Enqueue\Client\Message;
6-
use Enqueue\Consumption\Result;
75
use Enqueue\Psr\PsrMessage;
86
use Symfony\Component\EventDispatcher\Event;
97

@@ -13,18 +11,19 @@ interface EventTransformer
1311
* @param string $eventName
1412
* @param Event|null $event
1513
*
16-
* @return Message
14+
* @return PsrMessage
1715
*/
18-
public function toMessage($eventName, Event $event = null);
16+
public function toMessage($eventName, Event $event);
1917

2018
/**
2119
* If you able to transform message back to event return it.
22-
* If you failed to transform for some reason you can return instance of Result object ( Like this Result::reject() );.
20+
* If you failed to transform for some reason you can return a string status (@see PsrProcess constants) or an object that implements __toString method.
21+
* The object must have a __toString method is supposed to be used as PsrProcessor::process return value.
2322
*
2423
* @param string $eventName
2524
* @param PsrMessage $message
2625
*
27-
* @return Event|Result|null
26+
* @return Event|string|object
2827
*/
2928
public function toEvent($eventName, PsrMessage $message);
3029
}

OldProxyEventDispatcher.php renamed to OldAsyncEventDispatcher.php

+1-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
use Symfony\Component\EventDispatcher\Event;
88
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
99

10-
class OldProxyEventDispatcher extends ContainerAwareEventDispatcher
10+
class OldAsyncEventDispatcher extends ContainerAwareEventDispatcher
1111
{
1212
/**
1313
* @var EventDispatcherInterface

PhpSerializerEventTransformer.php

+26-5
Original file line numberDiff line numberDiff line change
@@ -2,24 +2,41 @@
22

33
namespace Enqueue\AsyncEventDispatcher;
44

5-
use Enqueue\Client\Message;
5+
use Enqueue\Psr\PsrContext;
66
use Enqueue\Psr\PsrMessage;
77
use Symfony\Component\EventDispatcher\Event;
88
use Symfony\Component\HttpKernel\Kernel;
99

1010
class PhpSerializerEventTransformer implements EventTransformer
1111
{
12+
/**
13+
* @var PsrContext
14+
*/
15+
private $context;
16+
17+
/**
18+
* @var bool
19+
*/
20+
private $skipSymfonyVersionCheck;
21+
22+
/**
23+
* @param PsrContext $context
24+
* @param bool $skipSymfonyVersionCheck It is useful when async dispatcher is used without Kernel. So there is no way to check the version.
25+
*/
26+
public function __construct(PsrContext $context, $skipSymfonyVersionCheck = false)
27+
{
28+
$this->context = $context;
29+
$this->skipSymfonyVersionCheck = $skipSymfonyVersionCheck;
30+
}
31+
1232
/**
1333
* {@inheritdoc}
1434
*/
1535
public function toMessage($eventName, Event $event = null)
1636
{
1737
$this->assertSymfony30OrHigher();
1838

19-
$message = new Message();
20-
$message->setBody(serialize($event));
21-
22-
return $message;
39+
return $this->context->createMessage(serialize($event));
2340
}
2441

2542
/**
@@ -34,6 +51,10 @@ public function toEvent($eventName, PsrMessage $message)
3451

3552
private function assertSymfony30OrHigher()
3653
{
54+
if ($this->skipSymfonyVersionCheck) {
55+
return;
56+
}
57+
3758
if (version_compare(Kernel::VERSION, '3.0', '<')) {
3859
throw new \LogicException(
3960
'This transformer does not work on Symfony prior 3.0. '.

Resources/config/services.yml

+19-3
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,10 @@
1+
parameters:
2+
enqueue_events_queue: 'symfony_events'
3+
14
services:
5+
# should be defined by the extension
6+
# enqueue.events.context:
7+
28
enqueue.events.registry:
39
class: 'Enqueue\AsyncEventDispatcher\ContainerAwareRegistry'
410
public: false
@@ -8,11 +14,11 @@ services:
814

915
enqueue.events.async_listener:
1016
class: 'Enqueue\AsyncEventDispatcher\AsyncListener'
11-
arguments: ['@enqueue.client.producer', '@enqueue.events.registry']
17+
arguments: ['@enqueue.events.context', '@enqueue.events.registry', '%enqueue_events_queue%']
1218

1319

1420
enqueue.events.event_dispatcher:
15-
class: 'Enqueue\AsyncEventDispatcher\ProxyEventDispatcher'
21+
class: 'Enqueue\AsyncEventDispatcher\AsyncEventDispatcher'
1622
arguments:
1723
- '@event_dispatcher'
1824
- '@enqueue.events.async_listener'
@@ -22,8 +28,18 @@ services:
2228
arguments:
2329
- '@enqueue.events.registry'
2430
- '@enqueue.events.event_dispatcher'
31+
tags:
32+
-
33+
name: 'enqueue.client.processor'
34+
topicName: '__command__'
35+
processorName: '%enqueue_events_queue%'
36+
queueName: '%enqueue_events_queue%'
37+
queueNameHardcoded: true
38+
exclusive: true
2539

2640
enqueue.events.php_serializer_event_transofrmer:
2741
class: 'Enqueue\AsyncEventDispatcher\PhpSerializerEventTransformer'
42+
arguments:
43+
- '@enqueue.events.context'
2844
tags:
29-
- {name: 'enqueue.event_transformer', eventName: '/.*/', transformerName: 'php_serializer' }
45+
- {name: 'enqueue.event_transformer', eventName: '/.*/', transformerName: 'php_serializer' }

0 commit comments

Comments
 (0)