Skip to content

Commit

Permalink
Merge pull request #10 from Setono/handle-stuck-orders
Browse files Browse the repository at this point in the history
Handle upload order requests that are stuck in 'processing' state
  • Loading branch information
loevgaard authored Nov 18, 2024
2 parents 69e4f4d + b62c265 commit 857ba58
Show file tree
Hide file tree
Showing 18 changed files with 319 additions and 5 deletions.
5 changes: 5 additions & 0 deletions psalm.xml
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,10 @@
</errorLevel>
</TooManyTemplateParams>
<PluginIssue name="QueryBuilderSetParameter" errorLevel="suppress"/>
<DeprecatedClass>
<errorLevel type="suppress">
<referencedClass name="Sylius\Bundle\AdminBundle\Menu\OrderShowMenuBuilder"/>
</errorLevel>
</DeprecatedClass>
</issueHandlers>
</psalm>
9 changes: 7 additions & 2 deletions src/Command/ProcessUploadOrderRequestsCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

namespace Setono\SyliusPeakPlugin\Command;

use Setono\SyliusPeakPlugin\Processor\FailedUploadOrderRequestProcessorInterface;
use Setono\SyliusPeakPlugin\Processor\UploadOrderRequestProcessorInterface;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Command\Command;
Expand All @@ -16,13 +17,17 @@
)]
final class ProcessUploadOrderRequestsCommand extends Command
{
public function __construct(private readonly UploadOrderRequestProcessorInterface $uploadOrderRequestProcessor)
{
public function __construct(
private readonly UploadOrderRequestProcessorInterface $uploadOrderRequestProcessor,
private readonly FailedUploadOrderRequestProcessorInterface $failedUploadOrderRequestProcessor,
) {
parent::__construct();
}

protected function execute(InputInterface $input, OutputInterface $output): int
{
$this->failedUploadOrderRequestProcessor->process();

$this->uploadOrderRequestProcessor->process();

return 0;
Expand Down
18 changes: 18 additions & 0 deletions src/Event/FailedUploadOrderRequestsQueryBuilderCreatedEvent.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<?php

declare(strict_types=1);

namespace Setono\SyliusPeakPlugin\Event;

use Doctrine\ORM\QueryBuilder;

/**
* This event is fired when the query builder for failed upload order requests has been created.
* Listen to this event if you want to filter on associated orders of the upload order requests.
*/
final class FailedUploadOrderRequestsQueryBuilderCreatedEvent
{
public function __construct(public readonly QueryBuilder $queryBuilder)
{
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
<?php

declare(strict_types=1);

namespace Setono\SyliusPeakPlugin\EventSubscriber\Workflow\UploadOrderRequest;

use Setono\SyliusPeakPlugin\Model\UploadOrderRequestInterface;
use Setono\SyliusPeakPlugin\Workflow\UploadOrderRequestWorkflow;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\Workflow\Event\CompletedEvent;
use Webmozart\Assert\Assert;

final class IncrementTriesSubscriber implements EventSubscriberInterface
{
public static function getSubscribedEvents(): array
{
return [sprintf('workflow.%s.completed.%s', UploadOrderRequestWorkflow::NAME, UploadOrderRequestWorkflow::TRANSITION_PROCESS) => 'incrementTries'];
}

public function incrementTries(CompletedEvent $event): void
{
/** @var UploadOrderRequestInterface|object $uploadOrderRequest */
$uploadOrderRequest = $event->getSubject();
Assert::isInstanceOf($uploadOrderRequest, UploadOrderRequestInterface::class);

$uploadOrderRequest->incrementTries();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
<?php

declare(strict_types=1);

namespace Setono\SyliusPeakPlugin\EventSubscriber\Workflow\UploadOrderRequest;

use Setono\SyliusPeakPlugin\Model\UploadOrderRequestInterface;
use Setono\SyliusPeakPlugin\Workflow\UploadOrderRequestWorkflow;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\Workflow\Event\CompletedEvent;
use Webmozart\Assert\Assert;

final class RetrySubscriber implements EventSubscriberInterface
{
public function __construct(private readonly int $maxTries = 5)
{
}

public static function getSubscribedEvents(): array
{
return [sprintf('workflow.%s.completed.%s', UploadOrderRequestWorkflow::NAME, UploadOrderRequestWorkflow::TRANSITION_FAIL) => 'retry'];
}

public function retry(CompletedEvent $event): void
{
/** @var UploadOrderRequestInterface|object $uploadOrderRequest */
$uploadOrderRequest = $event->getSubject();
Assert::isInstanceOf($uploadOrderRequest, UploadOrderRequestInterface::class);

if ($uploadOrderRequest->getTries() >= $this->maxTries) {
return;
}

$event->getWorkflow()->apply($uploadOrderRequest, UploadOrderRequestWorkflow::TRANSITION_RESET);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
<?php

declare(strict_types=1);

namespace Setono\SyliusPeakPlugin\EventSubscriber\Workflow\UploadOrderRequest;

use Setono\SyliusPeakPlugin\Model\UploadOrderRequestInterface;
use Setono\SyliusPeakPlugin\Workflow\UploadOrderRequestWorkflow;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\Workflow\Event\CompletedEvent;
use Webmozart\Assert\Assert;

final class StateUpdatedSubscriber implements EventSubscriberInterface
{
public static function getSubscribedEvents(): array
{
return [sprintf('workflow.%s.completed', UploadOrderRequestWorkflow::NAME) => 'updateTimestamp'];
}

public function updateTimestamp(CompletedEvent $event): void
{
/** @var UploadOrderRequestInterface|object $uploadOrderRequest */
$uploadOrderRequest = $event->getSubject();
Assert::isInstanceOf($uploadOrderRequest, UploadOrderRequestInterface::class);

$uploadOrderRequest->setStateUpdatedAt(new \DateTimeImmutable());
}
}
29 changes: 29 additions & 0 deletions src/Model/UploadOrderRequest.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ class UploadOrderRequest implements UploadOrderRequestInterface

protected string $state = self::STATE_PENDING;

protected ?\DateTimeInterface $stateUpdatedAt = null;

protected ?OrderInterface $order = null;

protected ?string $request = null;
Expand All @@ -22,6 +24,8 @@ class UploadOrderRequest implements UploadOrderRequestInterface

protected ?int $peakOrderId = null;

protected int $tries = 0;

public function getId(): ?int
{
return $this->id;
Expand All @@ -47,6 +51,16 @@ public function setState(string $state): void
$this->state = $state;
}

public function getStateUpdatedAt(): ?\DateTimeInterface
{
return $this->stateUpdatedAt;
}

public function setStateUpdatedAt(\DateTimeInterface $stateUpdatedAt): void
{
$this->stateUpdatedAt = $stateUpdatedAt;
}

public function getOrder(): ?OrderInterface
{
return $this->order;
Expand Down Expand Up @@ -96,4 +110,19 @@ public function setPeakOrderId(?int $peakOrderId): void
{
$this->peakOrderId = $peakOrderId;
}

public function getTries(): int
{
return $this->tries;
}

public function setTries(int $tries): void
{
$this->tries = $tries;
}

public function incrementTries(): void
{
++$this->tries;
}
}
10 changes: 10 additions & 0 deletions src/Model/UploadOrderRequestInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ public function getState(): string;

public function setState(string $state): void;

public function getStateUpdatedAt(): ?\DateTimeInterface;

public function setStateUpdatedAt(\DateTimeInterface $stateUpdatedAt): void;

public function getOrder(): ?OrderInterface;

public function setOrder(?OrderInterface $order): void;
Expand All @@ -45,4 +49,10 @@ public function setError(?string $error): void;
public function getPeakOrderId(): ?int;

public function setPeakOrderId(?int $peakOrderId): void;

public function getTries(): int;

public function setTries(int $tries): void;

public function incrementTries(): void;
}
43 changes: 43 additions & 0 deletions src/Processor/FailedUploadOrderRequestProcessor.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
<?php

declare(strict_types=1);

namespace Setono\SyliusPeakPlugin\Processor;

use Doctrine\ORM\OptimisticLockException;
use Doctrine\Persistence\ManagerRegistry;
use Setono\Doctrine\ORMTrait;
use Setono\SyliusPeakPlugin\Provider\FailedUploadOrderRequestsProviderInterface;
use Setono\SyliusPeakPlugin\Workflow\UploadOrderRequestWorkflow;
use Symfony\Component\Workflow\Exception\LogicException;
use Symfony\Component\Workflow\WorkflowInterface;

final class FailedUploadOrderRequestProcessor implements FailedUploadOrderRequestProcessorInterface
{
use ORMTrait;

public function __construct(
private readonly FailedUploadOrderRequestsProviderInterface $failedUploadOrderRequestsProvider,
private readonly WorkflowInterface $uploadOrderRequestWorkflow,
ManagerRegistry $managerRegistry,
) {
$this->managerRegistry = $managerRegistry;
}

public function process(): void
{
foreach ($this->failedUploadOrderRequestsProvider->getUploadOrderRequests() as $uploadOrderRequest) {
try {
$this->uploadOrderRequestWorkflow->apply($uploadOrderRequest, UploadOrderRequestWorkflow::TRANSITION_FAIL);
} catch (LogicException) {
continue;
}

try {
$this->getManager($uploadOrderRequest)->flush();
} catch (OptimisticLockException) {
continue;
}
}
}
}
10 changes: 10 additions & 0 deletions src/Processor/FailedUploadOrderRequestProcessorInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
<?php

declare(strict_types=1);

namespace Setono\SyliusPeakPlugin\Processor;

interface FailedUploadOrderRequestProcessorInterface
{
public function process(): void;
}
4 changes: 2 additions & 2 deletions src/Processor/UploadOrderRequestProcessor.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ final class UploadOrderRequestProcessor implements UploadOrderRequestProcessorIn
use ORMTrait;

public function __construct(
private readonly PreQualifiedUploadOrderRequestsProviderInterface $preQualifiedUploadableOrdersProvider,
private readonly PreQualifiedUploadOrderRequestsProviderInterface $preQualifiedUploadOrderRequestsProvider,
private readonly MessageBusInterface $commandBus,
private readonly WorkflowInterface $uploadOrderRequestWorkflow,
ManagerRegistry $managerRegistry,
Expand All @@ -29,7 +29,7 @@ public function __construct(

public function process(): void
{
foreach ($this->preQualifiedUploadableOrdersProvider->getUploadOrderRequests() as $uploadOrderRequest) {
foreach ($this->preQualifiedUploadOrderRequestsProvider->getUploadOrderRequests() as $uploadOrderRequest) {
try {
$this->uploadOrderRequestWorkflow->apply($uploadOrderRequest, UploadOrderRequestWorkflow::TRANSITION_PROCESS);
} catch (LogicException) {
Expand Down
49 changes: 49 additions & 0 deletions src/Provider/FailedUploadOrderRequestsProvider.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
<?php

declare(strict_types=1);

namespace Setono\SyliusPeakPlugin\Provider;

use Doctrine\Persistence\ManagerRegistry;
use DoctrineBatchUtils\BatchProcessing\SelectBatchIteratorAggregate;
use Psr\EventDispatcher\EventDispatcherInterface;
use Setono\Doctrine\ORMTrait;
use Setono\SyliusPeakPlugin\Event\FailedUploadOrderRequestsQueryBuilderCreatedEvent;
use Setono\SyliusPeakPlugin\Model\UploadOrderRequestInterface;

final class FailedUploadOrderRequestsProvider implements FailedUploadOrderRequestsProviderInterface
{
use ORMTrait;

/**
* @param class-string<UploadOrderRequestInterface> $uploadOrderRequestClass
*/
public function __construct(
ManagerRegistry $managerRegistry,
private readonly EventDispatcherInterface $eventDispatcher,
private readonly string $uploadOrderRequestClass,
private readonly string $processingTimeout = '1 hour',
) {
$this->managerRegistry = $managerRegistry;
}

/**
* @return \Generator<array-key, UploadOrderRequestInterface>
*/
public function getUploadOrderRequests(): \Generator
{
$qb = $this->getRepository($this->uploadOrderRequestClass)->createQueryBuilder('o')
->andWhere('o.state = :orderState')
->andWhere('o.stateUpdatedAt < :stateUpdatedAt')
->setParameter('state', UploadOrderRequestInterface::STATE_PROCESSING)
->setParameter('stateUpdatedAt', new \DateTimeImmutable('-' . $this->processingTimeout))
;

$this->eventDispatcher->dispatch(new FailedUploadOrderRequestsQueryBuilderCreatedEvent($qb));

/** @var SelectBatchIteratorAggregate<array-key, UploadOrderRequestInterface> $iterator */
$iterator = SelectBatchIteratorAggregate::fromQuery($qb->getQuery(), 50);

yield from $iterator;
}
}
15 changes: 15 additions & 0 deletions src/Provider/FailedUploadOrderRequestsProviderInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<?php

declare(strict_types=1);

namespace Setono\SyliusPeakPlugin\Provider;

use Setono\SyliusPeakPlugin\Model\UploadOrderRequestInterface;

interface FailedUploadOrderRequestsProviderInterface
{
/**
* @return iterable<array-key, UploadOrderRequestInterface>
*/
public function getUploadOrderRequests(): iterable;
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,20 @@
</id>

<field name="version" type="integer" version="true"/>
<field name="state" type="string"/>
<field name="state" column="state" type="string"/>
<field name="stateUpdatedAt" column="state_updated_at" type="datetime" nullable="true"/>
<field name="request" type="text" nullable="true"/>
<field name="response" type="text" nullable="true"/>
<field name="error" type="text" nullable="true"/>
<field name="peakOrderId" type="integer" nullable="true"/>
<field name="tries" type="integer"/>

<one-to-one field="order" target-entity="Sylius\Component\Order\Model\OrderInterface" inversed-by="peakUploadOrderRequest">
<join-column name="order_id" referenced-column-name="id" nullable="false" unique="true" on-delete="CASCADE"/>
</one-to-one>

<indexes>
<index columns="state,state_updated_at"/>
</indexes>
</mapped-superclass>
</doctrine-mapping>
1 change: 1 addition & 0 deletions src/Resources/config/services/command.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

<service id="Setono\SyliusPeakPlugin\Command\ProcessUploadOrderRequestsCommand">
<argument type="service" id="Setono\SyliusPeakPlugin\Processor\UploadOrderRequestProcessorInterface"/>
<argument type="service" id="Setono\SyliusPeakPlugin\Processor\FailedUploadOrderRequestProcessorInterface"/>

<tag name="console.command"/>
</service>
Expand Down
Loading

0 comments on commit 857ba58

Please sign in to comment.