diff --git a/src/ProducerInstance.php b/src/ProducerInstance.php index d43453a..6d76cd4 100644 --- a/src/ProducerInstance.php +++ b/src/ProducerInstance.php @@ -5,7 +5,6 @@ namespace Webgriffe\Esb; use Amp\Beanstalk\BeanstalkClient; -use function Amp\call; use Amp\Loop; use Amp\Promise; use Psr\Log\LoggerInterface; @@ -17,6 +16,7 @@ use Webgriffe\Esb\Service\HttpProducersServer; use Webgriffe\Esb\Service\ProducerQueueManagerInterface; use Webgriffe\Esb\Service\QueueManager; +use function Amp\call; final class ProducerInstance implements ProducerInstanceInterface { @@ -166,7 +166,6 @@ public function produceAndQueueJobs($data = null): Promise return call(function () use ($data) { $jobsCount = 0; $job = null; - $test = false; try { $jobs = $this->producer->produce($data); while (yield $jobs->advance()) { @@ -192,7 +191,6 @@ public function produceAndQueueJobs($data = null): Promise 'producer' => \get_class($this->producer), 'last_job_payload_data' => $job ? NonUtf8Cleaner::clean($job->getPayloadData()) : null, 'error' => $error->getMessage(), - 'test' => $test ] ); } diff --git a/src/Service/HttpProducersServer.php b/src/Service/HttpProducersServer.php index 36131ef..f3bdbfb 100644 --- a/src/Service/HttpProducersServer.php +++ b/src/Service/HttpProducersServer.php @@ -107,40 +107,46 @@ public function addProducerInstance(ProducerInstance $producerInstance): void */ private function requestHandler(Request $request) { - $producerInstance = $this->matchProducerInstance($request); - if (!$producerInstance) { - return new Response(Status::NOT_FOUND, [], 'Producer Not Found'); + $producerInstances = $this->matchProducerInstance($request); + + if (count($producerInstances) === 0) { + return new Response(Status::NOT_FOUND, [], 'No Producers Found'); + } + + $jobsCount = 0; + foreach ($producerInstances as $producerInstance) { + $this->logger->info( + 'Matched an HTTP Producer for an incoming HTTP request.', + [ + 'producer' => \get_class($producerInstance->getProducer()), + 'request' => sprintf('%s %s', strtoupper($request->getMethod()), $request->getUri()) + ] + ); + $jobsCount += yield $producerInstance->produceAndQueueJobs($request); } - $this->logger->info( - 'Matched an HTTP Producer for an incoming HTTP request.', - [ - 'producer' => \get_class($producerInstance->getProducer()), - 'request' => sprintf('%s %s', strtoupper($request->getMethod()), $request->getUri()) - ] - ); - $jobsCount = yield $producerInstance->produceAndQueueJobs($request); $responseMessage = sprintf('Successfully scheduled %s job(s) to be queued.', $jobsCount); return new Response(Status::OK, [], sprintf('"%s"', $responseMessage)); } /** * @param Request $request - * @return false|ProducerInstance + * @return ProducerInstance[] */ private function matchProducerInstance(Request $request) { + $matchingInstances = []; foreach ($this->producerInstances as $producerInstance) { $producer = $producerInstance->getProducer(); if (!$producer instanceof HttpRequestProducerInterface) { - // This should never happen but maybe we should add a warning here? + // This should never happen, but maybe we should add a warning here? continue; } if ($request->getUri()->getPath() === $producer->getAttachedRequestUri() && $producer->getAttachedRequestMethod() === $request->getMethod()) { - return $producerInstance; + $matchingInstances[] = $producerInstance; } } - return false; + return $matchingInstances; } } diff --git a/tests/Integration/HttpRequestProducerAndWorkerTest.php b/tests/Integration/HttpRequestProducerAndWorkerTest.php index 1122051..2543632 100644 --- a/tests/Integration/HttpRequestProducerAndWorkerTest.php +++ b/tests/Integration/HttpRequestProducerAndWorkerTest.php @@ -22,20 +22,16 @@ class HttpRequestProducerAndWorkerTest extends KernelTestCase { use TestUtils; - private $workerFile; - private $httpPort; - private const FLOW_CODE = 'http_producer_flow'; - protected function setUp(): void + public function testHttpRequestProducerAndWorker() { - parent::setUp(); - $this->workerFile = vfsStream::url('root/worker.data'); + $workerFile = vfsStream::url('root/worker.data'); self::createKernel( [ 'services' => [ DummyHttpRequestProducer::class => ['arguments' => []], - DummyFilesystemWorker::class => ['arguments' => [$this->workerFile]], + DummyFilesystemWorker::class => ['arguments' => [$workerFile]], ], 'flows' => [ self::FLOW_CODE => [ @@ -46,16 +42,13 @@ protected function setUp(): void ] ] ); - $this->httpPort = self::$kernel->getContainer()->getParameter('http_server_port'); - } + $httpPort = self::$kernel->getContainer()->getParameter('http_server_port'); - public function testHttpRequestProducerAndWorker() - { - Loop::delay(100, function () { - yield $this->waitForConnectionAvailable("tcp://127.0.0.1:{$this->httpPort}"); + Loop::delay(100, function () use ($httpPort){ + yield $this->waitForConnectionAvailable("tcp://127.0.0.1:{$httpPort}"); $payload = json_encode(['jobs' => ['job1', 'job2', 'job3']]); $client = HttpClientBuilder::buildDefault(); - $request = new Request("http://127.0.0.1:{$this->httpPort}/dummy", 'POST'); + $request = new Request("http://127.0.0.1:{$httpPort}/dummy", 'POST'); $request->setBody($payload); $response = yield $client->request($request); $this->assertStringContainsString( @@ -63,13 +56,13 @@ public function testHttpRequestProducerAndWorker() yield $response->getBody()->read() ); }); - $this->stopWhen(function () { - return (yield exists($this->workerFile)) && count($this->getFileLines($this->workerFile)) === 3; + $this->stopWhen(function () use ($workerFile) { + return (yield exists($workerFile)) && count($this->getFileLines($workerFile)) === 3; }); self::$kernel->boot(); - $workerFileLines = $this->getFileLines($this->workerFile); + $workerFileLines = $this->getFileLines($workerFile); $this->assertCount(3, $workerFileLines); $this->assertStringContainsString('job1', $workerFileLines[0]); $this->assertStringContainsString('job2', $workerFileLines[1]); @@ -89,13 +82,101 @@ public function testHttpRequestProducerAndWorker() $this->assertReadyJobsCountInTube(0, self::FLOW_CODE); } + public function testMultipleHttpRequestProducersForSameRequest() + { + $firstWorkerFile = vfsStream::url('root/first-worker.data'); + $secondWorkerFile = vfsStream::url('root/second-worker.data'); + + self::createKernel( + [ + 'services' => [ + 'http_request_producer' => [ + 'class' => DummyHttpRequestProducer::class, + 'arguments' => [] + ], + 'first_filesystem_worker' => [ + 'class' => DummyFilesystemWorker::class, + 'arguments' => [$firstWorkerFile] + ], + 'second_filesystem_worker' => [ + 'class' => DummyFilesystemWorker::class, + 'arguments' => [$secondWorkerFile] + ], + ], + 'flows' => [ + 'first_http_producer_flow' => [ + 'description' => 'First Http Request Producer And Worker Test Flow', + 'producer' => ['service' => 'http_request_producer'], + 'worker' => ['service' => 'first_filesystem_worker'], + ], + 'second_http_producer_flow' => [ + 'description' => 'Second Http Request Producer And Worker Test Flow', + 'producer' => ['service' => 'http_request_producer'], + 'worker' => ['service' => 'second_filesystem_worker'], + ] + ] + ] + ); + $httpPort = self::$kernel->getContainer()->getParameter('http_server_port'); + + Loop::delay(100, function () use ($httpPort){ + yield $this->waitForConnectionAvailable("tcp://127.0.0.1:{$httpPort}"); + $payload = json_encode(['jobs' => ['job1']]); + $client = HttpClientBuilder::buildDefault(); + $request = new Request("http://127.0.0.1:{$httpPort}/dummy", 'POST'); + $request->setBody($payload); + $response = yield $client->request($request); + $this->assertStringContainsString( + '"Successfully scheduled 1 job(s) to be queued."', + yield $response->getBody()->read() + ); + }); + $this->stopWhen(function () use ($firstWorkerFile, $secondWorkerFile) { + return + (yield exists($firstWorkerFile)) && + count($this->getFileLines($firstWorkerFile)) === 1 && + (yield exists($secondWorkerFile)) && + count($this->getFileLines($secondWorkerFile)) === 1 + ; + }); + + self::$kernel->boot(); + + $firstWorkerFileLines = $this->getFileLines($firstWorkerFile); + $secondWorkerFileLines = $this->getFileLines($secondWorkerFile); + $this->assertCount(1, $firstWorkerFileLines); + $this->assertCount(1, $secondWorkerFileLines); + $this->assertStringContainsString('job1', $firstWorkerFileLines[0]); + $this->assertStringContainsString('job1', $secondWorkerFileLines[0]); + $this->assertReadyJobsCountInTube(0, 'first_http_producer_flow'); + $this->assertReadyJobsCountInTube(0, 'second_http_producer_flow'); + } + public function testHttpRequestProducerWithWrongUriShouldReturn404() { - Loop::delay(100, function () { - yield $this->waitForConnectionAvailable("tcp://127.0.0.1:{$this->httpPort}"); + $workerFile = vfsStream::url('root/worker.data'); + self::createKernel( + [ + 'services' => [ + DummyHttpRequestProducer::class => ['arguments' => []], + DummyFilesystemWorker::class => ['arguments' => [$workerFile]], + ], + 'flows' => [ + self::FLOW_CODE => [ + 'description' => 'Http Request Producer And Worker Test Flow', + 'producer' => ['service' => DummyHttpRequestProducer::class], + 'worker' => ['service' => DummyFilesystemWorker::class], + ] + ] + ] + ); + $httpPort = self::$kernel->getContainer()->getParameter('http_server_port'); + + Loop::delay(100, function () use ($httpPort) { + yield $this->waitForConnectionAvailable("tcp://127.0.0.1:{$httpPort}"); $payload = json_encode(['jobs' => ['job1', 'job2', 'job3']]); $client = HttpClientBuilder::buildDefault(); - $request = new Request("http://127.0.0.1:{$this->httpPort}/wrong-uri", 'POST'); + $request = new Request("http://127.0.0.1:{$httpPort}/wrong-uri", 'POST'); $request->setBody($payload); $response = yield $client->request($request); $this->assertEquals(404, $response->getStatus()); @@ -106,7 +187,7 @@ public function testHttpRequestProducerWithWrongUriShouldReturn404() self::$kernel->boot(); - $this->assertFileDoesNotExist($this->workerFile); + $this->assertFileDoesNotExist($workerFile); $this->assertReadyJobsCountInTube(0, self::FLOW_CODE); }