-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Extract receiver and sender from transport
- Loading branch information
Showing
3 changed files
with
217 additions
and
111 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,120 @@ | ||
<?php | ||
|
||
declare(strict_types=1); | ||
|
||
namespace Nsq\NsqBundle\Messenger; | ||
|
||
use JsonException; | ||
use Nsq\Config\ClientConfig; | ||
use Nsq\Consumer; | ||
use Nsq\Message; | ||
use Psr\Log\LoggerInterface; | ||
use Symfony\Component\Messenger\Envelope; | ||
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException; | ||
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp; | ||
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface; | ||
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; | ||
use function Amp\delay; | ||
use function Amp\Promise\wait; | ||
use function array_pop; | ||
use function json_decode; | ||
use const JSON_THROW_ON_ERROR; | ||
|
||
final class NsqReceiver implements ReceiverInterface | ||
{ | ||
private ?Consumer $consumer = null; | ||
|
||
/** | ||
* @var Message[] | ||
*/ | ||
private array $messages = []; | ||
|
||
public function __construct( | ||
private string $address, | ||
private string $topic, | ||
private string $channel, | ||
private ClientConfig $clientConfig, | ||
private SerializerInterface $serializer, | ||
private LoggerInterface $logger, | ||
) { | ||
} | ||
|
||
/** | ||
* {@inheritdoc} | ||
*/ | ||
public function get(): iterable | ||
{ | ||
if ([] === $this->messages) { | ||
$this->consume(); | ||
|
||
wait(delay(500)); | ||
} | ||
|
||
$message = array_pop($this->messages); | ||
|
||
if (null === $message) { | ||
return []; | ||
} | ||
|
||
try { | ||
$encodedEnvelope = json_decode($message->body, true, 512, JSON_THROW_ON_ERROR); | ||
} catch (JsonException $e) { | ||
wait($message->finish()); | ||
|
||
throw new MessageDecodingFailedException('', 0, $e); | ||
} | ||
|
||
try { | ||
$envelope = $this->serializer->decode($encodedEnvelope); | ||
} catch (MessageDecodingFailedException $e) { | ||
wait($message->finish()); | ||
|
||
throw $e; | ||
} | ||
|
||
return [ | ||
$envelope->with( | ||
new NsqReceivedStamp($message), | ||
new TransportMessageIdStamp($message->id), | ||
), | ||
]; | ||
} | ||
|
||
/** | ||
* {@inheritdoc} | ||
*/ | ||
public function ack(Envelope $envelope): void | ||
{ | ||
$message = NsqReceivedStamp::getMessageFromEnvelope($envelope); | ||
|
||
wait($message->finish()); | ||
} | ||
|
||
/** | ||
* {@inheritdoc} | ||
*/ | ||
public function reject(Envelope $envelope): void | ||
{ | ||
$message = NsqReceivedStamp::getMessageFromEnvelope($envelope); | ||
|
||
wait($message->finish()); | ||
} | ||
|
||
private function consume(): void | ||
{ | ||
if (null === $this->consumer) { | ||
$this->consumer = new Consumer( | ||
$this->address, | ||
$this->topic, | ||
$this->channel, | ||
function (Message $message) { | ||
$this->messages[] = $message; | ||
}, | ||
$this->clientConfig, | ||
$this->logger, | ||
); | ||
} | ||
|
||
wait($this->consumer->connect()); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
<?php | ||
|
||
declare(strict_types=1); | ||
|
||
namespace Nsq\NsqBundle\Messenger; | ||
|
||
use Nsq\Config\ClientConfig; | ||
use Nsq\Producer; | ||
use Psr\Log\LoggerInterface; | ||
use Symfony\Component\Messenger\Envelope; | ||
use Symfony\Component\Messenger\Stamp\DelayStamp; | ||
use Symfony\Component\Messenger\Transport\Sender\SenderInterface; | ||
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; | ||
use function Amp\Promise\wait; | ||
use function json_encode; | ||
use const JSON_THROW_ON_ERROR; | ||
|
||
final class NsqSender implements SenderInterface | ||
{ | ||
private ?Producer $producer = null; | ||
|
||
public function __construct( | ||
private string $address, | ||
private string $topic, | ||
private ClientConfig $clientConfig, | ||
private SerializerInterface $serializer, | ||
private LoggerInterface $logger, | ||
) { | ||
} | ||
|
||
/** | ||
* {@inheritdoc} | ||
*/ | ||
public function send(Envelope $envelope): Envelope | ||
{ | ||
$producer = $this->getProducer(); | ||
|
||
$encodedMessage = $this->serializer->encode($envelope->withoutAll(NsqReceivedStamp::class)); | ||
$encodedMessage = json_encode($encodedMessage, JSON_THROW_ON_ERROR | JSON_UNESCAPED_UNICODE); | ||
|
||
/** @var DelayStamp|null $delayStamp */ | ||
$delayStamp = $envelope->last(DelayStamp::class); | ||
$delay = null !== $delayStamp ? $delayStamp->getDelay() : null; | ||
|
||
if (null === $delay) { | ||
$promise = $producer->publish($this->topic, $encodedMessage); | ||
} else { | ||
$promise = $producer->defer($this->topic, $encodedMessage, $delay); | ||
} | ||
|
||
wait($promise); | ||
|
||
return $envelope; | ||
} | ||
|
||
private function getProducer(): Producer | ||
{ | ||
if (null === $this->producer) { | ||
$this->producer = new Producer( | ||
$this->address, | ||
$this->clientConfig, | ||
$this->logger, | ||
); | ||
} | ||
|
||
wait($this->producer->connect()); | ||
|
||
return $this->producer; | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters