Skip to content

Commit

Permalink
Change the flow of processing upload variant requests
Browse files Browse the repository at this point in the history
  • Loading branch information
loevgaard committed Nov 27, 2024
1 parent 71f939c commit 9441c0f
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 21 deletions.
10 changes: 1 addition & 9 deletions src/Message/Command/ProcessUploadProductVariantRequest.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,12 @@ final class ProcessUploadProductVariantRequest implements CommandInterface
*/
public int $uploadProductVariantRequest;

/**
* If the version is set, it will be used to check if the upload product variant request has been updated since it was triggered for processing
*/
public ?int $version = null;

public function __construct(int|UploadProductVariantRequestInterface $uploadProductVariantRequest, int $version = null)
public function __construct(int|UploadProductVariantRequestInterface $uploadProductVariantRequest)
{
if ($uploadProductVariantRequest instanceof UploadProductVariantRequestInterface) {
$version = $uploadProductVariantRequest->getVersion();

$uploadProductVariantRequest = (int) $uploadProductVariantRequest->getId();
}

$this->uploadProductVariantRequest = $uploadProductVariantRequest;
$this->version = $version;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

namespace Setono\SyliusPeakPlugin\Message\CommandHandler;

use Doctrine\ORM\OptimisticLockException;
use Doctrine\Persistence\ManagerRegistry;
use Setono\Doctrine\ORMTrait;
use Setono\PeakWMS\Client\ClientInterface;
Expand Down Expand Up @@ -43,12 +44,13 @@ public function __invoke(ProcessUploadProductVariantRequest $message): void
throw new UnrecoverableMessageHandlingException(sprintf('Upload product variant request with id %d does not exist', $message->uploadProductVariantRequest));
}

if (null !== $message->version && $uploadProductVariantRequest->getVersion() !== $message->version) {
throw new UnrecoverableMessageHandlingException(sprintf('Upload product variant request with id %d has been updated since it was tried to be processed', $message->uploadProductVariantRequest));
}
$this->uploadProductVariantRequestWorkflow->apply($uploadProductVariantRequest, UploadProductVariantRequestWorkflow::TRANSITION_PROCESS);

if ($uploadProductVariantRequest->getState() !== UploadProductVariantRequestInterface::STATE_PROCESSING) {
throw new UnrecoverableMessageHandlingException(sprintf('Upload product variant request with id %d is not in the processing state', $message->uploadProductVariantRequest));
try {
$manager->flush();
} catch (OptimisticLockException) {
// This means that the upload product variant request has been updated since it was fetched
return;
}

$productVariant = $uploadProductVariantRequest->getProductVariant();
Expand All @@ -70,6 +72,9 @@ public function __invoke(ProcessUploadProductVariantRequest $message): void

$this->uploadProductVariantRequestWorkflow->apply($uploadProductVariantRequest, UploadProductVariantRequestWorkflow::TRANSITION_UPLOAD);
} catch (TooManyRequestsException $e) {
// This will put the message back in the queue to be retried later
$this->uploadProductVariantRequestWorkflow->apply($uploadProductVariantRequest, UploadProductVariantRequestWorkflow::TRANSITION_DISPATCH);

throw new RecoverableMessageHandlingException(
message: sprintf('There were too many requests to Peak WMS API when trying to process upload product variant request with id %d. The message will be retried later.', $message->uploadProductVariantRequest),
previous: $e,
Expand All @@ -90,8 +95,6 @@ public function __invoke(ProcessUploadProductVariantRequest $message): void
if ($manager->isOpen()) {
$manager->flush();
}

$message->version = $uploadProductVariantRequest->getVersion();
}
}
}
2 changes: 2 additions & 0 deletions src/Model/UploadProductVariantRequestInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ interface UploadProductVariantRequestInterface extends ResourceInterface, Versio
{
public const STATE_PENDING = 'pending';

public const STATE_DISPATCHED = 'dispatched';

public const STATE_PROCESSING = 'processing';

public const STATE_UPLOADED = 'uploaded';
Expand Down
15 changes: 13 additions & 2 deletions src/Processor/UploadProductVariantRequestProcessor.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
use Setono\SyliusPeakPlugin\Provider\PreQualifiedUploadProductVariantRequestsProviderInterface;
use Setono\SyliusPeakPlugin\Workflow\UploadProductVariantRequestWorkflow;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Workflow\Exception\LogicException;
use Symfony\Component\Workflow\WorkflowInterface;

Expand All @@ -29,9 +30,11 @@ public function __construct(

public function process(): void
{
$delay = $i = 0;

foreach ($this->preQualifiedUploadProductVariantRequestsProvider->getUploadProductVariantRequests() as $uploadProductVariantRequest) {
try {
$this->uploadProductVariantRequestWorkflow->apply($uploadProductVariantRequest, UploadProductVariantRequestWorkflow::TRANSITION_PROCESS);
$this->uploadProductVariantRequestWorkflow->apply($uploadProductVariantRequest, UploadProductVariantRequestWorkflow::TRANSITION_DISPATCH);
} catch (LogicException) {
continue;
}
Expand All @@ -42,7 +45,15 @@ public function process(): void
continue;
}

$this->commandBus->dispatch(new ProcessUploadProductVariantRequest($uploadProductVariantRequest));
++$i;

// According to https://api.peakwms.com/api/documentation/index.html the rate limit is 240 requests per minute
// So we will increase the delay by 1 second every 4 iterations (240 / 60 = 4)
if ($i % 4 === 0) {
$delay += 1000;
}

$this->commandBus->dispatch(new ProcessUploadProductVariantRequest($uploadProductVariantRequest), [new DelayStamp($delay)]);
}
}
}
14 changes: 11 additions & 3 deletions src/Workflow/UploadProductVariantRequestWorkflow.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ final class UploadProductVariantRequestWorkflow

