Skip to content

Commit

Permalink
WIP - Multiple matches for http producer
Browse files Browse the repository at this point in the history
  • Loading branch information
fabianaromagnoli committed Jul 10, 2024
1 parent bb1aed0 commit 5b9b7d5
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 39 deletions.
4 changes: 1 addition & 3 deletions src/ProducerInstance.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
namespace Webgriffe\Esb;

use Amp\Beanstalk\BeanstalkClient;
use function Amp\call;
use Amp\Loop;
use Amp\Promise;
use Psr\Log\LoggerInterface;
Expand All @@ -17,6 +16,7 @@
use Webgriffe\Esb\Service\HttpProducersServer;
use Webgriffe\Esb\Service\ProducerQueueManagerInterface;
use Webgriffe\Esb\Service\QueueManager;
use function Amp\call;

final class ProducerInstance implements ProducerInstanceInterface
{
Expand Down Expand Up @@ -166,7 +166,6 @@ public function produceAndQueueJobs($data = null): Promise
return call(function () use ($data) {
$jobsCount = 0;
$job = null;
$test = false;
try {
$jobs = $this->producer->produce($data);
while (yield $jobs->advance()) {
Expand All @@ -192,7 +191,6 @@ public function produceAndQueueJobs($data = null): Promise
'producer' => \get_class($this->producer),
'last_job_payload_data' => $job ? NonUtf8Cleaner::clean($job->getPayloadData()) : null,
'error' => $error->getMessage(),
'test' => $test
]
);
}
Expand Down
36 changes: 21 additions & 15 deletions src/Service/HttpProducersServer.php
Original file line number Diff line number Diff line change
Expand Up @@ -107,40 +107,46 @@ public function addProducerInstance(ProducerInstance $producerInstance): void
*/
private function requestHandler(Request $request)
{
$producerInstance = $this->matchProducerInstance($request);
if (!$producerInstance) {
return new Response(Status::NOT_FOUND, [], 'Producer Not Found');
$producerInstances = $this->matchProducerInstance($request);

if (count($producerInstances) === 0) {
return new Response(Status::NOT_FOUND, [], 'No Producers Found');
}

$jobsCount = 0;
foreach ($producerInstances as $producerInstance) {
$this->logger->info(
'Matched an HTTP Producer for an incoming HTTP request.',
[
'producer' => \get_class($producerInstance->getProducer()),
'request' => sprintf('%s %s', strtoupper($request->getMethod()), $request->getUri())
]
);
$jobsCount += yield $producerInstance->produceAndQueueJobs($request);
}

$this->logger->info(
'Matched an HTTP Producer for an incoming HTTP request.',
[
'producer' => \get_class($producerInstance->getProducer()),
'request' => sprintf('%s %s', strtoupper($request->getMethod()), $request->getUri())
]
);
$jobsCount = yield $producerInstance->produceAndQueueJobs($request);
$responseMessage = sprintf('Successfully scheduled %s job(s) to be queued.', $jobsCount);
return new Response(Status::OK, [], sprintf('"%s"', $responseMessage));
}

/**
* @param Request $request
* @return false|ProducerInstance
* @return ProducerInstance[]
*/
private function matchProducerInstance(Request $request)
{
$matchingInstances = [];
foreach ($this->producerInstances as $producerInstance) {
$producer = $producerInstance->getProducer();
if (!$producer instanceof HttpRequestProducerInterface) {
// This should never happen but maybe we should add a warning here?
// This should never happen, but maybe we should add a warning here?
continue;
}
if ($request->getUri()->getPath() === $producer->getAttachedRequestUri() &&
$producer->getAttachedRequestMethod() === $request->getMethod()) {
return $producerInstance;
$matchingInstances[] = $producerInstance;
}
}
return false;
return $matchingInstances;
}
}
123 changes: 102 additions & 21 deletions tests/Integration/HttpRequestProducerAndWorkerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,16 @@ class HttpRequestProducerAndWorkerTest extends KernelTestCase
{
use TestUtils;

private $workerFile;
private $httpPort;

private const FLOW_CODE = 'http_producer_flow';

protected function setUp(): void
public function testHttpRequestProducerAndWorker()
{
parent::setUp();
$this->workerFile = vfsStream::url('root/worker.data');
$workerFile = vfsStream::url('root/worker.data');
self::createKernel(
[
'services' => [
DummyHttpRequestProducer::class => ['arguments' => []],
DummyFilesystemWorker::class => ['arguments' => [$this->workerFile]],
DummyFilesystemWorker::class => ['arguments' => [$workerFile]],
],
'flows' => [
self::FLOW_CODE => [
Expand All @@ -46,30 +42,27 @@ protected function setUp(): void
]
]
);
$this->httpPort = self::$kernel->getContainer()->getParameter('http_server_port');
}
$httpPort = self::$kernel->getContainer()->getParameter('http_server_port');

public function testHttpRequestProducerAndWorker()
{
Loop::delay(100, function () {
yield $this->waitForConnectionAvailable("tcp://127.0.0.1:{$this->httpPort}");
Loop::delay(100, function () use ($httpPort){
yield $this->waitForConnectionAvailable("tcp://127.0.0.1:{$httpPort}");
$payload = json_encode(['jobs' => ['job1', 'job2', 'job3']]);
$client = HttpClientBuilder::buildDefault();
$request = new Request("http://127.0.0.1:{$this->httpPort}/dummy", 'POST');
$request = new Request("http://127.0.0.1:{$httpPort}/dummy", 'POST');
$request->setBody($payload);
$response = yield $client->request($request);
$this->assertStringContainsString(
'"Successfully scheduled 3 job(s) to be queued."',
yield $response->getBody()->read()
);
});
$this->stopWhen(function () {
return (yield exists($this->workerFile)) && count($this->getFileLines($this->workerFile)) === 3;
$this->stopWhen(function () use ($workerFile) {
return (yield exists($workerFile)) && count($this->getFileLines($workerFile)) === 3;
});

self::$kernel->boot();

$workerFileLines = $this->getFileLines($this->workerFile);
$workerFileLines = $this->getFileLines($workerFile);
$this->assertCount(3, $workerFileLines);
$this->assertStringContainsString('job1', $workerFileLines[0]);
$this->assertStringContainsString('job2', $workerFileLines[1]);
Expand All @@ -89,13 +82,101 @@ public function testHttpRequestProducerAndWorker()
$this->assertReadyJobsCountInTube(0, self::FLOW_CODE);
}

public function testMultipleHttpRequestProducersForSameRequest()
{
$firstWorkerFile = vfsStream::url('root/first-worker.data');
$secondWorkerFile = vfsStream::url('root/second-worker.data');

self::createKernel(
[
'services' => [
'http_request_producer' => [
'class' => DummyHttpRequestProducer::class,
'arguments' => []
],
'first_filesystem_worker' => [
'class' => DummyFilesystemWorker::class,
'arguments' => [$firstWorkerFile]
],
'second_filesystem_worker' => [
'class' => DummyFilesystemWorker::class,
'arguments' => [$secondWorkerFile]
],
],
'flows' => [
'first_http_producer_flow' => [
'description' => 'First Http Request Producer And Worker Test Flow',
'producer' => ['service' => 'http_request_producer'],
'worker' => ['service' => 'first_filesystem_worker'],
],
'second_http_producer_flow' => [
'description' => 'Second Http Request Producer And Worker Test Flow',
'producer' => ['service' => 'http_request_producer'],
'worker' => ['service' => 'second_filesystem_worker'],
]
]
]
);
$httpPort = self::$kernel->getContainer()->getParameter('http_server_port');

Loop::delay(100, function () use ($httpPort){
yield $this->waitForConnectionAvailable("tcp://127.0.0.1:{$httpPort}");
$payload = json_encode(['jobs' => ['job1']]);
$client = HttpClientBuilder::buildDefault();
$request = new Request("http://127.0.0.1:{$httpPort}/dummy", 'POST');
$request->setBody($payload);
$response = yield $client->request($request);
$this->assertStringContainsString(
'"Successfully scheduled 1 job(s) to be queued."',
yield $response->getBody()->read()
);
});
$this->stopWhen(function () use ($firstWorkerFile, $secondWorkerFile) {
return
(yield exists($firstWorkerFile)) &&
count($this->getFileLines($firstWorkerFile)) === 1 &&
(yield exists($secondWorkerFile)) &&
count($this->getFileLines($secondWorkerFile)) === 1
;
});

self::$kernel->boot();

$firstWorkerFileLines = $this->getFileLines($firstWorkerFile);
$secondWorkerFileLines = $this->getFileLines($secondWorkerFile);
$this->assertCount(1, $firstWorkerFileLines);
$this->assertCount(1, $secondWorkerFileLines);
$this->assertStringContainsString('job1', $firstWorkerFileLines[0]);
$this->assertStringContainsString('job1', $secondWorkerFileLines[0]);
$this->assertReadyJobsCountInTube(0, 'first_http_producer_flow');
$this->assertReadyJobsCountInTube(0, 'second_http_producer_flow');
}

public function testHttpRequestProducerWithWrongUriShouldReturn404()
{
Loop::delay(100, function () {
yield $this->waitForConnectionAvailable("tcp://127.0.0.1:{$this->httpPort}");
$workerFile = vfsStream::url('root/worker.data');
self::createKernel(
[
'services' => [
DummyHttpRequestProducer::class => ['arguments' => []],
DummyFilesystemWorker::class => ['arguments' => [$workerFile]],
],
'flows' => [
self::FLOW_CODE => [
'description' => 'Http Request Producer And Worker Test Flow',
'producer' => ['service' => DummyHttpRequestProducer::class],
'worker' => ['service' => DummyFilesystemWorker::class],
]
]
]
);
$httpPort = self::$kernel->getContainer()->getParameter('http_server_port');

Loop::delay(100, function () use ($httpPort) {
yield $this->waitForConnectionAvailable("tcp://127.0.0.1:{$httpPort}");
$payload = json_encode(['jobs' => ['job1', 'job2', 'job3']]);
$client = HttpClientBuilder::buildDefault();
$request = new Request("http://127.0.0.1:{$this->httpPort}/wrong-uri", 'POST');
$request = new Request("http://127.0.0.1:{$httpPort}/wrong-uri", 'POST');
$request->setBody($payload);
$response = yield $client->request($request);
$this->assertEquals(404, $response->getStatus());
Expand All @@ -106,7 +187,7 @@ public function testHttpRequestProducerWithWrongUriShouldReturn404()

self::$kernel->boot();

$this->assertFileDoesNotExist($this->workerFile);
$this->assertFileDoesNotExist($workerFile);
$this->assertReadyJobsCountInTube(0, self::FLOW_CODE);
}

Expand Down

0 comments on commit 5b9b7d5

Please sign in to comment.