final public const NAME = 'setono_sylius_peak__upload_product_variant_request';

final public const TRANSITION_DISPATCH = 'dispatch';

final public const TRANSITION_PROCESS = 'process';

final public const TRANSITION_UPLOAD = 'upload';
Expand All @@ -32,6 +34,7 @@ public static function getStates(): array
{
return [
UploadProductVariantRequestInterface::STATE_PENDING,
UploadProductVariantRequestInterface::STATE_DISPATCHED,
UploadProductVariantRequestInterface::STATE_PROCESSING,
UploadProductVariantRequestInterface::STATE_UPLOADED,
UploadProductVariantRequestInterface::STATE_FAILED,
Expand Down Expand Up @@ -69,19 +72,24 @@ public static function getConfig(): array
public static function getTransitions(): array
{
return [
new Transition(
self::TRANSITION_DISPATCH,
[UploadProductVariantRequestInterface::STATE_PENDING, UploadProductVariantRequestInterface::STATE_PROCESSING],
UploadProductVariantRequestInterface::STATE_DISPATCHED,
),
new Transition(
self::TRANSITION_PROCESS,
[UploadProductVariantRequestInterface::STATE_PENDING, UploadProductVariantRequestInterface::STATE_UPLOADED],
UploadProductVariantRequestInterface::STATE_DISPATCHED,
UploadProductVariantRequestInterface::STATE_PROCESSING,
),
new Transition(
self::TRANSITION_UPLOAD,
[UploadProductVariantRequestInterface::STATE_PROCESSING],
UploadProductVariantRequestInterface::STATE_PROCESSING,
UploadProductVariantRequestInterface::STATE_UPLOADED,
),
new Transition(
self::TRANSITION_FAIL,
[UploadProductVariantRequestInterface::STATE_PENDING, UploadProductVariantRequestInterface::STATE_PROCESSING],
[UploadProductVariantRequestInterface::STATE_PENDING, UploadProductVariantRequestInterface::STATE_DISPATCHED, UploadProductVariantRequestInterface::STATE_PROCESSING],
UploadProductVariantRequestInterface::STATE_FAILED,
),
new Transition(
Expand Down

0 comments on commit 9441c0f

Please sign in to comment.