From 90f2b8f649fc37cf6169dde45d8ea0ab9c10eb52 Mon Sep 17 00:00:00 2001 From: mhsdesign <85400359+mhsdesign@users.noreply.github.com> Date: Mon, 4 Nov 2024 17:11:11 +0100 Subject: [PATCH 01/17] BUGFIX: Ensure all events are published BEFORE catchup Otherwise, due to failures in projection or catchup-hooks the process would be immediately interrupted leaving a broken state. For example a faulty redirect handler hook - that just listens to live events - would be called during publishing. That means the remaining part to publish is already commited and we know we still have work to do to fork the new user content stream and apply the remaining. But the catchup hook would interrupt immediately when the events were catchup'd live. We would be left with a CLOSED user content stream that contains the "same" events that went live during the rebase. Reopening would not help at that point. This is why we must ensure that all events are published BEFORE we do the first catchup. Further implications: - running catchup only once should be more performant - we cannot refetch the current content stream version for what where previously "subcommans" (`forkContentStream`) but we must pass $expectedVersions around from the outside - we should not run constraint checks after the first `yield` as that would still operate on the old state. Thus all checks are combined above --- .../Classes/CommandHandler/CommandBus.php | 6 +- .../CommandHandlerInterface.php | 4 +- .../Classes/ContentRepository.php | 40 +++- .../Classes/EventStore/EventPersister.php | 14 +- .../Classes/Feature/ContentStreamHandling.php | 113 ++------- .../Feature/WorkspaceCommandHandler.php | 223 ++++++++++-------- .../Classes/Projection/CatchUp.php | 2 +- 7 files changed, 183 insertions(+), 219 deletions(-) diff --git a/Neos.ContentRepository.Core/Classes/CommandHandler/CommandBus.php b/Neos.ContentRepository.Core/Classes/CommandHandler/CommandBus.php index 92673a00c82..1a69d3d017c 100644 --- a/Neos.ContentRepository.Core/Classes/CommandHandler/CommandBus.php +++ b/Neos.ContentRepository.Core/Classes/CommandHandler/CommandBus.php @@ -11,6 +11,7 @@ * Implementation Detail of {@see ContentRepository::handle}, which does the command dispatching to the different * {@see CommandHandlerInterface} implementation. * + * @phpstan-import-type YieldedEventsToPublish from CommandHandlerInterface * @internal */ final readonly class CommandBus @@ -29,7 +30,10 @@ public function __construct( } /** - * @return EventsToPublish|\Generator + * The handler only calculate which events they want to have published, + * but do not do the publishing themselves + * + * @return EventsToPublish|YieldedEventsToPublish */ public function handle(CommandInterface $command): EventsToPublish|\Generator { diff --git a/Neos.ContentRepository.Core/Classes/CommandHandler/CommandHandlerInterface.php b/Neos.ContentRepository.Core/Classes/CommandHandler/CommandHandlerInterface.php index b36d5d3ab75..1ff7cee24eb 100644 --- a/Neos.ContentRepository.Core/Classes/CommandHandler/CommandHandlerInterface.php +++ b/Neos.ContentRepository.Core/Classes/CommandHandler/CommandHandlerInterface.php @@ -5,12 +5,14 @@ namespace Neos\ContentRepository\Core\CommandHandler; use Neos\ContentRepository\Core\EventStore\EventsToPublish; +use Neos\EventStore\Model\EventStore\CommitResult; /** * Common interface for all Content Repository command handlers * * The {@see CommandHandlingDependencies} are available during handling to do soft-constraint checks * + * @phpstan-type YieldedEventsToPublish \Generator * @internal no public API, because commands are no extension points of the CR */ interface CommandHandlerInterface @@ -23,7 +25,7 @@ public function canHandle(CommandInterface $command): bool; * For the case of the workspace command handler that need to publish to many streams and "close" the content-stream directly, * it's allowed to yield the events to interact with the control flow of event publishing. * - * @return EventsToPublish|\Generator + * @return EventsToPublish|YieldedEventsToPublish */ public function handle(CommandInterface $command, CommandHandlingDependencies $commandHandlingDependencies): EventsToPublish|\Generator; } diff --git a/Neos.ContentRepository.Core/Classes/ContentRepository.php b/Neos.ContentRepository.Core/Classes/ContentRepository.php index f474dda8191..592870b3412 100644 --- a/Neos.ContentRepository.Core/Classes/ContentRepository.php +++ b/Neos.ContentRepository.Core/Classes/ContentRepository.php @@ -94,19 +94,35 @@ public function __construct( */ public function handle(CommandInterface $command): void { - // the commands only calculate which events they want to have published, but do not do the - // publishing themselves - $eventsToPublishOrGenerator = $this->commandBus->handle($command); - - if ($eventsToPublishOrGenerator instanceof EventsToPublish) { - $eventsToPublish = $this->enrichEventsToPublishWithMetadata($eventsToPublishOrGenerator); - $this->eventPersister->publishEvents($this, $eventsToPublish); - } else { - foreach ($eventsToPublishOrGenerator as $eventsToPublish) { - assert($eventsToPublish instanceof EventsToPublish); // just for the ide - $eventsToPublish = $this->enrichEventsToPublishWithMetadata($eventsToPublish); - $this->eventPersister->publishEvents($this, $eventsToPublish); + $toPublish = $this->commandBus->handle($command); + + if ($toPublish instanceof EventsToPublish) { + // simple case + $eventsToPublish = $this->enrichEventsToPublishWithMetadata($toPublish); + if ($eventsToPublish->events->isEmpty()) { + return; + } + $this->eventPersister->publishWithoutCatchup($eventsToPublish); + $this->catchupProjections(); + return; + } + + // control-flow aware command handling via generator + try { + $yieldedEventsToPublish = $toPublish->current(); + while ($yieldedEventsToPublish !== null) { + if ($yieldedEventsToPublish->events->isEmpty()) { + $yieldedEventsToPublish = $toPublish->send(null); + continue; + } + $eventsToPublish = $this->enrichEventsToPublishWithMetadata($yieldedEventsToPublish); + $commitResult = $this->eventPersister->publishWithoutCatchup($eventsToPublish); + $yieldedEventsToPublish = $toPublish->send($commitResult); } + } finally { + // We always NEED to catchup even if there was an unexpected ConcurrencyException to make sure previous commits are handled. + // Technically it would be acceptable for the catchup to fail here (due to hook errors) because all the events are already persisted. + $this->catchupProjections(); } } diff --git a/Neos.ContentRepository.Core/Classes/EventStore/EventPersister.php b/Neos.ContentRepository.Core/Classes/EventStore/EventPersister.php index 4909d50e661..59b102e03bd 100644 --- a/Neos.ContentRepository.Core/Classes/EventStore/EventPersister.php +++ b/Neos.ContentRepository.Core/Classes/EventStore/EventPersister.php @@ -8,6 +8,7 @@ use Neos\EventStore\EventStoreInterface; use Neos\EventStore\Exception\ConcurrencyException; use Neos\EventStore\Model\Events; +use Neos\EventStore\Model\EventStore\CommitResult; /** * Internal service to persist {@see EventInterface} with the proper normalization, and triggering the @@ -31,15 +32,22 @@ public function publishEvents(ContentRepository $contentRepository, EventsToPubl if ($eventsToPublish->events->isEmpty()) { return; } + $this->publishWithoutCatchup($eventsToPublish); + $contentRepository->catchUpProjections(); + } + + /** + * @throws ConcurrencyException in case the expectedVersion does not match + */ + public function publishWithoutCatchup(EventsToPublish $eventsToPublish): CommitResult + { $normalizedEvents = Events::fromArray( $eventsToPublish->events->map($this->eventNormalizer->normalize(...)) ); - $this->eventStore->commit( + return $this->eventStore->commit( $eventsToPublish->streamName, $normalizedEvents, $eventsToPublish->expectedVersion ); - - $contentRepository->catchUpProjections(); } } diff --git a/Neos.ContentRepository.Core/Classes/Feature/ContentStreamHandling.php b/Neos.ContentRepository.Core/Classes/Feature/ContentStreamHandling.php index c3027d71147..d71c28f4097 100644 --- a/Neos.ContentRepository.Core/Classes/Feature/ContentStreamHandling.php +++ b/Neos.ContentRepository.Core/Classes/Feature/ContentStreamHandling.php @@ -9,55 +9,25 @@ use Neos\ContentRepository\Core\EventStore\EventsToPublish; use Neos\ContentRepository\Core\Feature\ContentStreamClosing\Event\ContentStreamWasClosed; use Neos\ContentRepository\Core\Feature\ContentStreamClosing\Event\ContentStreamWasReopened; -use Neos\ContentRepository\Core\Feature\ContentStreamCreation\Event\ContentStreamWasCreated; use Neos\ContentRepository\Core\Feature\ContentStreamForking\Event\ContentStreamWasForked; use Neos\ContentRepository\Core\Feature\ContentStreamRemoval\Event\ContentStreamWasRemoved; use Neos\ContentRepository\Core\SharedModel\Exception\ContentStreamAlreadyExists; use Neos\ContentRepository\Core\SharedModel\Exception\ContentStreamDoesNotExistYet; use Neos\ContentRepository\Core\SharedModel\Exception\ContentStreamIsClosed; -use Neos\ContentRepository\Core\SharedModel\Exception\ContentStreamIsNotClosed; use Neos\ContentRepository\Core\SharedModel\Workspace\ContentStreamId; +use Neos\EventStore\Model\Event\Version; use Neos\EventStore\Model\EventStream\ExpectedVersion; trait ContentStreamHandling { - /** - * @param ContentStreamId $contentStreamId The id of the content stream to create - * @throws ContentStreamAlreadyExists - * @phpstan-pure this method is pure, to persist the events they must be handled outside - */ - private function createContentStream( - ContentStreamId $contentStreamId, - CommandHandlingDependencies $commandHandlingDependencies, - ): EventsToPublish { - $this->requireContentStreamToNotExistYet($contentStreamId, $commandHandlingDependencies); - $streamName = ContentStreamEventStreamName::fromContentStreamId($contentStreamId) - ->getEventStreamName(); - - return new EventsToPublish( - $streamName, - Events::with( - new ContentStreamWasCreated( - $contentStreamId, - ) - ), - ExpectedVersion::NO_STREAM() - ); - } - /** * @param ContentStreamId $contentStreamId The id of the content stream to close - * @param CommandHandlingDependencies $commandHandlingDependencies - * @return EventsToPublish * @phpstan-pure this method is pure, to persist the events they must be handled outside */ private function closeContentStream( ContentStreamId $contentStreamId, - CommandHandlingDependencies $commandHandlingDependencies, + Version $contentStreamVersion, ): EventsToPublish { - $this->requireContentStreamToExist($contentStreamId, $commandHandlingDependencies); - $expectedVersion = $this->getExpectedVersionOfContentStream($contentStreamId, $commandHandlingDependencies); - $this->requireContentStreamToNotBeClosed($contentStreamId, $commandHandlingDependencies); $streamName = ContentStreamEventStreamName::fromContentStreamId($contentStreamId)->getEventStreamName(); return new EventsToPublish( @@ -67,7 +37,7 @@ private function closeContentStream( $contentStreamId, ), ), - $expectedVersion + ExpectedVersion::fromVersion($contentStreamVersion) ); } @@ -75,21 +45,18 @@ private function closeContentStream( * @param ContentStreamId $contentStreamId The id of the content stream to reopen * @phpstan-pure this method is pure, to persist the events they must be handled outside */ - private function reopenContentStream( + private function reopenContentStreamWithoutConstraints( ContentStreamId $contentStreamId, - CommandHandlingDependencies $commandHandlingDependencies, ): EventsToPublish { - $this->requireContentStreamToExist($contentStreamId, $commandHandlingDependencies); - $this->requireContentStreamToBeClosed($contentStreamId, $commandHandlingDependencies); - $streamName = ContentStreamEventStreamName::fromContentStreamId($contentStreamId)->getEventStreamName(); - return new EventsToPublish( - $streamName, + ContentStreamEventStreamName::fromContentStreamId($contentStreamId)->getEventStreamName(), Events::with( new ContentStreamWasReopened( $contentStreamId ), ), + // We operate here without constraints on purpose to ensure this can be commited. + //Constraints have been checked beforehand and its expected that the content stream is closed. ExpectedVersion::ANY() ); } @@ -104,19 +71,10 @@ private function reopenContentStream( private function forkContentStream( ContentStreamId $newContentStreamId, ContentStreamId $sourceContentStreamId, - CommandHandlingDependencies $commandHandlingDependencies + Version $sourceContentStreamVersion ): EventsToPublish { - $this->requireContentStreamToExist($sourceContentStreamId, $commandHandlingDependencies); - $this->requireContentStreamToNotBeClosed($sourceContentStreamId, $commandHandlingDependencies); - $this->requireContentStreamToNotExistYet($newContentStreamId, $commandHandlingDependencies); - - $sourceContentStreamVersion = $commandHandlingDependencies->getContentStreamVersion($sourceContentStreamId); - - $streamName = ContentStreamEventStreamName::fromContentStreamId($newContentStreamId) - ->getEventStreamName(); - return new EventsToPublish( - $streamName, + ContentStreamEventStreamName::fromContentStreamId($newContentStreamId)->getEventStreamName(), Events::with( new ContentStreamWasForked( $newContentStreamId, @@ -133,25 +91,19 @@ private function forkContentStream( * @param ContentStreamId $contentStreamId The id of the content stream to remove * @phpstan-pure this method is pure, to persist the events they must be handled outside */ - private function removeContentStream( + private function removeContentStreamWithoutConstraints( ContentStreamId $contentStreamId, - CommandHandlingDependencies $commandHandlingDependencies ): EventsToPublish { - $this->requireContentStreamToExist($contentStreamId, $commandHandlingDependencies); - $expectedVersion = $this->getExpectedVersionOfContentStream($contentStreamId, $commandHandlingDependencies); - - $streamName = ContentStreamEventStreamName::fromContentStreamId( - $contentStreamId - )->getEventStreamName(); - return new EventsToPublish( - $streamName, + ContentStreamEventStreamName::fromContentStreamId($contentStreamId)->getEventStreamName(), Events::with( new ContentStreamWasRemoved( $contentStreamId, ), ), - $expectedVersion + // We operate here without constraints on purpose to ensure this can be commited. + // Constraints have been checked beforehand and its expected that the content stream is closed. + ExpectedVersion::ANY() ); } @@ -172,23 +124,6 @@ private function requireContentStreamToNotExistYet( } } - /** - * @param ContentStreamId $contentStreamId - * @param CommandHandlingDependencies $commandHandlingDependencies - * @throws ContentStreamDoesNotExistYet - */ - private function requireContentStreamToExist( - ContentStreamId $contentStreamId, - CommandHandlingDependencies $commandHandlingDependencies - ): void { - if (!$commandHandlingDependencies->contentStreamExists($contentStreamId)) { - throw new ContentStreamDoesNotExistYet( - 'Content stream "' . $contentStreamId->value . '" does not exist yet.', - 1521386692 - ); - } - } - private function requireContentStreamToNotBeClosed( ContentStreamId $contentStreamId, CommandHandlingDependencies $commandHandlingDependencies @@ -200,24 +135,4 @@ private function requireContentStreamToNotBeClosed( ); } } - - private function requireContentStreamToBeClosed( - ContentStreamId $contentStreamId, - CommandHandlingDependencies $commandHandlingDependencies - ): void { - if (!$commandHandlingDependencies->isContentStreamClosed($contentStreamId)) { - throw new ContentStreamIsNotClosed( - 'Content stream "' . $contentStreamId->value . '" is not closed.', - 1710405911 - ); - } - } - - private function getExpectedVersionOfContentStream( - ContentStreamId $contentStreamId, - CommandHandlingDependencies $commandHandlingDependencies - ): ExpectedVersion { - $version = $commandHandlingDependencies->getContentStreamVersion($contentStreamId); - return ExpectedVersion::fromVersion($version); - } } diff --git a/Neos.ContentRepository.Core/Classes/Feature/WorkspaceCommandHandler.php b/Neos.ContentRepository.Core/Classes/Feature/WorkspaceCommandHandler.php index 20143536272..0be16cd86cb 100644 --- a/Neos.ContentRepository.Core/Classes/Feature/WorkspaceCommandHandler.php +++ b/Neos.ContentRepository.Core/Classes/Feature/WorkspaceCommandHandler.php @@ -20,14 +20,14 @@ use Neos\ContentRepository\Core\CommandHandler\CommandSimulatorFactory; use Neos\ContentRepository\Core\ContentRepository; use Neos\ContentRepository\Core\EventStore\DecoratedEvent; -use Neos\ContentRepository\Core\EventStore\EventInterface; use Neos\ContentRepository\Core\EventStore\EventNormalizer; use Neos\ContentRepository\Core\EventStore\Events; use Neos\ContentRepository\Core\EventStore\EventsToPublish; use Neos\ContentRepository\Core\Feature\Common\PublishableToWorkspaceInterface; use Neos\ContentRepository\Core\Feature\ContentStreamClosing\Event\ContentStreamWasClosed; use Neos\ContentRepository\Core\Feature\ContentStreamClosing\Event\ContentStreamWasReopened; -use Neos\ContentRepository\Core\Feature\ContentStreamForking\Event\ContentStreamWasForked; +use Neos\ContentRepository\Core\Feature\ContentStreamCreation\Event\ContentStreamWasCreated; +use Neos\ContentRepository\Core\Feature\ContentStreamRemoval\Event\ContentStreamWasRemoved; use Neos\ContentRepository\Core\Feature\WorkspaceCreation\Command\CreateRootWorkspace; use Neos\ContentRepository\Core\Feature\WorkspaceCreation\Command\CreateWorkspace; use Neos\ContentRepository\Core\Feature\WorkspaceCreation\Event\RootWorkspaceWasCreated; @@ -55,6 +55,7 @@ use Neos\ContentRepository\Core\Feature\WorkspaceRebase\Exception\WorkspaceRebaseFailed; use Neos\ContentRepository\Core\SharedModel\Exception\ContentStreamAlreadyExists; use Neos\ContentRepository\Core\SharedModel\Exception\ContentStreamDoesNotExistYet; +use Neos\ContentRepository\Core\SharedModel\Exception\ContentStreamIsClosed; use Neos\ContentRepository\Core\SharedModel\Exception\WorkspaceDoesNotExist; use Neos\ContentRepository\Core\SharedModel\Exception\WorkspaceHasNoBaseWorkspaceName; use Neos\ContentRepository\Core\SharedModel\Workspace\ContentStreamId; @@ -62,13 +63,13 @@ use Neos\ContentRepository\Core\SharedModel\Workspace\WorkspaceName; use Neos\ContentRepository\Core\SharedModel\Workspace\WorkspaceStatus; use Neos\EventStore\EventStoreInterface; -use Neos\EventStore\Model\Event\EventType; use Neos\EventStore\Model\Event\SequenceNumber; use Neos\EventStore\Model\Event\Version; use Neos\EventStore\Model\EventStream\EventStreamInterface; use Neos\EventStore\Model\EventStream\ExpectedVersion; /** + * @phpstan-import-type YieldedEventsToPublish from CommandHandlerInterface * @internal from userland, you'll use ContentRepository::handle to dispatch commands */ final readonly class WorkspaceCommandHandler implements CommandHandlerInterface @@ -87,6 +88,9 @@ public function canHandle(CommandInterface $command): bool return method_exists($this, 'handle' . (new \ReflectionClass($command))->getShortName()); } + /** + * @return YieldedEventsToPublish + */ public function handle(CommandInterface $command, CommandHandlingDependencies $commandHandlingDependencies): \Generator { /** @phpstan-ignore-next-line */ @@ -115,7 +119,6 @@ private function handleCreateWorkspace( ): \Generator { $this->requireWorkspaceToNotExist($command->workspaceName, $commandHandlingDependencies); $baseWorkspace = $commandHandlingDependencies->findWorkspaceByName($command->baseWorkspaceName); - if ($baseWorkspace === null) { throw new BaseWorkspaceDoesNotExist(sprintf( 'The workspace %s (base workspace of %s) does not exist', @@ -123,12 +126,15 @@ private function handleCreateWorkspace( $command->workspaceName->value ), 1513890708); } + $sourceContentStreamVersion = $commandHandlingDependencies->getContentStreamVersion($baseWorkspace->currentContentStreamId); + $this->requireContentStreamToNotBeClosed($baseWorkspace->currentContentStreamId, $commandHandlingDependencies); + $this->requireContentStreamToNotExistYet($command->newContentStreamId, $commandHandlingDependencies); // When the workspace is created, we first have to fork the content stream yield $this->forkContentStream( $command->newContentStreamId, $baseWorkspace->currentContentStreamId, - $commandHandlingDependencies + $sourceContentStreamVersion ); yield new EventsToPublish( @@ -154,11 +160,16 @@ private function handleCreateRootWorkspace( CommandHandlingDependencies $commandHandlingDependencies, ): \Generator { $this->requireWorkspaceToNotExist($command->workspaceName, $commandHandlingDependencies); + $this->requireContentStreamToNotExistYet($command->newContentStreamId, $commandHandlingDependencies); - $newContentStreamId = $command->newContentStreamId; - yield $this->createContentStream( - $newContentStreamId, - $commandHandlingDependencies + yield new EventsToPublish( + ContentStreamEventStreamName::fromContentStreamId($command->newContentStreamId)->getEventStreamName(), + Events::with( + new ContentStreamWasCreated( + $command->newContentStreamId, + ) + ), + ExpectedVersion::NO_STREAM() ); yield new EventsToPublish( @@ -166,7 +177,7 @@ private function handleCreateRootWorkspace( Events::with( new RootWorkspaceWasCreated( $command->workspaceName, - $newContentStreamId + $command->newContentStreamId ) ), ExpectedVersion::ANY() @@ -183,16 +194,12 @@ private function handlePublishWorkspace( // no-op return; } - - if (!$commandHandlingDependencies->contentStreamExists($workspace->currentContentStreamId)) { - throw new \RuntimeException('Cannot publish nodes on a workspace with a stateless content stream', 1729711258); - } - $this->requireContentStreamToNotBeClosed($baseWorkspace->currentContentStreamId, $commandHandlingDependencies); - $baseContentStreamVersion = $commandHandlingDependencies->getContentStreamVersion($baseWorkspace->currentContentStreamId); + $workspaceContentStreamVersion = $this->requireOpenContentStreamVersion($workspace, $commandHandlingDependencies); + $baseWorkspaceContentStreamVersion = $this->requireOpenContentStreamVersion($baseWorkspace, $commandHandlingDependencies); yield $this->closeContentStream( $workspace->currentContentStreamId, - $commandHandlingDependencies + $workspaceContentStreamVersion ); $rebaseableCommands = RebaseableCommands::extractFromEventStream( @@ -206,15 +213,14 @@ private function handlePublishWorkspace( yield from $this->publishWorkspace( $workspace, $baseWorkspace, + $baseWorkspaceContentStreamVersion, $command->newContentStreamId, - $baseContentStreamVersion, - $rebaseableCommands, - $commandHandlingDependencies + $rebaseableCommands ); - } catch (WorkspaceRebaseFailed $workspaceRebaseFailed) { - yield $this->reopenContentStream( - $workspace->currentContentStreamId, - $commandHandlingDependencies + } catch (WorkspaceRebaseFailed $workspaceRebaseFailed) { // and rethrow in yield + // todo catch all + yield $this->reopenContentStreamWithoutConstraints( + $workspace->currentContentStreamId ); throw $workspaceRebaseFailed; } @@ -223,10 +229,9 @@ private function handlePublishWorkspace( private function publishWorkspace( Workspace $workspace, Workspace $baseWorkspace, + Version $baseWorkspaceContentStreamVersion, ContentStreamId $newContentStreamId, - Version $baseContentStreamVersion, - RebaseableCommands $rebaseableCommands, - CommandHandlingDependencies $commandHandlingDependencies, + RebaseableCommands $rebaseableCommands ): \Generator { $commandSimulator = $this->commandSimulatorFactory->createSimulatorForWorkspace($baseWorkspace->workspaceName); @@ -242,7 +247,7 @@ static function ($handle) use ($rebaseableCommands): void { throw WorkspaceRebaseFailed::duringPublish($commandSimulator->getCommandsThatFailed()); } - yield new EventsToPublish( + $commitResult = yield new EventsToPublish( ContentStreamEventStreamName::fromContentStreamId($baseWorkspace->currentContentStreamId) ->getEventStreamName(), $this->getCopiedEventsOfEventStream( @@ -250,13 +255,14 @@ static function ($handle) use ($rebaseableCommands): void { $baseWorkspace->currentContentStreamId, $commandSimulator->eventStream(), ), - ExpectedVersion::fromVersion($baseContentStreamVersion) + // todo can fail; must reopen!!!!! + ExpectedVersion::fromVersion($baseWorkspaceContentStreamVersion) ); yield $this->forkContentStream( $newContentStreamId, $baseWorkspace->currentContentStreamId, - $commandHandlingDependencies + $commitResult->highestCommittedVersion ); yield new EventsToPublish( @@ -272,19 +278,19 @@ static function ($handle) use ($rebaseableCommands): void { ExpectedVersion::ANY() ); - yield $this->removeContentStream($workspace->currentContentStreamId, $commandHandlingDependencies); + yield $this->removeContentStreamWithoutConstraints($workspace->currentContentStreamId); } private function rebaseWorkspaceWithoutChanges( Workspace $workspace, Workspace $baseWorkspace, - ContentStreamId $newContentStreamId, - CommandHandlingDependencies $commandHandlingDependencies, + Version $baseWorkspaceContentStreamVersion, + ContentStreamId $newContentStreamId ): \Generator { yield $this->forkContentStream( $newContentStreamId, $baseWorkspace->currentContentStreamId, - $commandHandlingDependencies + $baseWorkspaceContentStreamVersion ); yield new EventsToPublish( @@ -299,11 +305,11 @@ private function rebaseWorkspaceWithoutChanges( ExpectedVersion::ANY() ); - yield $this->removeContentStream($workspace->currentContentStreamId, $commandHandlingDependencies); + yield $this->removeContentStreamWithoutConstraints($workspace->currentContentStreamId); } /** - * Copy all events from the passed event stream which implement the {@see PublishableToOtherContentStreamsInterface} + * Copy all events from the passed event stream which implement the {@see PublishableToWorkspaceInterface} */ private function getCopiedEventsOfEventStream( WorkspaceName $targetWorkspaceName, @@ -339,6 +345,8 @@ private function handleRebaseWorkspace( if (!$commandHandlingDependencies->contentStreamExists($workspace->currentContentStreamId)) { throw new \RuntimeException('Cannot rebase a workspace with a stateless content stream', 1711718314); } + $workspaceContentStreamVersion = $this->requireOpenContentStreamVersion($workspace, $commandHandlingDependencies); + $baseWorkspaceContentStreamVersion = $this->requireOpenContentStreamVersion($baseWorkspace, $commandHandlingDependencies); if ( $workspace->status === WorkspaceStatus::UP_TO_DATE @@ -350,7 +358,7 @@ private function handleRebaseWorkspace( yield $this->closeContentStream( $workspace->currentContentStreamId, - $commandHandlingDependencies + $workspaceContentStreamVersion ); if (!$workspace->hasPublishableChanges()) { @@ -358,8 +366,8 @@ private function handleRebaseWorkspace( yield from $this->rebaseWorkspaceWithoutChanges( $workspace, $baseWorkspace, - $command->rebasedContentStreamId, - $commandHandlingDependencies + $baseWorkspaceContentStreamVersion, + $command->rebasedContentStreamId ); return; } @@ -385,9 +393,8 @@ static function ($handle) use ($rebaseableCommands): void { $command->rebaseErrorHandlingStrategy === RebaseErrorHandlingStrategy::STRATEGY_FAIL && $commandSimulator->hasCommandsThatFailed() ) { - yield $this->reopenContentStream( - $workspace->currentContentStreamId, - $commandHandlingDependencies + yield $this->reopenContentStreamWithoutConstraints( + $workspace->currentContentStreamId ); // throw an exception that contains all the information about what exactly failed @@ -398,6 +405,7 @@ static function ($handle) use ($rebaseableCommands): void { yield from $this->forkNewContentStreamAndApplyEvents( $command->rebasedContentStreamId, $baseWorkspace->currentContentStreamId, + $baseWorkspaceContentStreamVersion, new EventsToPublish( WorkspaceEventStreamName::fromWorkspaceName($command->workspaceName)->getEventStreamName(), Events::with( @@ -413,21 +421,16 @@ static function ($handle) use ($rebaseableCommands): void { $command->workspaceName, $command->rebasedContentStreamId, $commandSimulator->eventStream(), - ), - $commandHandlingDependencies + ) ); - yield $this->removeContentStream($workspace->currentContentStreamId, $commandHandlingDependencies); + yield $this->removeContentStreamWithoutConstraints($workspace->currentContentStreamId); } /** * This method is like a combined Rebase and Publish! * - * @throws BaseWorkspaceDoesNotExist - * @throws ContentStreamAlreadyExists - * @throws ContentStreamDoesNotExistYet - * @throws WorkspaceDoesNotExist - * @throws \Exception + * @return YieldedEventsToPublish */ private function handlePublishIndividualNodesFromWorkspace( PublishIndividualNodesFromWorkspace $command, @@ -444,12 +447,12 @@ private function handlePublishIndividualNodesFromWorkspace( if (!$commandHandlingDependencies->contentStreamExists($workspace->currentContentStreamId)) { throw new \RuntimeException('Cannot publish nodes on a workspace with a stateless content stream', 1710410114); } - $this->requireContentStreamToNotBeClosed($baseWorkspace->currentContentStreamId, $commandHandlingDependencies); - $baseContentStreamVersion = $commandHandlingDependencies->getContentStreamVersion($baseWorkspace->currentContentStreamId); + $workspaceContentStreamVersion = $this->requireOpenContentStreamVersion($workspace, $commandHandlingDependencies); + $baseWorkspaceContentStreamVersion = $this->requireOpenContentStreamVersion($baseWorkspace, $commandHandlingDependencies); yield $this->closeContentStream( $workspace->currentContentStreamId, - $commandHandlingDependencies + $workspaceContentStreamVersion ); $rebaseableCommands = RebaseableCommands::extractFromEventStream( @@ -463,9 +466,8 @@ private function handlePublishIndividualNodesFromWorkspace( if ($matchingCommands->isEmpty()) { // almost a noop (e.g. random node ids were specified) ;) - yield $this->reopenContentStream( - $workspace->currentContentStreamId, - $commandHandlingDependencies + yield $this->reopenContentStreamWithoutConstraints( + $workspace->currentContentStreamId ); return; } @@ -476,16 +478,14 @@ private function handlePublishIndividualNodesFromWorkspace( yield from $this->publishWorkspace( $workspace, $baseWorkspace, + $baseWorkspaceContentStreamVersion, $command->contentStreamIdForRemainingPart, - $baseContentStreamVersion, - $matchingCommands, - $commandHandlingDependencies + $matchingCommands ); return; } catch (WorkspaceRebaseFailed $workspaceRebaseFailed) { - yield $this->reopenContentStream( - $workspace->currentContentStreamId, - $commandHandlingDependencies + yield $this->reopenContentStreamWithoutConstraints( + $workspace->currentContentStreamId ); throw $workspaceRebaseFailed; } @@ -507,16 +507,15 @@ static function ($handle) use ($commandSimulator, $matchingCommands, $remainingC ); if ($commandSimulator->hasCommandsThatFailed()) { - yield $this->reopenContentStream( - $workspace->currentContentStreamId, - $commandHandlingDependencies + yield $this->reopenContentStreamWithoutConstraints( + $workspace->currentContentStreamId ); throw WorkspaceRebaseFailed::duringPublish($commandSimulator->getCommandsThatFailed()); } // this could be a no-op for the rare case when a command returns empty events e.g. the node was already tagged with this subtree tag, meaning we actually just rebase - yield new EventsToPublish( + $commitResult = yield new EventsToPublish( ContentStreamEventStreamName::fromContentStreamId($baseWorkspace->currentContentStreamId) ->getEventStreamName(), $this->getCopiedEventsOfEventStream( @@ -524,12 +523,14 @@ static function ($handle) use ($commandSimulator, $matchingCommands, $remainingC $baseWorkspace->currentContentStreamId, $commandSimulator->eventStream()->withMaximumSequenceNumber($highestSequenceNumberForMatching), ), - ExpectedVersion::fromVersion($baseContentStreamVersion) + ExpectedVersion::fromVersion($baseWorkspaceContentStreamVersion) ); yield from $this->forkNewContentStreamAndApplyEvents( $command->contentStreamIdForRemainingPart, $baseWorkspace->currentContentStreamId, + // todo otherwise Features/W8-IndividualNodePublication/03-MoreBasicFeatures.feature:185 fails, see comment about emptiness above ... or should we manually count? + $commitResult?->highestCommittedVersion ?: $baseWorkspaceContentStreamVersion, new EventsToPublish( WorkspaceEventStreamName::fromWorkspaceName($command->workspaceName)->getEventStreamName(), Events::fromArray([ @@ -547,11 +548,10 @@ static function ($handle) use ($commandSimulator, $matchingCommands, $remainingC $command->workspaceName, $command->contentStreamIdForRemainingPart, $commandSimulator->eventStream()->withMinimumSequenceNumber($highestSequenceNumberForMatching->next()) - ), - $commandHandlingDependencies + ) ); - yield $this->removeContentStream($workspace->currentContentStreamId, $commandHandlingDependencies); + yield $this->removeContentStreamWithoutConstraints($workspace->currentContentStreamId); } /** @@ -576,13 +576,12 @@ private function handleDiscardIndividualNodesFromWorkspace( return; } - if (!$commandHandlingDependencies->contentStreamExists($workspace->currentContentStreamId)) { - throw new \RuntimeException('Cannot discard nodes on a workspace with a stateless content stream', 1710408112); - } + $workspaceContentStreamVersion = $this->requireOpenContentStreamVersion($workspace, $commandHandlingDependencies); + $baseWorkspaceContentStreamVersion = $this->requireOpenContentStreamVersion($baseWorkspace, $commandHandlingDependencies); yield $this->closeContentStream( $workspace->currentContentStreamId, - $commandHandlingDependencies + $workspaceContentStreamVersion ); // filter commands, only keeping the ones NOT MATCHING the nodes from the command (i.e. the modifications we want to keep) @@ -596,9 +595,8 @@ private function handleDiscardIndividualNodesFromWorkspace( if ($commandsToDiscard->isEmpty()) { // if we have nothing to discard, we can just keep all. (e.g. random node ids were specified) It's almost a noop ;) - yield $this->reopenContentStream( - $workspace->currentContentStreamId, - $commandHandlingDependencies + yield $this->reopenContentStreamWithoutConstraints( + $workspace->currentContentStreamId ); return; } @@ -607,9 +605,10 @@ private function handleDiscardIndividualNodesFromWorkspace( // quick path everything was discarded yield from $this->discardWorkspace( $workspace, + $workspaceContentStreamVersion, $baseWorkspace, - $command->newContentStreamId, - $commandHandlingDependencies + $baseWorkspaceContentStreamVersion, + $command->newContentStreamId ); return; } @@ -625,9 +624,8 @@ static function ($handle) use ($commandsToKeep): void { ); if ($commandSimulator->hasCommandsThatFailed()) { - yield $this->reopenContentStream( - $workspace->currentContentStreamId, - $commandHandlingDependencies + yield $this->reopenContentStreamWithoutConstraints( + $workspace->currentContentStreamId ); throw WorkspaceRebaseFailed::duringDiscard($commandSimulator->getCommandsThatFailed()); } @@ -635,6 +633,7 @@ static function ($handle) use ($commandsToKeep): void { yield from $this->forkNewContentStreamAndApplyEvents( $command->newContentStreamId, $baseWorkspace->currentContentStreamId, + $baseWorkspaceContentStreamVersion, new EventsToPublish( WorkspaceEventStreamName::fromWorkspaceName($command->workspaceName)->getEventStreamName(), Events::with( @@ -651,11 +650,10 @@ static function ($handle) use ($commandsToKeep): void { $command->workspaceName, $command->newContentStreamId, $commandSimulator->eventStream(), - ), - $commandHandlingDependencies + ) ); - yield $this->removeContentStream($workspace->currentContentStreamId, $commandHandlingDependencies); + yield $this->removeContentStreamWithoutConstraints($workspace->currentContentStreamId); } /** @@ -674,31 +672,32 @@ private function handleDiscardWorkspace( return; } + $workspaceContentStreamVersion = $this->requireOpenContentStreamVersion($workspace, $commandHandlingDependencies); + $baseWorkspaceContentStreamVersion = $this->requireOpenContentStreamVersion($baseWorkspace, $commandHandlingDependencies); + yield from $this->discardWorkspace( $workspace, + $workspaceContentStreamVersion, $baseWorkspace, - $command->newContentStreamId, - $commandHandlingDependencies + $baseWorkspaceContentStreamVersion, + $command->newContentStreamId ); } /** - * @param Workspace $workspace - * @param Workspace $baseWorkspace - * @param ContentStreamId $newContentStream - * @param CommandHandlingDependencies $commandHandlingDependencies * @phpstan-pure this method is pure, to persist the events they must be handled outside */ private function discardWorkspace( Workspace $workspace, + Version $workspaceContentStreamVersion, Workspace $baseWorkspace, - ContentStreamId $newContentStream, - CommandHandlingDependencies $commandHandlingDependencies + Version $baseWorkspaceContentStreamVersion, + ContentStreamId $newContentStream ): \Generator { yield $this->forkContentStream( $newContentStream, $baseWorkspace->currentContentStreamId, - $commandHandlingDependencies + $baseWorkspaceContentStreamVersion ); yield new EventsToPublish( @@ -713,7 +712,7 @@ private function discardWorkspace( ExpectedVersion::ANY() ); - yield $this->removeContentStream($workspace->currentContentStreamId, $commandHandlingDependencies); + yield $this->removeContentStreamWithoutConstraints($workspace->currentContentStreamId); } /** @@ -731,6 +730,8 @@ private function handleChangeBaseWorkspace( $workspace = $this->requireWorkspace($command->workspaceName, $commandHandlingDependencies); $currentBaseWorkspace = $this->requireBaseWorkspace($workspace, $commandHandlingDependencies); + $this->requireContentStreamToNotBeClosed($workspace->currentContentStreamId, $commandHandlingDependencies); + if ($currentBaseWorkspace->workspaceName->equals($command->baseWorkspaceName)) { // no-op return; @@ -738,13 +739,14 @@ private function handleChangeBaseWorkspace( $this->requireEmptyWorkspace($workspace); $newBaseWorkspace = $this->requireWorkspace($command->baseWorkspaceName, $commandHandlingDependencies); - $this->requireNonCircularRelationBetweenWorkspaces($workspace, $newBaseWorkspace, $commandHandlingDependencies); + $newBaseWorkspaceContentStreamVersion = $this->requireOpenContentStreamVersion($newBaseWorkspace, $commandHandlingDependencies); + yield $this->forkContentStream( $command->newContentStreamId, $newBaseWorkspace->currentContentStreamId, - $commandHandlingDependencies + $newBaseWorkspaceContentStreamVersion ); yield new EventsToPublish( @@ -768,10 +770,16 @@ private function handleDeleteWorkspace( CommandHandlingDependencies $commandHandlingDependencies, ): \Generator { $workspace = $this->requireWorkspace($command->workspaceName, $commandHandlingDependencies); + $contentStreamVersion = $commandHandlingDependencies->getContentStreamVersion($workspace->currentContentStreamId); - yield $this->removeContentStream( - $workspace->currentContentStreamId, - $commandHandlingDependencies + yield new EventsToPublish( + ContentStreamEventStreamName::fromContentStreamId($workspace->currentContentStreamId)->getEventStreamName(), + Events::with( + new ContentStreamWasRemoved( + $workspace->currentContentStreamId, + ), + ), + ExpectedVersion::fromVersion($contentStreamVersion) ); yield new EventsToPublish( @@ -788,14 +796,14 @@ private function handleDeleteWorkspace( private function forkNewContentStreamAndApplyEvents( ContentStreamId $newContentStreamId, ContentStreamId $sourceContentStreamId, + Version $sourceContentStreamVersion, EventsToPublish $pointWorkspaceToNewContentStream, Events $eventsToApplyOnNewContentStream, - CommandHandlingDependencies $commandHandlingDependencies, ): \Generator { yield $this->forkContentStream( $newContentStreamId, $sourceContentStreamId, - $commandHandlingDependencies + $sourceContentStreamVersion )->withAppendedEvents(Events::with( new ContentStreamWasClosed( $newContentStreamId @@ -830,6 +838,17 @@ private function requireWorkspaceToNotExist(WorkspaceName $workspaceName, Comman ), 1715341085); } + private function requireOpenContentStreamVersion(Workspace $workspace, CommandHandlingDependencies $commandHandlingDependencies): Version + { + if ($commandHandlingDependencies->isContentStreamClosed($workspace->currentContentStreamId)) { + throw new ContentStreamIsClosed( + 'Content stream "' . $workspace->currentContentStreamId . '" is closed.', + 1730730516 + ); + } + return $commandHandlingDependencies->getContentStreamVersion($workspace->currentContentStreamId); + } + /** * @throws WorkspaceDoesNotExist */ diff --git a/Neos.ContentRepository.Core/Classes/Projection/CatchUp.php b/Neos.ContentRepository.Core/Classes/Projection/CatchUp.php index b997978bfe7..35cd26467a9 100644 --- a/Neos.ContentRepository.Core/Classes/Projection/CatchUp.php +++ b/Neos.ContentRepository.Core/Classes/Projection/CatchUp.php @@ -107,7 +107,7 @@ public function run(EventStreamInterface $eventStream): SequenceNumber try { ($this->eventHandler)($eventEnvelope); } catch (\Exception $e) { - throw new \RuntimeException(sprintf('Exception while catching up to sequence number %d', $eventEnvelope->sequenceNumber->value), 1710707311, $e); + throw new \RuntimeException(sprintf('Exception while catching up to sequence number %d: %s', $eventEnvelope->sequenceNumber->value, $e->getMessage()), 1710707311, $e); } $iteration++; if ($this->batchSize === 1 || $iteration % $this->batchSize === 0) { From 4352b2e02f0d77d6896554f924c4b8cb5fb3cd3e Mon Sep 17 00:00:00 2001 From: mhsdesign <85400359+mhsdesign@users.noreply.github.com> Date: Sat, 9 Nov 2024 11:46:40 +0100 Subject: [PATCH 02/17] BUGFIX: Fix reopen content stream if base workspace was written to during publication ... and a ConcurrencyException is thrown Introduces a `WorkspacePublicationDuringWritingTest` parallel test (with own cr) to assert that behaviour. --- .../Configuration/Settings.yaml | 18 +- .../Parallel/AbstractParallelTestCase.php | 11 +- .../WorkspacePublicationDuringWritingTest.php | 249 ++++++++++++++++++ ...p => WorkspaceWritingDuringRebaseTest.php} | 10 +- .../Classes/ContentRepository.php | 11 +- .../Feature/WorkspaceCommandHandler.php | 38 +-- Neos.Neos/Classes/Domain/Model/UserId.php | 2 +- composer.json | 2 +- 8 files changed, 313 insertions(+), 28 deletions(-) create mode 100644 Neos.ContentRepository.BehavioralTests/Tests/Parallel/WorkspacePublicationDuringWriting/WorkspacePublicationDuringWritingTest.php rename Neos.ContentRepository.BehavioralTests/Tests/Parallel/WorkspaceWritingDuringRebase/{WorkspaceWritingDuringRebase.php => WorkspaceWritingDuringRebaseTest.php} (97%) diff --git a/Neos.ContentRepository.BehavioralTests/Configuration/Settings.yaml b/Neos.ContentRepository.BehavioralTests/Configuration/Settings.yaml index 212fb010b63..dd92883ec16 100644 --- a/Neos.ContentRepository.BehavioralTests/Configuration/Settings.yaml +++ b/Neos.ContentRepository.BehavioralTests/Configuration/Settings.yaml @@ -20,7 +20,7 @@ Neos: ContentRepositoryRegistry: contentRepositories: - test_parallel: + test_parallel_a: eventStore: factoryObjectName: Neos\ContentRepositoryRegistry\Factory\EventStore\DoctrineEventStoreFactory nodeTypeManager: @@ -35,6 +35,22 @@ Neos: contentGraphProjection: factoryObjectName: Neos\ContentGraph\DoctrineDbalAdapter\DoctrineDbalContentGraphProjectionFactory + test_parallel_b: + eventStore: + factoryObjectName: Neos\ContentRepositoryRegistry\Factory\EventStore\DoctrineEventStoreFactory + nodeTypeManager: + factoryObjectName: Neos\ContentRepository\BehavioralTests\TestSuite\Behavior\GherkinPyStringNodeBasedNodeTypeManagerFactory + contentDimensionSource: + factoryObjectName: Neos\ContentRepository\BehavioralTests\TestSuite\Behavior\GherkinTableNodeBasedContentDimensionSourceFactory + userIdProvider: + factoryObjectName: Neos\ContentRepositoryRegistry\Factory\UserIdProvider\StaticUserIdProviderFactory + clock: + factoryObjectName: Neos\ContentRepositoryRegistry\Factory\Clock\SystemClockFactory + propertyConverters: {} + contentGraphProjection: + factoryObjectName: Neos\ContentGraph\DoctrineDbalAdapter\DoctrineDbalContentGraphProjectionFactory + + Flow: object: includeClasses: diff --git a/Neos.ContentRepository.BehavioralTests/Tests/Parallel/AbstractParallelTestCase.php b/Neos.ContentRepository.BehavioralTests/Tests/Parallel/AbstractParallelTestCase.php index 67afbdc91ab..ef97e1bea36 100644 --- a/Neos.ContentRepository.BehavioralTests/Tests/Parallel/AbstractParallelTestCase.php +++ b/Neos.ContentRepository.BehavioralTests/Tests/Parallel/AbstractParallelTestCase.php @@ -52,14 +52,15 @@ final protected function awaitFile(string $filename): void } } - final protected function awaitSharedLock($resource, int $maximumCycles = 2000): void + final protected function awaitFileRemoval(string $filename): void { $waiting = 0; - while (!flock($resource, LOCK_SH)) { - usleep(10000); + while (!is_file($filename)) { + usleep(1000); $waiting++; - if ($waiting > $maximumCycles) { - throw new \Exception('timeout while waiting on shared lock'); + clearstatcache(true, $filename); + if ($waiting > 60000) { + throw new \Exception('timeout while waiting on file ' . $filename); } } } diff --git a/Neos.ContentRepository.BehavioralTests/Tests/Parallel/WorkspacePublicationDuringWriting/WorkspacePublicationDuringWritingTest.php b/Neos.ContentRepository.BehavioralTests/Tests/Parallel/WorkspacePublicationDuringWriting/WorkspacePublicationDuringWritingTest.php new file mode 100644 index 00000000000..8de4ffbf10b --- /dev/null +++ b/Neos.ContentRepository.BehavioralTests/Tests/Parallel/WorkspacePublicationDuringWriting/WorkspacePublicationDuringWritingTest.php @@ -0,0 +1,249 @@ +log('------ process started ------'); + // todo refrain from Gherkin naming here and make fakes easier to use: https://github.com/neos/neos-development-collection/pull/5346 + GherkinTableNodeBasedContentDimensionSourceFactory::$contentDimensionsToUse = new class implements ContentDimensionSourceInterface + { + public function getDimension(ContentDimensionId $dimensionId): ?ContentDimension + { + return null; + } + public function getContentDimensionsOrderedByPriority(): array + { + return []; + } + }; + // todo refrain from Gherkin naming here and make fakes easier to use: https://github.com/neos/neos-development-collection/pull/5346 + GherkinPyStringNodeBasedNodeTypeManagerFactory::$nodeTypesToUse = new NodeTypeManager( + fn (): array => [ + 'Neos.ContentRepository:Root' => [], + 'Neos.ContentRepository.Testing:Document' => [ + 'properties' => [ + 'title' => [ + 'type' => 'string' + ] + ] + ] + ] + ); + + $setupLockResource = fopen(self::SETUP_LOCK_PATH, 'w+'); + + $exclusiveNonBlockingLockResult = flock($setupLockResource, LOCK_EX | LOCK_NB); + if ($exclusiveNonBlockingLockResult === false) { + $this->log('waiting for setup'); + if (!flock($setupLockResource, LOCK_SH)) { + throw new \RuntimeException('failed to acquire blocking shared lock'); + } + $this->contentRepository = $this->contentRepositoryRegistry + ->get(ContentRepositoryId::fromString('test_parallel_a')); + $this->log('wait for setup finished'); + return; + } + + $this->log('setup started'); + $contentRepository = $this->setUpContentRepository(ContentRepositoryId::fromString('test_parallel_a')); + + $origin = OriginDimensionSpacePoint::createWithoutDimensions(); + $contentRepository->handle(CreateRootWorkspace::create( + WorkspaceName::forLive(), + ContentStreamId::fromString('live-cs-id') + )); + $contentRepository->handle(CreateRootNodeAggregateWithNode::create( + WorkspaceName::forLive(), + NodeAggregateId::fromString('lady-eleonode-rootford'), + NodeTypeName::fromString(NodeTypeName::ROOT_NODE_TYPE_NAME) + )); + $contentRepository->handle(CreateNodeAggregateWithNode::create( + WorkspaceName::forLive(), + NodeAggregateId::fromString('nody-mc-nodeface'), + NodeTypeName::fromString('Neos.ContentRepository.Testing:Document'), + $origin, + NodeAggregateId::fromString('lady-eleonode-rootford'), + initialPropertyValues: PropertyValuesToWrite::fromArray([ + 'title' => 'title-original' + ]) + )); + $contentRepository->handle(CreateWorkspace::create( + WorkspaceName::fromString('user-test'), + WorkspaceName::forLive(), + ContentStreamId::fromString('user-cs-id') + )); + for ($i = 0; $i <= 5000; $i++) { + $contentRepository->handle(CreateNodeAggregateWithNode::create( + WorkspaceName::fromString('user-test'), + NodeAggregateId::fromString('nody-mc-nodeface-' . $i), + NodeTypeName::fromString('Neos.ContentRepository.Testing:Document'), + $origin, + NodeAggregateId::fromString('lady-eleonode-rootford'), + initialPropertyValues: PropertyValuesToWrite::fromArray([ + 'title' => 'title' + ]) + )); + } + $this->contentRepository = $contentRepository; + + if (!flock($setupLockResource, LOCK_UN)) { + throw new \RuntimeException('failed to release setup lock'); + } + + $this->log('setup finished'); + } + + /** + * @test + * @group parallel + */ + public function whileANodesArWrittenOnLive(): void + { + $this->log('writing started'); + + touch(self::WRITING_IS_RUNNING_FLAG_PATH); + + try { + for ($i = 0; $i <= 50; $i++) { + $this->contentRepository->handle( + SetNodeProperties::create( + WorkspaceName::forLive(), + NodeAggregateId::fromString('nody-mc-nodeface'), + OriginDimensionSpacePoint::createWithoutDimensions(), + PropertyValuesToWrite::fromArray([ + 'title' => 'changed-title-' . $i + ]) + ) + ); + } + } finally { + unlink(self::WRITING_IS_RUNNING_FLAG_PATH); + } + + $this->log('writing finished'); + Assert::assertTrue(true, 'No exception was thrown ;)'); + } + + /** + * @test + * @group parallel + */ + public function thenConcurrentPublishLeadsToException(): void + { + if (!is_file(self::WRITING_IS_RUNNING_FLAG_PATH)) { + $this->log('waiting to publish'); + + $this->awaitFile(self::WRITING_IS_RUNNING_FLAG_PATH); + // If write is the process that does the (slowish) setup, and then waits for the rebase to start, + // We give the CR some time to close the content stream + // TODO find another way than to randomly wait!!! + // The problem is, if we dont sleep it happens often that the modification works only then the rebase is startet _really_ + // Doing the modification several times in hope that the second one fails will likely just stop the rebase thread as it cannot close + usleep(10000); + } + + $this->log('publish started'); + + $actualException = null; + try { + $this->contentRepository->handle(PublishWorkspace::create( + WorkspaceName::fromString('user-test') + )); + } catch (\Exception $thrownException) { + $actualException = $thrownException; + } + + $this->log('publish finished'); + + if ($actualException === null) { + Assert::fail(sprintf('No exception was thrown')); + } + + if ($actualException instanceof \RuntimeException && $actualException->getCode() === 1652279016) { + // todo can be removed soon + $this->log(sprintf('got expected RuntimeException exception: %s', $actualException->getMessage())); + } elseif ($actualException instanceof ConcurrencyException) { + $this->log(sprintf('got expected ConcurrencyException exception: %s', $actualException->getMessage())); + } else { + Assert::assertInstanceOf(ConcurrencyException::class, $actualException); + } + + $this->awaitFileRemoval(self::WRITING_IS_RUNNING_FLAG_PATH); + + // just to make sure were up-to-date now! + $this->contentRepository->catchupProjections(); + + // writing to user works!!! + try { + $this->contentRepository->handle( + SetNodeProperties::create( + WorkspaceName::fromString('user-test'), + NodeAggregateId::fromString('nody-mc-nodeface'), + OriginDimensionSpacePoint::createWithoutDimensions(), + PropertyValuesToWrite::fromArray([ + 'title' => 'written-after-failed-publish' + ]) + ) + ); + } catch (ContentStreamIsClosed $exception) { + Assert::fail(sprintf('Workspace that failed to be publish cannot be written: %s', $exception->getMessage())); + } + + $node = $this->contentRepository->getContentGraph(WorkspaceName::fromString('user-test')) + ->getSubgraph(DimensionSpacePoint::createWithoutDimensions(), VisibilityConstraints::withoutRestrictions()) + ->findNodeById(NodeAggregateId::fromString('nody-mc-nodeface')); + + Assert::assertSame('written-after-failed-publish', $node?->getProperty('title')); + } +} diff --git a/Neos.ContentRepository.BehavioralTests/Tests/Parallel/WorkspaceWritingDuringRebase/WorkspaceWritingDuringRebase.php b/Neos.ContentRepository.BehavioralTests/Tests/Parallel/WorkspaceWritingDuringRebase/WorkspaceWritingDuringRebaseTest.php similarity index 97% rename from Neos.ContentRepository.BehavioralTests/Tests/Parallel/WorkspaceWritingDuringRebase/WorkspaceWritingDuringRebase.php rename to Neos.ContentRepository.BehavioralTests/Tests/Parallel/WorkspaceWritingDuringRebase/WorkspaceWritingDuringRebaseTest.php index dedd5d918da..ef66c7ceccd 100644 --- a/Neos.ContentRepository.BehavioralTests/Tests/Parallel/WorkspaceWritingDuringRebase/WorkspaceWritingDuringRebase.php +++ b/Neos.ContentRepository.BehavioralTests/Tests/Parallel/WorkspaceWritingDuringRebase/WorkspaceWritingDuringRebaseTest.php @@ -43,7 +43,7 @@ use Neos\Flow\ObjectManagement\ObjectManagerInterface; use PHPUnit\Framework\Assert; -class WorkspaceWritingDuringRebase extends AbstractParallelTestCase +class WorkspaceWritingDuringRebaseTest extends AbstractParallelTestCase { private const SETUP_LOCK_PATH = __DIR__ . '/setup-lock'; @@ -88,15 +88,17 @@ public function getContentDimensionsOrderedByPriority(): array $exclusiveNonBlockingLockResult = flock($setupLockResource, LOCK_EX | LOCK_NB); if ($exclusiveNonBlockingLockResult === false) { $this->log('waiting for setup'); - $this->awaitSharedLock($setupLockResource); + if (!flock($setupLockResource, LOCK_SH)) { + throw new \RuntimeException('failed to acquire blocking shared lock'); + } $this->contentRepository = $this->contentRepositoryRegistry - ->get(ContentRepositoryId::fromString('test_parallel')); + ->get(ContentRepositoryId::fromString('test_parallel_b')); $this->log('wait for setup finished'); return; } $this->log('setup started'); - $contentRepository = $this->setUpContentRepository(ContentRepositoryId::fromString('test_parallel')); + $contentRepository = $this->setUpContentRepository(ContentRepositoryId::fromString('test_parallel_b')); $origin = OriginDimensionSpacePoint::createWithoutDimensions(); $contentRepository->handle(CreateRootWorkspace::create( diff --git a/Neos.ContentRepository.Core/Classes/ContentRepository.php b/Neos.ContentRepository.Core/Classes/ContentRepository.php index 592870b3412..c3277a1d4f3 100644 --- a/Neos.ContentRepository.Core/Classes/ContentRepository.php +++ b/Neos.ContentRepository.Core/Classes/ContentRepository.php @@ -46,6 +46,7 @@ use Neos\ContentRepository\Core\SharedModel\Workspace\WorkspaceName; use Neos\ContentRepository\Core\SharedModel\Workspace\Workspaces; use Neos\EventStore\EventStoreInterface; +use Neos\EventStore\Exception\ConcurrencyException; use Neos\EventStore\Model\EventEnvelope; use Neos\EventStore\Model\EventStream\VirtualStreamName; use Psr\Clock\ClockInterface; @@ -116,7 +117,15 @@ public function handle(CommandInterface $command): void continue; } $eventsToPublish = $this->enrichEventsToPublishWithMetadata($yieldedEventsToPublish); - $commitResult = $this->eventPersister->publishWithoutCatchup($eventsToPublish); + try { + $commitResult = $this->eventPersister->publishWithoutCatchup($eventsToPublish); + } catch (ConcurrencyException $concurrencyException) { + $yieldedErrorStrategy = $toPublish->throw($concurrencyException); + if ($yieldedErrorStrategy instanceof EventsToPublish) { + $this->eventPersister->publishWithoutCatchup($yieldedErrorStrategy); + } + throw $concurrencyException; + } $yieldedEventsToPublish = $toPublish->send($commitResult); } } finally { diff --git a/Neos.ContentRepository.Core/Classes/Feature/WorkspaceCommandHandler.php b/Neos.ContentRepository.Core/Classes/Feature/WorkspaceCommandHandler.php index 0be16cd86cb..5ceb1c02076 100644 --- a/Neos.ContentRepository.Core/Classes/Feature/WorkspaceCommandHandler.php +++ b/Neos.ContentRepository.Core/Classes/Feature/WorkspaceCommandHandler.php @@ -63,6 +63,7 @@ use Neos\ContentRepository\Core\SharedModel\Workspace\WorkspaceName; use Neos\ContentRepository\Core\SharedModel\Workspace\WorkspaceStatus; use Neos\EventStore\EventStoreInterface; +use Neos\EventStore\Exception\ConcurrencyException; use Neos\EventStore\Model\Event\SequenceNumber; use Neos\EventStore\Model\Event\Version; use Neos\EventStore\Model\EventStream\EventStreamInterface; @@ -217,12 +218,12 @@ private function handlePublishWorkspace( $command->newContentStreamId, $rebaseableCommands ); - } catch (WorkspaceRebaseFailed $workspaceRebaseFailed) { // and rethrow in yield - // todo catch all + } catch (WorkspaceRebaseFailed|ConcurrencyException $publishFailed) { + // todo catch all? Dont catch ANY ConcurrencyException because say if forking failed we dont need to reopen? yield $this->reopenContentStreamWithoutConstraints( $workspace->currentContentStreamId ); - throw $workspaceRebaseFailed; + throw $publishFailed; } } @@ -247,6 +248,7 @@ static function ($handle) use ($rebaseableCommands): void { throw WorkspaceRebaseFailed::duringPublish($commandSimulator->getCommandsThatFailed()); } + // todo throw base workspace was modified in the meantime to distinguish exception above to reopen? $commitResult = yield new EventsToPublish( ContentStreamEventStreamName::fromContentStreamId($baseWorkspace->currentContentStreamId) ->getEventStreamName(), @@ -255,7 +257,6 @@ static function ($handle) use ($rebaseableCommands): void { $baseWorkspace->currentContentStreamId, $commandSimulator->eventStream(), ), - // todo can fail; must reopen!!!!! ExpectedVersion::fromVersion($baseWorkspaceContentStreamVersion) ); @@ -514,17 +515,24 @@ static function ($handle) use ($commandSimulator, $matchingCommands, $remainingC throw WorkspaceRebaseFailed::duringPublish($commandSimulator->getCommandsThatFailed()); } - // this could be a no-op for the rare case when a command returns empty events e.g. the node was already tagged with this subtree tag, meaning we actually just rebase - $commitResult = yield new EventsToPublish( - ContentStreamEventStreamName::fromContentStreamId($baseWorkspace->currentContentStreamId) - ->getEventStreamName(), - $this->getCopiedEventsOfEventStream( - $baseWorkspace->workspaceName, - $baseWorkspace->currentContentStreamId, - $commandSimulator->eventStream()->withMaximumSequenceNumber($highestSequenceNumberForMatching), - ), - ExpectedVersion::fromVersion($baseWorkspaceContentStreamVersion) - ); + try { + // this could be a no-op for the rare case when a command returns empty events e.g. the node was already tagged with this subtree tag, meaning we actually just rebase + $commitResult = yield new EventsToPublish( + ContentStreamEventStreamName::fromContentStreamId($baseWorkspace->currentContentStreamId) + ->getEventStreamName(), + $this->getCopiedEventsOfEventStream( + $baseWorkspace->workspaceName, + $baseWorkspace->currentContentStreamId, + $commandSimulator->eventStream()->withMaximumSequenceNumber($highestSequenceNumberForMatching), + ), + ExpectedVersion::fromVersion($baseWorkspaceContentStreamVersion) + ); + } catch (ConcurrencyException $concurrencyException) { + yield $this->reopenContentStreamWithoutConstraints( + $workspace->currentContentStreamId + ); + throw $concurrencyException; + } yield from $this->forkNewContentStreamAndApplyEvents( $command->contentStreamIdForRemainingPart, diff --git a/Neos.Neos/Classes/Domain/Model/UserId.php b/Neos.Neos/Classes/Domain/Model/UserId.php index 2011ebb9a56..cf73375ddee 100644 --- a/Neos.Neos/Classes/Domain/Model/UserId.php +++ b/Neos.Neos/Classes/Domain/Model/UserId.php @@ -15,7 +15,7 @@ public function __construct( public string $value ) { if (!preg_match('/^([a-z0-9\-]{1,40})$/', $value)) { - throw new \InvalidArgumentException(sprintf('Invalid user id "%s" (a user id must only contain lowercase characters, numbers and the "-" sign).', 1718293224)); + throw new \InvalidArgumentException(sprintf('Invalid user id "%s" (a user id must only contain lowercase characters, numbers and the "-" sign).', $this->value), 1718293224); } } diff --git a/composer.json b/composer.json index 9a568f75d0b..acaedea5359 100644 --- a/composer.json +++ b/composer.json @@ -109,7 +109,7 @@ "../../bin/phpunit --colors --stop-on-failure -c ../../Build/BuildEssentials/PhpUnit/UnitTests.xml Neos.ContentRepositoryRegistry/Tests/Unit" ], "test:parallel": [ - "FLOW_CONTEXT=Testing/Behat ../../bin/paratest --debug -v --functional --group parallel --processes 2 --colors --stop-on-failure -c ../../Build/BuildEssentials/PhpUnit/FunctionalTests.xml Neos.ContentRepository.BehavioralTests/Tests/Parallel/WorkspaceWritingDuringRebase/WorkspaceWritingDuringRebase.php" + "FLOW_CONTEXT=Testing/Behat ../../bin/paratest --debug -v --functional --processes 2 --colors --stop-on-failure -c ../../Build/BuildEssentials/PhpUnit/FunctionalTests.xml Neos.ContentRepository.BehavioralTests/Tests/Parallel" ], "test:behat-cli": "../../bin/behat -f progress --strict --no-interaction", "test:behavioral": [ From 18072f695823924fae05c2455a8b356fab8a96a2 Mon Sep 17 00:00:00 2001 From: mhsdesign <85400359+mhsdesign@users.noreply.github.com> Date: Sat, 9 Nov 2024 20:10:54 +0100 Subject: [PATCH 03/17] TASK: Fix parallel tests by ensuring only one is run at time That allows us to use the same content repository. Previously a super slow paratest would lead that another testcase will already be started and its setup then run twice at the end. https://github.com/paratestphp/paratest/discussions/905 --- .../Configuration/Settings.yaml | 18 +----------------- .../WorkspacePublicationDuringWritingTest.php | 4 ++-- .../WorkspaceWritingDuringRebaseTest.php | 4 ++-- composer.json | 11 ++++++++++- 4 files changed, 15 insertions(+), 22 deletions(-) diff --git a/Neos.ContentRepository.BehavioralTests/Configuration/Settings.yaml b/Neos.ContentRepository.BehavioralTests/Configuration/Settings.yaml index dd92883ec16..212fb010b63 100644 --- a/Neos.ContentRepository.BehavioralTests/Configuration/Settings.yaml +++ b/Neos.ContentRepository.BehavioralTests/Configuration/Settings.yaml @@ -20,7 +20,7 @@ Neos: ContentRepositoryRegistry: contentRepositories: - test_parallel_a: + test_parallel: eventStore: factoryObjectName: Neos\ContentRepositoryRegistry\Factory\EventStore\DoctrineEventStoreFactory nodeTypeManager: @@ -35,22 +35,6 @@ Neos: contentGraphProjection: factoryObjectName: Neos\ContentGraph\DoctrineDbalAdapter\DoctrineDbalContentGraphProjectionFactory - test_parallel_b: - eventStore: - factoryObjectName: Neos\ContentRepositoryRegistry\Factory\EventStore\DoctrineEventStoreFactory - nodeTypeManager: - factoryObjectName: Neos\ContentRepository\BehavioralTests\TestSuite\Behavior\GherkinPyStringNodeBasedNodeTypeManagerFactory - contentDimensionSource: - factoryObjectName: Neos\ContentRepository\BehavioralTests\TestSuite\Behavior\GherkinTableNodeBasedContentDimensionSourceFactory - userIdProvider: - factoryObjectName: Neos\ContentRepositoryRegistry\Factory\UserIdProvider\StaticUserIdProviderFactory - clock: - factoryObjectName: Neos\ContentRepositoryRegistry\Factory\Clock\SystemClockFactory - propertyConverters: {} - contentGraphProjection: - factoryObjectName: Neos\ContentGraph\DoctrineDbalAdapter\DoctrineDbalContentGraphProjectionFactory - - Flow: object: includeClasses: diff --git a/Neos.ContentRepository.BehavioralTests/Tests/Parallel/WorkspacePublicationDuringWriting/WorkspacePublicationDuringWritingTest.php b/Neos.ContentRepository.BehavioralTests/Tests/Parallel/WorkspacePublicationDuringWriting/WorkspacePublicationDuringWritingTest.php index 8de4ffbf10b..e6bfcdf7f46 100644 --- a/Neos.ContentRepository.BehavioralTests/Tests/Parallel/WorkspacePublicationDuringWriting/WorkspacePublicationDuringWritingTest.php +++ b/Neos.ContentRepository.BehavioralTests/Tests/Parallel/WorkspacePublicationDuringWriting/WorkspacePublicationDuringWritingTest.php @@ -90,13 +90,13 @@ public function getContentDimensionsOrderedByPriority(): array throw new \RuntimeException('failed to acquire blocking shared lock'); } $this->contentRepository = $this->contentRepositoryRegistry - ->get(ContentRepositoryId::fromString('test_parallel_a')); + ->get(ContentRepositoryId::fromString('test_parallel')); $this->log('wait for setup finished'); return; } $this->log('setup started'); - $contentRepository = $this->setUpContentRepository(ContentRepositoryId::fromString('test_parallel_a')); + $contentRepository = $this->setUpContentRepository(ContentRepositoryId::fromString('test_parallel')); $origin = OriginDimensionSpacePoint::createWithoutDimensions(); $contentRepository->handle(CreateRootWorkspace::create( diff --git a/Neos.ContentRepository.BehavioralTests/Tests/Parallel/WorkspaceWritingDuringRebase/WorkspaceWritingDuringRebaseTest.php b/Neos.ContentRepository.BehavioralTests/Tests/Parallel/WorkspaceWritingDuringRebase/WorkspaceWritingDuringRebaseTest.php index ef66c7ceccd..b1022e2b6e6 100644 --- a/Neos.ContentRepository.BehavioralTests/Tests/Parallel/WorkspaceWritingDuringRebase/WorkspaceWritingDuringRebaseTest.php +++ b/Neos.ContentRepository.BehavioralTests/Tests/Parallel/WorkspaceWritingDuringRebase/WorkspaceWritingDuringRebaseTest.php @@ -92,13 +92,13 @@ public function getContentDimensionsOrderedByPriority(): array throw new \RuntimeException('failed to acquire blocking shared lock'); } $this->contentRepository = $this->contentRepositoryRegistry - ->get(ContentRepositoryId::fromString('test_parallel_b')); + ->get(ContentRepositoryId::fromString('test_parallel')); $this->log('wait for setup finished'); return; } $this->log('setup started'); - $contentRepository = $this->setUpContentRepository(ContentRepositoryId::fromString('test_parallel_b')); + $contentRepository = $this->setUpContentRepository(ContentRepositoryId::fromString('test_parallel')); $origin = OriginDimensionSpacePoint::createWithoutDimensions(); $contentRepository->handle(CreateRootWorkspace::create( diff --git a/composer.json b/composer.json index acaedea5359..9e2e8bca36a 100644 --- a/composer.json +++ b/composer.json @@ -109,7 +109,7 @@ "../../bin/phpunit --colors --stop-on-failure -c ../../Build/BuildEssentials/PhpUnit/UnitTests.xml Neos.ContentRepositoryRegistry/Tests/Unit" ], "test:parallel": [ - "FLOW_CONTEXT=Testing/Behat ../../bin/paratest --debug -v --functional --processes 2 --colors --stop-on-failure -c ../../Build/BuildEssentials/PhpUnit/FunctionalTests.xml Neos.ContentRepository.BehavioralTests/Tests/Parallel" + "for f in Neos.ContentRepository.BehavioralTests/Tests/Parallel/**/*Test.php; do ../../bin/paratest --debug -v --functional --processes 2 --colors --stop-on-failure -c ../../Build/BuildEssentials/PhpUnit/FunctionalTests.xml $f; done" ], "test:behat-cli": "../../bin/behat -f progress --strict --no-interaction", "test:behavioral": [ @@ -296,5 +296,14 @@ "phpunit/phpunit": "^9.0", "neos/behat": "*", "league/flysystem-memory": "^3" + }, + + "config": { + "_comment": "We need to insert a vendor dir (even though composer install MUST NOT be run here) but so autoloading works for composer scripts", + "vendor-dir": "../Libraries", + "allow-plugins": { + "neos/composer-plugin": false, + "cweagans/composer-patches": false + } } } From 9a6127b6f553ca4cf32414155daf81a8f426720b Mon Sep 17 00:00:00 2001 From: mhsdesign <85400359+mhsdesign@users.noreply.github.com> Date: Sat, 9 Nov 2024 20:30:27 +0100 Subject: [PATCH 04/17] TASK: Simplify code and remove reopen cs logic into `publishWorkspace` --- .../WorkspacePublicationDuringWritingTest.php | 21 +++++ .../Feature/WorkspaceCommandHandler.php | 82 +++++++++---------- composer.json | 3 +- 3 files changed, 63 insertions(+), 43 deletions(-) diff --git a/Neos.ContentRepository.BehavioralTests/Tests/Parallel/WorkspacePublicationDuringWriting/WorkspacePublicationDuringWritingTest.php b/Neos.ContentRepository.BehavioralTests/Tests/Parallel/WorkspacePublicationDuringWriting/WorkspacePublicationDuringWritingTest.php index e6bfcdf7f46..c664404cee6 100644 --- a/Neos.ContentRepository.BehavioralTests/Tests/Parallel/WorkspacePublicationDuringWriting/WorkspacePublicationDuringWritingTest.php +++ b/Neos.ContentRepository.BehavioralTests/Tests/Parallel/WorkspacePublicationDuringWriting/WorkspacePublicationDuringWritingTest.php @@ -195,6 +195,27 @@ public function thenConcurrentPublishLeadsToException(): void $this->log('publish started'); + + /* + // NOTE, can also be tested with PartialPublish, or PartialPublish leading to a full publish, but this test only allows one at time :) + + $nodesForAFullPublish = 5000; + $nodesForAPartialPublish = $nodesForAFullPublish - 1; + + $nodeIdToPublish = []; + for ($i = 0; $i <= $nodesForAPartialPublish; $i++) { + $nodeIdToPublish[] = new NodeIdToPublishOrDiscard( + NodeAggregateId::fromString('nody-mc-nodeface-' . $i), // see nodes created above + DimensionSpacePoint::createWithoutDimensions() + ); + } + + $this->contentRepository->handle(PublishIndividualNodesFromWorkspace::create( + WorkspaceName::fromString('user-test'), + NodeIdsToPublishOrDiscard::create(...$nodeIdToPublish) + )); + */ + $actualException = null; try { $this->contentRepository->handle(PublishWorkspace::create( diff --git a/Neos.ContentRepository.Core/Classes/Feature/WorkspaceCommandHandler.php b/Neos.ContentRepository.Core/Classes/Feature/WorkspaceCommandHandler.php index 5ceb1c02076..aafa7ec0741 100644 --- a/Neos.ContentRepository.Core/Classes/Feature/WorkspaceCommandHandler.php +++ b/Neos.ContentRepository.Core/Classes/Feature/WorkspaceCommandHandler.php @@ -210,23 +210,19 @@ private function handlePublishWorkspace( ) ); - try { - yield from $this->publishWorkspace( - $workspace, - $baseWorkspace, - $baseWorkspaceContentStreamVersion, - $command->newContentStreamId, - $rebaseableCommands - ); - } catch (WorkspaceRebaseFailed|ConcurrencyException $publishFailed) { - // todo catch all? Dont catch ANY ConcurrencyException because say if forking failed we dont need to reopen? - yield $this->reopenContentStreamWithoutConstraints( - $workspace->currentContentStreamId - ); - throw $publishFailed; - } + yield from $this->publishWorkspace( + $workspace, + $baseWorkspace, + $baseWorkspaceContentStreamVersion, + $command->newContentStreamId, + $rebaseableCommands + ); } + /** + * Note that the workspaces content stream must be closed beforehand. + * It will be reopened here in case of error. + */ private function publishWorkspace( Workspace $workspace, Workspace $baseWorkspace, @@ -245,20 +241,29 @@ static function ($handle) use ($rebaseableCommands): void { ); if ($commandSimulator->hasCommandsThatFailed()) { + yield $this->reopenContentStreamWithoutConstraints( + $workspace->currentContentStreamId + ); throw WorkspaceRebaseFailed::duringPublish($commandSimulator->getCommandsThatFailed()); } - // todo throw base workspace was modified in the meantime to distinguish exception above to reopen? - $commitResult = yield new EventsToPublish( - ContentStreamEventStreamName::fromContentStreamId($baseWorkspace->currentContentStreamId) - ->getEventStreamName(), - $this->getCopiedEventsOfEventStream( - $baseWorkspace->workspaceName, - $baseWorkspace->currentContentStreamId, - $commandSimulator->eventStream(), - ), - ExpectedVersion::fromVersion($baseWorkspaceContentStreamVersion) - ); + try { + $commitResult = yield new EventsToPublish( + ContentStreamEventStreamName::fromContentStreamId($baseWorkspace->currentContentStreamId) + ->getEventStreamName(), + $this->getCopiedEventsOfEventStream( + $baseWorkspace->workspaceName, + $baseWorkspace->currentContentStreamId, + $commandSimulator->eventStream(), + ), + ExpectedVersion::fromVersion($baseWorkspaceContentStreamVersion) + ); + } catch (ConcurrencyException $concurrencyException) { + yield $this->reopenContentStreamWithoutConstraints( + $workspace->currentContentStreamId + ); + throw $concurrencyException; + } yield $this->forkContentStream( $newContentStreamId, @@ -474,22 +479,15 @@ private function handlePublishIndividualNodesFromWorkspace( } if ($remainingCommands->isEmpty()) { - try { - // do a full publish, this is simpler for the projections to handle - yield from $this->publishWorkspace( - $workspace, - $baseWorkspace, - $baseWorkspaceContentStreamVersion, - $command->contentStreamIdForRemainingPart, - $matchingCommands - ); - return; - } catch (WorkspaceRebaseFailed $workspaceRebaseFailed) { - yield $this->reopenContentStreamWithoutConstraints( - $workspace->currentContentStreamId - ); - throw $workspaceRebaseFailed; - } + // do a full publish, this is simpler for the projections to handle + yield from $this->publishWorkspace( + $workspace, + $baseWorkspace, + $baseWorkspaceContentStreamVersion, + $command->contentStreamIdForRemainingPart, + $matchingCommands + ); + return; } $commandSimulator = $this->commandSimulatorFactory->createSimulatorForWorkspace($baseWorkspace->workspaceName); diff --git a/composer.json b/composer.json index 9e2e8bca36a..8dddf5aeab1 100644 --- a/composer.json +++ b/composer.json @@ -108,8 +108,9 @@ "../../bin/phpunit --colors --stop-on-failure -c ../../Build/BuildEssentials/PhpUnit/UnitTests.xml Neos.ContentRepository.Core/Tests/Unit", "../../bin/phpunit --colors --stop-on-failure -c ../../Build/BuildEssentials/PhpUnit/UnitTests.xml Neos.ContentRepositoryRegistry/Tests/Unit" ], + "test:paratest-cli": "../../bin/paratest --debug -v --functional --processes 2 --colors --stop-on-failure -c ../../Build/BuildEssentials/PhpUnit/FunctionalTests.xml", "test:parallel": [ - "for f in Neos.ContentRepository.BehavioralTests/Tests/Parallel/**/*Test.php; do ../../bin/paratest --debug -v --functional --processes 2 --colors --stop-on-failure -c ../../Build/BuildEssentials/PhpUnit/FunctionalTests.xml $f; done" + "for f in Neos.ContentRepository.BehavioralTests/Tests/Parallel/**/*Test.php; do composer test:paratest-cli $f; done" ], "test:behat-cli": "../../bin/behat -f progress --strict --no-interaction", "test:behavioral": [ From cb34618114c9963350b85fa736c90a0b946cecad Mon Sep 17 00:00:00 2001 From: mhsdesign <85400359+mhsdesign@users.noreply.github.com> Date: Sat, 9 Nov 2024 20:36:05 +0100 Subject: [PATCH 05/17] TASK: Only fetch content stream once for constraint checks --- .../CommandHandlingDependencies.php | 9 +++++- .../Feature/Common/ConstraintChecks.php | 9 ++---- .../Feature/WorkspaceCommandHandler.php | 32 ++++++++----------- 3 files changed, 23 insertions(+), 27 deletions(-) diff --git a/Neos.ContentRepository.Core/Classes/CommandHandler/CommandHandlingDependencies.php b/Neos.ContentRepository.Core/Classes/CommandHandler/CommandHandlingDependencies.php index 9e726cacea1..629f5c01c1e 100644 --- a/Neos.ContentRepository.Core/Classes/CommandHandler/CommandHandlingDependencies.php +++ b/Neos.ContentRepository.Core/Classes/CommandHandler/CommandHandlingDependencies.php @@ -16,6 +16,7 @@ use Neos\ContentRepository\Core\Projection\ContentGraph\ContentGraphInterface; use Neos\ContentRepository\Core\Projection\ContentGraph\ContentGraphReadModelInterface; +use Neos\ContentRepository\Core\SharedModel\Exception\ContentStreamDoesNotExistYet; use Neos\ContentRepository\Core\SharedModel\Exception\WorkspaceDoesNotExist; use Neos\ContentRepository\Core\SharedModel\Workspace\ContentStreamId; use Neos\ContentRepository\Core\SharedModel\Workspace\Workspace; @@ -48,11 +49,17 @@ public function contentStreamExists(ContentStreamId $contentStreamId): bool return $this->contentGraphReadModel->findContentStreamById($contentStreamId) !== null; } + /** + * @throws ContentStreamDoesNotExistYet if there is no matching content stream + */ public function isContentStreamClosed(ContentStreamId $contentStreamId): bool { $contentStream = $this->contentGraphReadModel->findContentStreamById($contentStreamId); if ($contentStream === null) { - throw new \InvalidArgumentException(sprintf('Failed to find content stream with id "%s"', $contentStreamId->value), 1729863973); + throw new ContentStreamDoesNotExistYet( + 'Content stream "' . $contentStreamId->value . '" does not exist.', + 1521386692 + ); } return $contentStream->isClosed; } diff --git a/Neos.ContentRepository.Core/Classes/Feature/Common/ConstraintChecks.php b/Neos.ContentRepository.Core/Classes/Feature/Common/ConstraintChecks.php index 2be9688f065..5364907d978 100644 --- a/Neos.ContentRepository.Core/Classes/Feature/Common/ConstraintChecks.php +++ b/Neos.ContentRepository.Core/Classes/Feature/Common/ConstraintChecks.php @@ -81,14 +81,9 @@ protected function requireContentStream( CommandHandlingDependencies $commandHandlingDependencies ): ContentStreamId { $contentStreamId = $commandHandlingDependencies->getContentGraph($workspaceName)->getContentStreamId(); - if (!$commandHandlingDependencies->contentStreamExists($contentStreamId)) { - throw new ContentStreamDoesNotExistYet( - 'Content stream for "' . $workspaceName->value . '" does not exist yet.', - 1521386692 - ); - } + $closedState = $commandHandlingDependencies->isContentStreamClosed($contentStreamId); - if ($commandHandlingDependencies->isContentStreamClosed($contentStreamId)) { + if ($closedState) { throw new ContentStreamIsClosed( 'Content stream "' . $contentStreamId->value . '" is closed.', 1710260081 diff --git a/Neos.ContentRepository.Core/Classes/Feature/WorkspaceCommandHandler.php b/Neos.ContentRepository.Core/Classes/Feature/WorkspaceCommandHandler.php index aafa7ec0741..d1675235f85 100644 --- a/Neos.ContentRepository.Core/Classes/Feature/WorkspaceCommandHandler.php +++ b/Neos.ContentRepository.Core/Classes/Feature/WorkspaceCommandHandler.php @@ -195,8 +195,8 @@ private function handlePublishWorkspace( // no-op return; } - $workspaceContentStreamVersion = $this->requireOpenContentStreamVersion($workspace, $commandHandlingDependencies); - $baseWorkspaceContentStreamVersion = $this->requireOpenContentStreamVersion($baseWorkspace, $commandHandlingDependencies); + $workspaceContentStreamVersion = $this->requireOpenContentStreamAndVersion($workspace, $commandHandlingDependencies); + $baseWorkspaceContentStreamVersion = $this->requireOpenContentStreamAndVersion($baseWorkspace, $commandHandlingDependencies); yield $this->closeContentStream( $workspace->currentContentStreamId, @@ -348,11 +348,9 @@ private function handleRebaseWorkspace( ): \Generator { $workspace = $this->requireWorkspace($command->workspaceName, $commandHandlingDependencies); $baseWorkspace = $this->requireBaseWorkspace($workspace, $commandHandlingDependencies); - if (!$commandHandlingDependencies->contentStreamExists($workspace->currentContentStreamId)) { - throw new \RuntimeException('Cannot rebase a workspace with a stateless content stream', 1711718314); - } - $workspaceContentStreamVersion = $this->requireOpenContentStreamVersion($workspace, $commandHandlingDependencies); - $baseWorkspaceContentStreamVersion = $this->requireOpenContentStreamVersion($baseWorkspace, $commandHandlingDependencies); + + $workspaceContentStreamVersion = $this->requireOpenContentStreamAndVersion($workspace, $commandHandlingDependencies); + $baseWorkspaceContentStreamVersion = $this->requireOpenContentStreamAndVersion($baseWorkspace, $commandHandlingDependencies); if ( $workspace->status === WorkspaceStatus::UP_TO_DATE @@ -449,12 +447,8 @@ private function handlePublishIndividualNodesFromWorkspace( return; } - // todo check that fetching workspace throws if there is no content stream id for it - if (!$commandHandlingDependencies->contentStreamExists($workspace->currentContentStreamId)) { - throw new \RuntimeException('Cannot publish nodes on a workspace with a stateless content stream', 1710410114); - } - $workspaceContentStreamVersion = $this->requireOpenContentStreamVersion($workspace, $commandHandlingDependencies); - $baseWorkspaceContentStreamVersion = $this->requireOpenContentStreamVersion($baseWorkspace, $commandHandlingDependencies); + $workspaceContentStreamVersion = $this->requireOpenContentStreamAndVersion($workspace, $commandHandlingDependencies); + $baseWorkspaceContentStreamVersion = $this->requireOpenContentStreamAndVersion($baseWorkspace, $commandHandlingDependencies); yield $this->closeContentStream( $workspace->currentContentStreamId, @@ -582,8 +576,8 @@ private function handleDiscardIndividualNodesFromWorkspace( return; } - $workspaceContentStreamVersion = $this->requireOpenContentStreamVersion($workspace, $commandHandlingDependencies); - $baseWorkspaceContentStreamVersion = $this->requireOpenContentStreamVersion($baseWorkspace, $commandHandlingDependencies); + $workspaceContentStreamVersion = $this->requireOpenContentStreamAndVersion($workspace, $commandHandlingDependencies); + $baseWorkspaceContentStreamVersion = $this->requireOpenContentStreamAndVersion($baseWorkspace, $commandHandlingDependencies); yield $this->closeContentStream( $workspace->currentContentStreamId, @@ -678,8 +672,8 @@ private function handleDiscardWorkspace( return; } - $workspaceContentStreamVersion = $this->requireOpenContentStreamVersion($workspace, $commandHandlingDependencies); - $baseWorkspaceContentStreamVersion = $this->requireOpenContentStreamVersion($baseWorkspace, $commandHandlingDependencies); + $workspaceContentStreamVersion = $this->requireOpenContentStreamAndVersion($workspace, $commandHandlingDependencies); + $baseWorkspaceContentStreamVersion = $this->requireOpenContentStreamAndVersion($baseWorkspace, $commandHandlingDependencies); yield from $this->discardWorkspace( $workspace, @@ -747,7 +741,7 @@ private function handleChangeBaseWorkspace( $newBaseWorkspace = $this->requireWorkspace($command->baseWorkspaceName, $commandHandlingDependencies); $this->requireNonCircularRelationBetweenWorkspaces($workspace, $newBaseWorkspace, $commandHandlingDependencies); - $newBaseWorkspaceContentStreamVersion = $this->requireOpenContentStreamVersion($newBaseWorkspace, $commandHandlingDependencies); + $newBaseWorkspaceContentStreamVersion = $this->requireOpenContentStreamAndVersion($newBaseWorkspace, $commandHandlingDependencies); yield $this->forkContentStream( $command->newContentStreamId, @@ -844,7 +838,7 @@ private function requireWorkspaceToNotExist(WorkspaceName $workspaceName, Comman ), 1715341085); } - private function requireOpenContentStreamVersion(Workspace $workspace, CommandHandlingDependencies $commandHandlingDependencies): Version + private function requireOpenContentStreamAndVersion(Workspace $workspace, CommandHandlingDependencies $commandHandlingDependencies): Version { if ($commandHandlingDependencies->isContentStreamClosed($workspace->currentContentStreamId)) { throw new ContentStreamIsClosed( From 0273e325ec1ca408234c6aa919d55d66fc6f000d Mon Sep 17 00:00:00 2001 From: mhsdesign <85400359+mhsdesign@users.noreply.github.com> Date: Sat, 9 Nov 2024 20:36:47 +0100 Subject: [PATCH 06/17] TASK: Adjust .composer json --- .composer.json | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.composer.json b/.composer.json index 1bf4b28f90c..e7f37b99667 100644 --- a/.composer.json +++ b/.composer.json @@ -25,8 +25,9 @@ "../../bin/phpunit --colors --stop-on-failure -c ../../Build/BuildEssentials/PhpUnit/UnitTests.xml Neos.ContentRepository.Core/Tests/Unit", "../../bin/phpunit --colors --stop-on-failure -c ../../Build/BuildEssentials/PhpUnit/UnitTests.xml Neos.ContentRepositoryRegistry/Tests/Unit" ], + "test:paratest-cli": "../../bin/paratest --debug -v --functional --processes 2 --colors --stop-on-failure -c ../../Build/BuildEssentials/PhpUnit/FunctionalTests.xml", "test:parallel": [ - "FLOW_CONTEXT=Testing/Behat ../../bin/paratest --debug -v --functional --group parallel --processes 2 --colors --stop-on-failure -c ../../Build/BuildEssentials/PhpUnit/FunctionalTests.xml Neos.ContentRepository.BehavioralTests/Tests/Functional/Feature/WorkspacePublication/WorkspaceWritingDuringPublication.php" + "for f in Neos.ContentRepository.BehavioralTests/Tests/Parallel/**/*Test.php; do composer test:paratest-cli $f; done" ], "test:behat-cli": "../../bin/behat -f progress --strict --no-interaction", "test:behavioral": [ From 77778f9e2cc6c2aa0ee3560a491089465bf33f04 Mon Sep 17 00:00:00 2001 From: mhsdesign <85400359+mhsdesign@users.noreply.github.com> Date: Sat, 9 Nov 2024 20:56:53 +0100 Subject: [PATCH 07/17] TASK: Do not send `$commitResult` to generator but calculate expected version instead Also readd lost documentation and simplifies the `handle` The ->throw logic was initially introduced via https://github.com/neos/neos-development-collection/pull/5315 but then removed again as we thought it was no longer needed. --- .../CommandHandlerInterface.php | 3 +- .../Classes/ContentRepository.php | 15 +++++--- .../Feature/WorkspaceCommandHandler.php | 35 ++++++++++--------- 3 files changed, 30 insertions(+), 23 deletions(-) diff --git a/Neos.ContentRepository.Core/Classes/CommandHandler/CommandHandlerInterface.php b/Neos.ContentRepository.Core/Classes/CommandHandler/CommandHandlerInterface.php index 1ff7cee24eb..6e0436be8fe 100644 --- a/Neos.ContentRepository.Core/Classes/CommandHandler/CommandHandlerInterface.php +++ b/Neos.ContentRepository.Core/Classes/CommandHandler/CommandHandlerInterface.php @@ -5,14 +5,13 @@ namespace Neos\ContentRepository\Core\CommandHandler; use Neos\ContentRepository\Core\EventStore\EventsToPublish; -use Neos\EventStore\Model\EventStore\CommitResult; /** * Common interface for all Content Repository command handlers * * The {@see CommandHandlingDependencies} are available during handling to do soft-constraint checks * - * @phpstan-type YieldedEventsToPublish \Generator + * @phpstan-type YieldedEventsToPublish \Generator * @internal no public API, because commands are no extension points of the CR */ interface CommandHandlerInterface diff --git a/Neos.ContentRepository.Core/Classes/ContentRepository.php b/Neos.ContentRepository.Core/Classes/ContentRepository.php index c3277a1d4f3..45b23d1da88 100644 --- a/Neos.ContentRepository.Core/Classes/ContentRepository.php +++ b/Neos.ContentRepository.Core/Classes/ContentRepository.php @@ -110,23 +110,28 @@ public function handle(CommandInterface $command): void // control-flow aware command handling via generator try { - $yieldedEventsToPublish = $toPublish->current(); - while ($yieldedEventsToPublish !== null) { + foreach ($toPublish as $yieldedEventsToPublish) { if ($yieldedEventsToPublish->events->isEmpty()) { - $yieldedEventsToPublish = $toPublish->send(null); continue; } $eventsToPublish = $this->enrichEventsToPublishWithMetadata($yieldedEventsToPublish); try { - $commitResult = $this->eventPersister->publishWithoutCatchup($eventsToPublish); + $this->eventPersister->publishWithoutCatchup($eventsToPublish); } catch (ConcurrencyException $concurrencyException) { + // we pass the exception into the generator (->throw), so it could be try-caught and reacted upon: + // + // try { + // yield EventsToPublish(...); + // } catch (ConcurrencyException $e) { + // yield $this->reopenContentStream(); + // throw $e; + // } $yieldedErrorStrategy = $toPublish->throw($concurrencyException); if ($yieldedErrorStrategy instanceof EventsToPublish) { $this->eventPersister->publishWithoutCatchup($yieldedErrorStrategy); } throw $concurrencyException; } - $yieldedEventsToPublish = $toPublish->send($commitResult); } } finally { // We always NEED to catchup even if there was an unexpected ConcurrencyException to make sure previous commits are handled. diff --git a/Neos.ContentRepository.Core/Classes/Feature/WorkspaceCommandHandler.php b/Neos.ContentRepository.Core/Classes/Feature/WorkspaceCommandHandler.php index d1675235f85..2e9d3031ea1 100644 --- a/Neos.ContentRepository.Core/Classes/Feature/WorkspaceCommandHandler.php +++ b/Neos.ContentRepository.Core/Classes/Feature/WorkspaceCommandHandler.php @@ -247,15 +247,17 @@ static function ($handle) use ($rebaseableCommands): void { throw WorkspaceRebaseFailed::duringPublish($commandSimulator->getCommandsThatFailed()); } + $eventsOfWorkspaceToPublish = $this->getCopiedEventsOfEventStream( + $baseWorkspace->workspaceName, + $baseWorkspace->currentContentStreamId, + $commandSimulator->eventStream(), + ); + try { - $commitResult = yield new EventsToPublish( + yield new EventsToPublish( ContentStreamEventStreamName::fromContentStreamId($baseWorkspace->currentContentStreamId) ->getEventStreamName(), - $this->getCopiedEventsOfEventStream( - $baseWorkspace->workspaceName, - $baseWorkspace->currentContentStreamId, - $commandSimulator->eventStream(), - ), + $eventsOfWorkspaceToPublish, ExpectedVersion::fromVersion($baseWorkspaceContentStreamVersion) ); } catch (ConcurrencyException $concurrencyException) { @@ -268,7 +270,7 @@ static function ($handle) use ($rebaseableCommands): void { yield $this->forkContentStream( $newContentStreamId, $baseWorkspace->currentContentStreamId, - $commitResult->highestCommittedVersion + Version::fromInteger($baseWorkspaceContentStreamVersion->value + $eventsOfWorkspaceToPublish->count()) ); yield new EventsToPublish( @@ -507,16 +509,18 @@ static function ($handle) use ($commandSimulator, $matchingCommands, $remainingC throw WorkspaceRebaseFailed::duringPublish($commandSimulator->getCommandsThatFailed()); } + // this could empty and a no-op for the rare case when a command returns empty events e.g. the node was already tagged with this subtree tag + $selectedEventsOfWorkspaceToPublish = $this->getCopiedEventsOfEventStream( + $baseWorkspace->workspaceName, + $baseWorkspace->currentContentStreamId, + $commandSimulator->eventStream()->withMaximumSequenceNumber($highestSequenceNumberForMatching), + ); + try { - // this could be a no-op for the rare case when a command returns empty events e.g. the node was already tagged with this subtree tag, meaning we actually just rebase - $commitResult = yield new EventsToPublish( + yield new EventsToPublish( ContentStreamEventStreamName::fromContentStreamId($baseWorkspace->currentContentStreamId) ->getEventStreamName(), - $this->getCopiedEventsOfEventStream( - $baseWorkspace->workspaceName, - $baseWorkspace->currentContentStreamId, - $commandSimulator->eventStream()->withMaximumSequenceNumber($highestSequenceNumberForMatching), - ), + $selectedEventsOfWorkspaceToPublish, ExpectedVersion::fromVersion($baseWorkspaceContentStreamVersion) ); } catch (ConcurrencyException $concurrencyException) { @@ -529,8 +533,7 @@ static function ($handle) use ($commandSimulator, $matchingCommands, $remainingC yield from $this->forkNewContentStreamAndApplyEvents( $command->contentStreamIdForRemainingPart, $baseWorkspace->currentContentStreamId, - // todo otherwise Features/W8-IndividualNodePublication/03-MoreBasicFeatures.feature:185 fails, see comment about emptiness above ... or should we manually count? - $commitResult?->highestCommittedVersion ?: $baseWorkspaceContentStreamVersion, + Version::fromInteger($baseWorkspaceContentStreamVersion->value + $selectedEventsOfWorkspaceToPublish->count()), new EventsToPublish( WorkspaceEventStreamName::fromWorkspaceName($command->workspaceName)->getEventStreamName(), Events::fromArray([ From 7b922bff95cd28c63cc6c9accc5e0beecc5ef387 Mon Sep 17 00:00:00 2001 From: mhsdesign <85400359+mhsdesign@users.noreply.github.com> Date: Sat, 9 Nov 2024 20:59:13 +0100 Subject: [PATCH 08/17] TASK: Inline now simplified `YieldedEventsToPublish` virtual type again --- .../Classes/CommandHandler/CommandBus.php | 3 +-- .../Classes/CommandHandler/CommandHandlerInterface.php | 3 +-- .../Classes/Feature/WorkspaceCommandHandler.php | 5 ++--- 3 files changed, 4 insertions(+), 7 deletions(-) diff --git a/Neos.ContentRepository.Core/Classes/CommandHandler/CommandBus.php b/Neos.ContentRepository.Core/Classes/CommandHandler/CommandBus.php index 1a69d3d017c..4cdc38d35af 100644 --- a/Neos.ContentRepository.Core/Classes/CommandHandler/CommandBus.php +++ b/Neos.ContentRepository.Core/Classes/CommandHandler/CommandBus.php @@ -11,7 +11,6 @@ * Implementation Detail of {@see ContentRepository::handle}, which does the command dispatching to the different * {@see CommandHandlerInterface} implementation. * - * @phpstan-import-type YieldedEventsToPublish from CommandHandlerInterface * @internal */ final readonly class CommandBus @@ -33,7 +32,7 @@ public function __construct( * The handler only calculate which events they want to have published, * but do not do the publishing themselves * - * @return EventsToPublish|YieldedEventsToPublish + * @return EventsToPublish|\Generator */ public function handle(CommandInterface $command): EventsToPublish|\Generator { diff --git a/Neos.ContentRepository.Core/Classes/CommandHandler/CommandHandlerInterface.php b/Neos.ContentRepository.Core/Classes/CommandHandler/CommandHandlerInterface.php index 6e0436be8fe..b36d5d3ab75 100644 --- a/Neos.ContentRepository.Core/Classes/CommandHandler/CommandHandlerInterface.php +++ b/Neos.ContentRepository.Core/Classes/CommandHandler/CommandHandlerInterface.php @@ -11,7 +11,6 @@ * * The {@see CommandHandlingDependencies} are available during handling to do soft-constraint checks * - * @phpstan-type YieldedEventsToPublish \Generator * @internal no public API, because commands are no extension points of the CR */ interface CommandHandlerInterface @@ -24,7 +23,7 @@ public function canHandle(CommandInterface $command): bool; * For the case of the workspace command handler that need to publish to many streams and "close" the content-stream directly, * it's allowed to yield the events to interact with the control flow of event publishing. * - * @return EventsToPublish|YieldedEventsToPublish + * @return EventsToPublish|\Generator */ public function handle(CommandInterface $command, CommandHandlingDependencies $commandHandlingDependencies): EventsToPublish|\Generator; } diff --git a/Neos.ContentRepository.Core/Classes/Feature/WorkspaceCommandHandler.php b/Neos.ContentRepository.Core/Classes/Feature/WorkspaceCommandHandler.php index 2e9d3031ea1..69f64493cc7 100644 --- a/Neos.ContentRepository.Core/Classes/Feature/WorkspaceCommandHandler.php +++ b/Neos.ContentRepository.Core/Classes/Feature/WorkspaceCommandHandler.php @@ -70,7 +70,6 @@ use Neos\EventStore\Model\EventStream\ExpectedVersion; /** - * @phpstan-import-type YieldedEventsToPublish from CommandHandlerInterface * @internal from userland, you'll use ContentRepository::handle to dispatch commands */ final readonly class WorkspaceCommandHandler implements CommandHandlerInterface @@ -90,7 +89,7 @@ public function canHandle(CommandInterface $command): bool } /** - * @return YieldedEventsToPublish + * @return \Generator */ public function handle(CommandInterface $command, CommandHandlingDependencies $commandHandlingDependencies): \Generator { @@ -436,7 +435,7 @@ static function ($handle) use ($rebaseableCommands): void { /** * This method is like a combined Rebase and Publish! * - * @return YieldedEventsToPublish + * @return \Generator */ private function handlePublishIndividualNodesFromWorkspace( PublishIndividualNodesFromWorkspace $command, From d27f83f024c703c4ef3b55f315fea73fb78b5dd9 Mon Sep 17 00:00:00 2001 From: mhsdesign <85400359+mhsdesign@users.noreply.github.com> Date: Sat, 9 Nov 2024 21:06:18 +0100 Subject: [PATCH 09/17] TASK: Wrap rebaseable command extraction into `finally` block to ensure content stream is never left closed During the beta phase it can happen that user forget to apply a migration to migrate the stored commands in the even metadata, upon publish this would close the content stream and fail directly afterward. Applying the migration then would not be enough as the content stream is a closed state and has to be repaired manually. Event thought this is not super likely, its not unlikely as well and the case during publication were we rely on things that might not be that way. As an alternative we could discuss doing the closing after acquiring the rebaseable commands. --- .../Feature/WorkspaceCommandHandler.php | 77 ++++++++++++------- 1 file changed, 51 insertions(+), 26 deletions(-) diff --git a/Neos.ContentRepository.Core/Classes/Feature/WorkspaceCommandHandler.php b/Neos.ContentRepository.Core/Classes/Feature/WorkspaceCommandHandler.php index 69f64493cc7..e1e0ec0979b 100644 --- a/Neos.ContentRepository.Core/Classes/Feature/WorkspaceCommandHandler.php +++ b/Neos.ContentRepository.Core/Classes/Feature/WorkspaceCommandHandler.php @@ -202,12 +202,18 @@ private function handlePublishWorkspace( $workspaceContentStreamVersion ); - $rebaseableCommands = RebaseableCommands::extractFromEventStream( - $this->eventStore->load( - ContentStreamEventStreamName::fromContentStreamId($workspace->currentContentStreamId) - ->getEventStreamName() - ) - ); + try { + $rebaseableCommands = RebaseableCommands::extractFromEventStream( + $this->eventStore->load( + ContentStreamEventStreamName::fromContentStreamId($workspace->currentContentStreamId) + ->getEventStreamName() + ) + ); + } finally { + yield $this->reopenContentStreamWithoutConstraints( + $workspace->currentContentStreamId + ); + } yield from $this->publishWorkspace( $workspace, @@ -377,12 +383,18 @@ private function handleRebaseWorkspace( return; } - $rebaseableCommands = RebaseableCommands::extractFromEventStream( - $this->eventStore->load( - ContentStreamEventStreamName::fromContentStreamId($workspace->currentContentStreamId) - ->getEventStreamName() - ) - ); + try { + $rebaseableCommands = RebaseableCommands::extractFromEventStream( + $this->eventStore->load( + ContentStreamEventStreamName::fromContentStreamId($workspace->currentContentStreamId) + ->getEventStreamName() + ) + ); + } finally { + yield $this->reopenContentStreamWithoutConstraints( + $workspace->currentContentStreamId + ); + } $commandSimulator = $this->commandSimulatorFactory->createSimulatorForWorkspace($baseWorkspace->workspaceName); @@ -456,12 +468,18 @@ private function handlePublishIndividualNodesFromWorkspace( $workspaceContentStreamVersion ); - $rebaseableCommands = RebaseableCommands::extractFromEventStream( - $this->eventStore->load( - ContentStreamEventStreamName::fromContentStreamId($workspace->currentContentStreamId) - ->getEventStreamName() - ) - ); + try { + $rebaseableCommands = RebaseableCommands::extractFromEventStream( + $this->eventStore->load( + ContentStreamEventStreamName::fromContentStreamId($workspace->currentContentStreamId) + ->getEventStreamName() + ) + ); + } finally { + yield $this->reopenContentStreamWithoutConstraints( + $workspace->currentContentStreamId + ); + } [$matchingCommands, $remainingCommands] = $rebaseableCommands->separateMatchingAndRemainingCommands($command->nodesToPublish); @@ -586,14 +604,21 @@ private function handleDiscardIndividualNodesFromWorkspace( $workspaceContentStreamVersion ); - // filter commands, only keeping the ones NOT MATCHING the nodes from the command (i.e. the modifications we want to keep) - $rebaseableCommands = RebaseableCommands::extractFromEventStream( - $this->eventStore->load( - ContentStreamEventStreamName::fromContentStreamId($workspace->currentContentStreamId) - ->getEventStreamName() - ) - ); - [$commandsToDiscard, $commandsToKeep] = $rebaseableCommands->separateMatchingAndRemainingCommands($command->nodesToDiscard); + try { + $rebaseableCommands = RebaseableCommands::extractFromEventStream( + $this->eventStore->load( + ContentStreamEventStreamName::fromContentStreamId($workspace->currentContentStreamId) + ->getEventStreamName() + ) + ); + + // filter commands, only keeping the ones NOT MATCHING the nodes from the command (i.e. the modifications we want to keep) + [$commandsToDiscard, $commandsToKeep] = $rebaseableCommands->separateMatchingAndRemainingCommands($command->nodesToDiscard); + } finally { + yield $this->reopenContentStreamWithoutConstraints( + $workspace->currentContentStreamId + ); + } if ($commandsToDiscard->isEmpty()) { // if we have nothing to discard, we can just keep all. (e.g. random node ids were specified) It's almost a noop ;) From de7895e6af96631f4008af6ac3d806c71e73808b Mon Sep 17 00:00:00 2001 From: mhsdesign <85400359+mhsdesign@users.noreply.github.com> Date: Sat, 9 Nov 2024 21:18:13 +0100 Subject: [PATCH 10/17] TASK: Close content stream a bit later instead of having to reopen in many edge cases Alternative fix for d27f83f024c703c4ef3b55f315fea73fb78b5dd9 Previously an error in `extractFromEventStream` because the payload was not correct and yet has to be migrated would lead to a closed content stream which is of course persisted even after fixing the events via migration. This is still save to do, as the `closeContentStream` will commit the close on the FIRSTly fetched expected version. Same guarantees, different error behaviour in rare cases. --- .../Feature/WorkspaceCommandHandler.php | 115 +++++++----------- 1 file changed, 45 insertions(+), 70 deletions(-) diff --git a/Neos.ContentRepository.Core/Classes/Feature/WorkspaceCommandHandler.php b/Neos.ContentRepository.Core/Classes/Feature/WorkspaceCommandHandler.php index e1e0ec0979b..a2205df717b 100644 --- a/Neos.ContentRepository.Core/Classes/Feature/WorkspaceCommandHandler.php +++ b/Neos.ContentRepository.Core/Classes/Feature/WorkspaceCommandHandler.php @@ -197,24 +197,18 @@ private function handlePublishWorkspace( $workspaceContentStreamVersion = $this->requireOpenContentStreamAndVersion($workspace, $commandHandlingDependencies); $baseWorkspaceContentStreamVersion = $this->requireOpenContentStreamAndVersion($baseWorkspace, $commandHandlingDependencies); + $rebaseableCommands = RebaseableCommands::extractFromEventStream( + $this->eventStore->load( + ContentStreamEventStreamName::fromContentStreamId($workspace->currentContentStreamId) + ->getEventStreamName() + ) + ); + yield $this->closeContentStream( $workspace->currentContentStreamId, $workspaceContentStreamVersion ); - try { - $rebaseableCommands = RebaseableCommands::extractFromEventStream( - $this->eventStore->load( - ContentStreamEventStreamName::fromContentStreamId($workspace->currentContentStreamId) - ->getEventStreamName() - ) - ); - } finally { - yield $this->reopenContentStreamWithoutConstraints( - $workspace->currentContentStreamId - ); - } - yield from $this->publishWorkspace( $workspace, $baseWorkspace, @@ -367,13 +361,13 @@ private function handleRebaseWorkspace( return; } - yield $this->closeContentStream( - $workspace->currentContentStreamId, - $workspaceContentStreamVersion - ); - if (!$workspace->hasPublishableChanges()) { // if we have no changes in the workspace we can fork from the base directly + yield $this->closeContentStream( + $workspace->currentContentStreamId, + $workspaceContentStreamVersion + ); + yield from $this->rebaseWorkspaceWithoutChanges( $workspace, $baseWorkspace, @@ -383,18 +377,17 @@ private function handleRebaseWorkspace( return; } - try { - $rebaseableCommands = RebaseableCommands::extractFromEventStream( - $this->eventStore->load( - ContentStreamEventStreamName::fromContentStreamId($workspace->currentContentStreamId) - ->getEventStreamName() - ) - ); - } finally { - yield $this->reopenContentStreamWithoutConstraints( - $workspace->currentContentStreamId - ); - } + $rebaseableCommands = RebaseableCommands::extractFromEventStream( + $this->eventStore->load( + ContentStreamEventStreamName::fromContentStreamId($workspace->currentContentStreamId) + ->getEventStreamName() + ) + ); + + yield $this->closeContentStream( + $workspace->currentContentStreamId, + $workspaceContentStreamVersion + ); $commandSimulator = $this->commandSimulatorFactory->createSimulatorForWorkspace($baseWorkspace->workspaceName); @@ -463,34 +456,25 @@ private function handlePublishIndividualNodesFromWorkspace( $workspaceContentStreamVersion = $this->requireOpenContentStreamAndVersion($workspace, $commandHandlingDependencies); $baseWorkspaceContentStreamVersion = $this->requireOpenContentStreamAndVersion($baseWorkspace, $commandHandlingDependencies); - yield $this->closeContentStream( - $workspace->currentContentStreamId, - $workspaceContentStreamVersion + $rebaseableCommands = RebaseableCommands::extractFromEventStream( + $this->eventStore->load( + ContentStreamEventStreamName::fromContentStreamId($workspace->currentContentStreamId) + ->getEventStreamName() + ) ); - try { - $rebaseableCommands = RebaseableCommands::extractFromEventStream( - $this->eventStore->load( - ContentStreamEventStreamName::fromContentStreamId($workspace->currentContentStreamId) - ->getEventStreamName() - ) - ); - } finally { - yield $this->reopenContentStreamWithoutConstraints( - $workspace->currentContentStreamId - ); - } - [$matchingCommands, $remainingCommands] = $rebaseableCommands->separateMatchingAndRemainingCommands($command->nodesToPublish); if ($matchingCommands->isEmpty()) { // almost a noop (e.g. random node ids were specified) ;) - yield $this->reopenContentStreamWithoutConstraints( - $workspace->currentContentStreamId - ); return; } + yield $this->closeContentStream( + $workspace->currentContentStreamId, + $workspaceContentStreamVersion + ); + if ($remainingCommands->isEmpty()) { // do a full publish, this is simpler for the projections to handle yield from $this->publishWorkspace( @@ -599,35 +583,26 @@ private function handleDiscardIndividualNodesFromWorkspace( $workspaceContentStreamVersion = $this->requireOpenContentStreamAndVersion($workspace, $commandHandlingDependencies); $baseWorkspaceContentStreamVersion = $this->requireOpenContentStreamAndVersion($baseWorkspace, $commandHandlingDependencies); - yield $this->closeContentStream( - $workspace->currentContentStreamId, - $workspaceContentStreamVersion + $rebaseableCommands = RebaseableCommands::extractFromEventStream( + $this->eventStore->load( + ContentStreamEventStreamName::fromContentStreamId($workspace->currentContentStreamId) + ->getEventStreamName() + ) ); - try { - $rebaseableCommands = RebaseableCommands::extractFromEventStream( - $this->eventStore->load( - ContentStreamEventStreamName::fromContentStreamId($workspace->currentContentStreamId) - ->getEventStreamName() - ) - ); - - // filter commands, only keeping the ones NOT MATCHING the nodes from the command (i.e. the modifications we want to keep) - [$commandsToDiscard, $commandsToKeep] = $rebaseableCommands->separateMatchingAndRemainingCommands($command->nodesToDiscard); - } finally { - yield $this->reopenContentStreamWithoutConstraints( - $workspace->currentContentStreamId - ); - } + // filter commands, only keeping the ones NOT MATCHING the nodes from the command (i.e. the modifications we want to keep) + [$commandsToDiscard, $commandsToKeep] = $rebaseableCommands->separateMatchingAndRemainingCommands($command->nodesToDiscard); if ($commandsToDiscard->isEmpty()) { // if we have nothing to discard, we can just keep all. (e.g. random node ids were specified) It's almost a noop ;) - yield $this->reopenContentStreamWithoutConstraints( - $workspace->currentContentStreamId - ); return; } + yield $this->closeContentStream( + $workspace->currentContentStreamId, + $workspaceContentStreamVersion + ); + if ($commandsToKeep->isEmpty()) { // quick path everything was discarded yield from $this->discardWorkspace( From d290047f899007bf9dccc0e16153fbe3d9905d66 Mon Sep 17 00:00:00 2001 From: mhsdesign <85400359+mhsdesign@users.noreply.github.com> Date: Sat, 9 Nov 2024 21:20:24 +0100 Subject: [PATCH 11/17] TASK: Add proper docs to `EventPersister` ;) --- Neos.ContentRepository.Core/Classes/ContentRepository.php | 6 +++--- .../Classes/EventStore/EventPersister.php | 6 ++++-- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/Neos.ContentRepository.Core/Classes/ContentRepository.php b/Neos.ContentRepository.Core/Classes/ContentRepository.php index 45b23d1da88..d40ebbb5060 100644 --- a/Neos.ContentRepository.Core/Classes/ContentRepository.php +++ b/Neos.ContentRepository.Core/Classes/ContentRepository.php @@ -103,7 +103,7 @@ public function handle(CommandInterface $command): void if ($eventsToPublish->events->isEmpty()) { return; } - $this->eventPersister->publishWithoutCatchup($eventsToPublish); + $this->eventPersister->publishWithoutKetchup($eventsToPublish); $this->catchupProjections(); return; } @@ -116,7 +116,7 @@ public function handle(CommandInterface $command): void } $eventsToPublish = $this->enrichEventsToPublishWithMetadata($yieldedEventsToPublish); try { - $this->eventPersister->publishWithoutCatchup($eventsToPublish); + $this->eventPersister->publishWithoutKetchup($eventsToPublish); } catch (ConcurrencyException $concurrencyException) { // we pass the exception into the generator (->throw), so it could be try-caught and reacted upon: // @@ -128,7 +128,7 @@ public function handle(CommandInterface $command): void // } $yieldedErrorStrategy = $toPublish->throw($concurrencyException); if ($yieldedErrorStrategy instanceof EventsToPublish) { - $this->eventPersister->publishWithoutCatchup($yieldedErrorStrategy); + $this->eventPersister->publishWithoutKetchup($yieldedErrorStrategy); } throw $concurrencyException; } diff --git a/Neos.ContentRepository.Core/Classes/EventStore/EventPersister.php b/Neos.ContentRepository.Core/Classes/EventStore/EventPersister.php index 59b102e03bd..b741b041b9c 100644 --- a/Neos.ContentRepository.Core/Classes/EventStore/EventPersister.php +++ b/Neos.ContentRepository.Core/Classes/EventStore/EventPersister.php @@ -25,6 +25,7 @@ public function __construct( } /** + * TODO Will be refactored via https://github.com/neos/neos-development-collection/pull/5321 * @throws ConcurrencyException in case the expectedVersion does not match */ public function publishEvents(ContentRepository $contentRepository, EventsToPublish $eventsToPublish): void @@ -32,14 +33,15 @@ public function publishEvents(ContentRepository $contentRepository, EventsToPubl if ($eventsToPublish->events->isEmpty()) { return; } - $this->publishWithoutCatchup($eventsToPublish); + $this->publishWithoutKetchup($eventsToPublish); $contentRepository->catchUpProjections(); } /** + * TODO Will be refactored via https://github.com/neos/neos-development-collection/pull/5321 * @throws ConcurrencyException in case the expectedVersion does not match */ - public function publishWithoutCatchup(EventsToPublish $eventsToPublish): CommitResult + public function publishWithoutKetchup(EventsToPublish $eventsToPublish): CommitResult { $normalizedEvents = Events::fromArray( $eventsToPublish->events->map($this->eventNormalizer->normalize(...)) From 8e48e7eefdbb8910fb9352e58d08ac9dc9f07b61 Mon Sep 17 00:00:00 2001 From: mhsdesign <85400359+mhsdesign@users.noreply.github.com> Date: Sat, 9 Nov 2024 21:24:49 +0100 Subject: [PATCH 12/17] TASK: Adjust naming of `removeContentStreamWithoutConstraintChecks` ;) --- .../Classes/Feature/ContentStreamHandling.php | 4 ++-- .../Feature/WorkspaceCommandHandler.php | 24 +++++++++---------- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/Neos.ContentRepository.Core/Classes/Feature/ContentStreamHandling.php b/Neos.ContentRepository.Core/Classes/Feature/ContentStreamHandling.php index d71c28f4097..a228ca7c864 100644 --- a/Neos.ContentRepository.Core/Classes/Feature/ContentStreamHandling.php +++ b/Neos.ContentRepository.Core/Classes/Feature/ContentStreamHandling.php @@ -45,7 +45,7 @@ private function closeContentStream( * @param ContentStreamId $contentStreamId The id of the content stream to reopen * @phpstan-pure this method is pure, to persist the events they must be handled outside */ - private function reopenContentStreamWithoutConstraints( + private function reopenContentStreamWithoutConstraintChecks( ContentStreamId $contentStreamId, ): EventsToPublish { return new EventsToPublish( @@ -91,7 +91,7 @@ private function forkContentStream( * @param ContentStreamId $contentStreamId The id of the content stream to remove * @phpstan-pure this method is pure, to persist the events they must be handled outside */ - private function removeContentStreamWithoutConstraints( + private function removeContentStreamWithoutConstraintChecks( ContentStreamId $contentStreamId, ): EventsToPublish { return new EventsToPublish( diff --git a/Neos.ContentRepository.Core/Classes/Feature/WorkspaceCommandHandler.php b/Neos.ContentRepository.Core/Classes/Feature/WorkspaceCommandHandler.php index a2205df717b..020f6cbbbbe 100644 --- a/Neos.ContentRepository.Core/Classes/Feature/WorkspaceCommandHandler.php +++ b/Neos.ContentRepository.Core/Classes/Feature/WorkspaceCommandHandler.php @@ -240,7 +240,7 @@ static function ($handle) use ($rebaseableCommands): void { ); if ($commandSimulator->hasCommandsThatFailed()) { - yield $this->reopenContentStreamWithoutConstraints( + yield $this->reopenContentStreamWithoutConstraintChecks( $workspace->currentContentStreamId ); throw WorkspaceRebaseFailed::duringPublish($commandSimulator->getCommandsThatFailed()); @@ -260,7 +260,7 @@ static function ($handle) use ($rebaseableCommands): void { ExpectedVersion::fromVersion($baseWorkspaceContentStreamVersion) ); } catch (ConcurrencyException $concurrencyException) { - yield $this->reopenContentStreamWithoutConstraints( + yield $this->reopenContentStreamWithoutConstraintChecks( $workspace->currentContentStreamId ); throw $concurrencyException; @@ -285,7 +285,7 @@ static function ($handle) use ($rebaseableCommands): void { ExpectedVersion::ANY() ); - yield $this->removeContentStreamWithoutConstraints($workspace->currentContentStreamId); + yield $this->removeContentStreamWithoutConstraintChecks($workspace->currentContentStreamId); } private function rebaseWorkspaceWithoutChanges( @@ -312,7 +312,7 @@ private function rebaseWorkspaceWithoutChanges( ExpectedVersion::ANY() ); - yield $this->removeContentStreamWithoutConstraints($workspace->currentContentStreamId); + yield $this->removeContentStreamWithoutConstraintChecks($workspace->currentContentStreamId); } /** @@ -403,7 +403,7 @@ static function ($handle) use ($rebaseableCommands): void { $command->rebaseErrorHandlingStrategy === RebaseErrorHandlingStrategy::STRATEGY_FAIL && $commandSimulator->hasCommandsThatFailed() ) { - yield $this->reopenContentStreamWithoutConstraints( + yield $this->reopenContentStreamWithoutConstraintChecks( $workspace->currentContentStreamId ); @@ -434,7 +434,7 @@ static function ($handle) use ($rebaseableCommands): void { ) ); - yield $this->removeContentStreamWithoutConstraints($workspace->currentContentStreamId); + yield $this->removeContentStreamWithoutConstraintChecks($workspace->currentContentStreamId); } /** @@ -503,7 +503,7 @@ static function ($handle) use ($commandSimulator, $matchingCommands, $remainingC ); if ($commandSimulator->hasCommandsThatFailed()) { - yield $this->reopenContentStreamWithoutConstraints( + yield $this->reopenContentStreamWithoutConstraintChecks( $workspace->currentContentStreamId ); @@ -525,7 +525,7 @@ static function ($handle) use ($commandSimulator, $matchingCommands, $remainingC ExpectedVersion::fromVersion($baseWorkspaceContentStreamVersion) ); } catch (ConcurrencyException $concurrencyException) { - yield $this->reopenContentStreamWithoutConstraints( + yield $this->reopenContentStreamWithoutConstraintChecks( $workspace->currentContentStreamId ); throw $concurrencyException; @@ -555,7 +555,7 @@ static function ($handle) use ($commandSimulator, $matchingCommands, $remainingC ) ); - yield $this->removeContentStreamWithoutConstraints($workspace->currentContentStreamId); + yield $this->removeContentStreamWithoutConstraintChecks($workspace->currentContentStreamId); } /** @@ -626,7 +626,7 @@ static function ($handle) use ($commandsToKeep): void { ); if ($commandSimulator->hasCommandsThatFailed()) { - yield $this->reopenContentStreamWithoutConstraints( + yield $this->reopenContentStreamWithoutConstraintChecks( $workspace->currentContentStreamId ); throw WorkspaceRebaseFailed::duringDiscard($commandSimulator->getCommandsThatFailed()); @@ -655,7 +655,7 @@ static function ($handle) use ($commandsToKeep): void { ) ); - yield $this->removeContentStreamWithoutConstraints($workspace->currentContentStreamId); + yield $this->removeContentStreamWithoutConstraintChecks($workspace->currentContentStreamId); } /** @@ -714,7 +714,7 @@ private function discardWorkspace( ExpectedVersion::ANY() ); - yield $this->removeContentStreamWithoutConstraints($workspace->currentContentStreamId); + yield $this->removeContentStreamWithoutConstraintChecks($workspace->currentContentStreamId); } /** From 59fa2e345742822932ef179ea90b9a70d5bc649e Mon Sep 17 00:00:00 2001 From: mhsdesign <85400359+mhsdesign@users.noreply.github.com> Date: Sun, 10 Nov 2024 09:11:01 +0100 Subject: [PATCH 13/17] TASK: Improve assertions of WorkspacePublicationDuringWritingTest --- .../Tests/Parallel/AbstractParallelTestCase.php | 7 ++++++- .../WorkspacePublicationDuringWritingTest.php | 13 ++----------- .../WorkspaceWritingDuringRebaseTest.php | 1 + 3 files changed, 9 insertions(+), 12 deletions(-) diff --git a/Neos.ContentRepository.BehavioralTests/Tests/Parallel/AbstractParallelTestCase.php b/Neos.ContentRepository.BehavioralTests/Tests/Parallel/AbstractParallelTestCase.php index ef97e1bea36..569609ee5ce 100644 --- a/Neos.ContentRepository.BehavioralTests/Tests/Parallel/AbstractParallelTestCase.php +++ b/Neos.ContentRepository.BehavioralTests/Tests/Parallel/AbstractParallelTestCase.php @@ -83,6 +83,11 @@ final protected function setUpContentRepository( final protected function log(string $message): void { - file_put_contents(self::LOGGING_PATH, substr($this::class, strrpos($this::class, '\\') + 1) . ': ' . getmypid() . ': ' . $message . PHP_EOL, FILE_APPEND); + file_put_contents(self::LOGGING_PATH, self::shortClassName($this::class) . ': ' . getmypid() . ': ' . $message . PHP_EOL, FILE_APPEND); + } + + final protected static function shortClassName(string $className): string + { + return substr($className, strrpos($className, '\\') + 1); } } diff --git a/Neos.ContentRepository.BehavioralTests/Tests/Parallel/WorkspacePublicationDuringWriting/WorkspacePublicationDuringWritingTest.php b/Neos.ContentRepository.BehavioralTests/Tests/Parallel/WorkspacePublicationDuringWriting/WorkspacePublicationDuringWritingTest.php index c664404cee6..d96a9adddf0 100644 --- a/Neos.ContentRepository.BehavioralTests/Tests/Parallel/WorkspacePublicationDuringWriting/WorkspacePublicationDuringWritingTest.php +++ b/Neos.ContentRepository.BehavioralTests/Tests/Parallel/WorkspacePublicationDuringWriting/WorkspacePublicationDuringWritingTest.php @@ -223,6 +223,7 @@ public function thenConcurrentPublishLeadsToException(): void )); } catch (\Exception $thrownException) { $actualException = $thrownException; + $this->log(sprintf('Got exception %s: %s', self::shortClassName($actualException::class), $actualException->getMessage())); } $this->log('publish finished'); @@ -231,20 +232,10 @@ public function thenConcurrentPublishLeadsToException(): void Assert::fail(sprintf('No exception was thrown')); } - if ($actualException instanceof \RuntimeException && $actualException->getCode() === 1652279016) { - // todo can be removed soon - $this->log(sprintf('got expected RuntimeException exception: %s', $actualException->getMessage())); - } elseif ($actualException instanceof ConcurrencyException) { - $this->log(sprintf('got expected ConcurrencyException exception: %s', $actualException->getMessage())); - } else { - Assert::assertInstanceOf(ConcurrencyException::class, $actualException); - } + Assert::assertInstanceOf(ConcurrencyException::class, $actualException); $this->awaitFileRemoval(self::WRITING_IS_RUNNING_FLAG_PATH); - // just to make sure were up-to-date now! - $this->contentRepository->catchupProjections(); - // writing to user works!!! try { $this->contentRepository->handle( diff --git a/Neos.ContentRepository.BehavioralTests/Tests/Parallel/WorkspaceWritingDuringRebase/WorkspaceWritingDuringRebaseTest.php b/Neos.ContentRepository.BehavioralTests/Tests/Parallel/WorkspaceWritingDuringRebase/WorkspaceWritingDuringRebaseTest.php index b1022e2b6e6..73dee196a8d 100644 --- a/Neos.ContentRepository.BehavioralTests/Tests/Parallel/WorkspaceWritingDuringRebase/WorkspaceWritingDuringRebaseTest.php +++ b/Neos.ContentRepository.BehavioralTests/Tests/Parallel/WorkspaceWritingDuringRebase/WorkspaceWritingDuringRebaseTest.php @@ -203,6 +203,7 @@ public function thenConcurrentCommandsLeadToAnException(): void )); } catch (\Exception $thrownException) { $actualException = $thrownException; + $this->log(sprintf('Got exception %s: %s', self::shortClassName($actualException::class), $actualException->getMessage())); } $this->log('write finished'); From 48e09cb7a3de3a1ce71e2774f121bef6d5e16b8d Mon Sep 17 00:00:00 2001 From: mhsdesign <85400359+mhsdesign@users.noreply.github.com> Date: Sun, 10 Nov 2024 09:16:56 +0100 Subject: [PATCH 14/17] TASK: Assert that in WorkspaceWritingDuringRebaseTest that the workspace is still the original content stream --- .../WorkspaceWritingDuringRebaseTest.php | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/Neos.ContentRepository.BehavioralTests/Tests/Parallel/WorkspaceWritingDuringRebase/WorkspaceWritingDuringRebaseTest.php b/Neos.ContentRepository.BehavioralTests/Tests/Parallel/WorkspaceWritingDuringRebase/WorkspaceWritingDuringRebaseTest.php index 73dee196a8d..802205f2e16 100644 --- a/Neos.ContentRepository.BehavioralTests/Tests/Parallel/WorkspaceWritingDuringRebase/WorkspaceWritingDuringRebaseTest.php +++ b/Neos.ContentRepository.BehavioralTests/Tests/Parallel/WorkspaceWritingDuringRebase/WorkspaceWritingDuringRebaseTest.php @@ -160,7 +160,7 @@ public function whileAWorkspaceIsBeingRebased(): void try { $this->contentRepository->handle( RebaseWorkspace::create($workspaceName) - ->withRebasedContentStreamId(ContentStreamId::create()) + ->withRebasedContentStreamId(ContentStreamId::fromString('user-cs-rebased')) ->withErrorHandlingStrategy(RebaseErrorHandlingStrategy::STRATEGY_FORCE)); } finally { unlink(self::REBASE_IS_RUNNING_FLAG_PATH); @@ -190,6 +190,11 @@ public function thenConcurrentCommandsLeadToAnException(): void $this->log('write started'); + $workspaceDuringRebase = $this->contentRepository->getContentGraph(WorkspaceName::fromString('user-test')); + Assert::assertSame('user-cs-id', $workspaceDuringRebase->getContentStreamId()->value, + 'The parallel tests expects the workspace to still point to the original cs.' + ); + $origin = OriginDimensionSpacePoint::createWithoutDimensions(); $actualException = null; try { @@ -218,7 +223,7 @@ public function thenConcurrentCommandsLeadToAnException(): void Assert::assertThat($actualException, self::logicalOr( self::isInstanceOf(ContentStreamIsClosed::class), - self::isInstanceOf(ConcurrencyException::class), + self::isInstanceOf(ConcurrencyException::class), // todo is only thrown theoretical? but not during tests here ... )); Assert::assertSame('title-original', $node?->getProperty('title')); From e12c6414aa5a89bc5835f002ef69abab4b12d76e Mon Sep 17 00:00:00 2001 From: mhsdesign <85400359+mhsdesign@users.noreply.github.com> Date: Sun, 10 Nov 2024 09:29:33 +0100 Subject: [PATCH 15/17] TASK: Naming things and suggestion from code review :) --- .../Classes/ContentRepository.php | 12 ++++++------ .../Classes/EventStore/EventPersister.php | 4 ++-- .../Classes/Feature/Common/ConstraintChecks.php | 4 ++-- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/Neos.ContentRepository.Core/Classes/ContentRepository.php b/Neos.ContentRepository.Core/Classes/ContentRepository.php index d40ebbb5060..054639418f5 100644 --- a/Neos.ContentRepository.Core/Classes/ContentRepository.php +++ b/Neos.ContentRepository.Core/Classes/ContentRepository.php @@ -97,13 +97,13 @@ public function handle(CommandInterface $command): void { $toPublish = $this->commandBus->handle($command); + // simple case if ($toPublish instanceof EventsToPublish) { - // simple case - $eventsToPublish = $this->enrichEventsToPublishWithMetadata($toPublish); - if ($eventsToPublish->events->isEmpty()) { + if ($toPublish->events->isEmpty()) { return; } - $this->eventPersister->publishWithoutKetchup($eventsToPublish); + $eventsToPublish = $this->enrichEventsToPublishWithMetadata($toPublish); + $this->eventPersister->publishWithoutCatchup($eventsToPublish); $this->catchupProjections(); return; } @@ -116,7 +116,7 @@ public function handle(CommandInterface $command): void } $eventsToPublish = $this->enrichEventsToPublishWithMetadata($yieldedEventsToPublish); try { - $this->eventPersister->publishWithoutKetchup($eventsToPublish); + $this->eventPersister->publishWithoutCatchup($eventsToPublish); } catch (ConcurrencyException $concurrencyException) { // we pass the exception into the generator (->throw), so it could be try-caught and reacted upon: // @@ -128,7 +128,7 @@ public function handle(CommandInterface $command): void // } $yieldedErrorStrategy = $toPublish->throw($concurrencyException); if ($yieldedErrorStrategy instanceof EventsToPublish) { - $this->eventPersister->publishWithoutKetchup($yieldedErrorStrategy); + $this->eventPersister->publishWithoutCatchup($yieldedErrorStrategy); } throw $concurrencyException; } diff --git a/Neos.ContentRepository.Core/Classes/EventStore/EventPersister.php b/Neos.ContentRepository.Core/Classes/EventStore/EventPersister.php index b741b041b9c..885fefc1c7f 100644 --- a/Neos.ContentRepository.Core/Classes/EventStore/EventPersister.php +++ b/Neos.ContentRepository.Core/Classes/EventStore/EventPersister.php @@ -33,7 +33,7 @@ public function publishEvents(ContentRepository $contentRepository, EventsToPubl if ($eventsToPublish->events->isEmpty()) { return; } - $this->publishWithoutKetchup($eventsToPublish); + $this->publishWithoutCatchup($eventsToPublish); $contentRepository->catchUpProjections(); } @@ -41,7 +41,7 @@ public function publishEvents(ContentRepository $contentRepository, EventsToPubl * TODO Will be refactored via https://github.com/neos/neos-development-collection/pull/5321 * @throws ConcurrencyException in case the expectedVersion does not match */ - public function publishWithoutKetchup(EventsToPublish $eventsToPublish): CommitResult + public function publishWithoutCatchup(EventsToPublish $eventsToPublish): CommitResult { $normalizedEvents = Events::fromArray( $eventsToPublish->events->map($this->eventNormalizer->normalize(...)) diff --git a/Neos.ContentRepository.Core/Classes/Feature/Common/ConstraintChecks.php b/Neos.ContentRepository.Core/Classes/Feature/Common/ConstraintChecks.php index 5364907d978..6e4ec8a7391 100644 --- a/Neos.ContentRepository.Core/Classes/Feature/Common/ConstraintChecks.php +++ b/Neos.ContentRepository.Core/Classes/Feature/Common/ConstraintChecks.php @@ -81,9 +81,9 @@ protected function requireContentStream( CommandHandlingDependencies $commandHandlingDependencies ): ContentStreamId { $contentStreamId = $commandHandlingDependencies->getContentGraph($workspaceName)->getContentStreamId(); - $closedState = $commandHandlingDependencies->isContentStreamClosed($contentStreamId); + $isContentStreamClosed = $commandHandlingDependencies->isContentStreamClosed($contentStreamId); - if ($closedState) { + if ($isContentStreamClosed) { throw new ContentStreamIsClosed( 'Content stream "' . $contentStreamId->value . '" is closed.', 1710260081 From f883e65f8431bafcf2dc0000d5588a381ab22581 Mon Sep 17 00:00:00 2001 From: mhsdesign <85400359+mhsdesign@users.noreply.github.com> Date: Tue, 12 Nov 2024 13:29:21 +0100 Subject: [PATCH 16/17] TASK: Fix tests after bastis command test overhaul --- .../Bootstrap/GenericCommandExecutionAndEventPublication.php | 3 +++ 1 file changed, 3 insertions(+) diff --git a/Neos.ContentRepository.TestSuite/Classes/Behavior/Features/Bootstrap/GenericCommandExecutionAndEventPublication.php b/Neos.ContentRepository.TestSuite/Classes/Behavior/Features/Bootstrap/GenericCommandExecutionAndEventPublication.php index 7e150e296e1..67c91d51875 100644 --- a/Neos.ContentRepository.TestSuite/Classes/Behavior/Features/Bootstrap/GenericCommandExecutionAndEventPublication.php +++ b/Neos.ContentRepository.TestSuite/Classes/Behavior/Features/Bootstrap/GenericCommandExecutionAndEventPublication.php @@ -171,6 +171,9 @@ protected function addDefaultCommandArgumentValues(string $commandClassName, arr if (is_string($commandArguments['parentNodeAggregateId'] ?? null) && str_starts_with($commandArguments['parentNodeAggregateId'], '$')) { $commandArguments['parentNodeAggregateId'] = $this->rememberedNodeAggregateIds[substr($commandArguments['parentNodeAggregateId'], 1)]?->value; } + if (empty($commandArguments['nodeName'])) { + unset($commandArguments['nodeName']); + } } if ($commandClassName === SetNodeProperties::class) { if (is_string($commandArguments['propertyValues'] ?? null)) { From dfd457360a13aee60ae9c22d9a19c5c4b5683dcb Mon Sep 17 00:00:00 2001 From: mhsdesign <85400359+mhsdesign@users.noreply.github.com> Date: Tue, 12 Nov 2024 19:13:18 +0100 Subject: [PATCH 17/17] TASK: Recorrect naming of method again (my code editor seems to misbehave :)) --- Neos.ContentRepository.Core/Classes/ContentRepository.php | 6 +++--- .../Classes/EventStore/EventPersister.php | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/Neos.ContentRepository.Core/Classes/ContentRepository.php b/Neos.ContentRepository.Core/Classes/ContentRepository.php index 443a4b3f514..efc240075ee 100644 --- a/Neos.ContentRepository.Core/Classes/ContentRepository.php +++ b/Neos.ContentRepository.Core/Classes/ContentRepository.php @@ -104,7 +104,7 @@ public function handle(CommandInterface $command): void // simple case if ($toPublish instanceof EventsToPublish) { $eventsToPublish = $this->enrichEventsToPublishWithMetadata($toPublish); - $this->eventPersister->publishWithoutKetchup($eventsToPublish); + $this->eventPersister->publishWithoutCatchup($eventsToPublish); $this->catchupProjections(); return; } @@ -114,7 +114,7 @@ public function handle(CommandInterface $command): void foreach ($toPublish as $yieldedEventsToPublish) { $eventsToPublish = $this->enrichEventsToPublishWithMetadata($yieldedEventsToPublish); try { - $this->eventPersister->publishWithoutKetchup($eventsToPublish); + $this->eventPersister->publishWithoutCatchup($eventsToPublish); } catch (ConcurrencyException $concurrencyException) { // we pass the exception into the generator (->throw), so it could be try-caught and reacted upon: // @@ -126,7 +126,7 @@ public function handle(CommandInterface $command): void // } $yieldedErrorStrategy = $toPublish->throw($concurrencyException); if ($yieldedErrorStrategy instanceof EventsToPublish) { - $this->eventPersister->publishWithoutKetchup($yieldedErrorStrategy); + $this->eventPersister->publishWithoutCatchup($yieldedErrorStrategy); } throw $concurrencyException; } diff --git a/Neos.ContentRepository.Core/Classes/EventStore/EventPersister.php b/Neos.ContentRepository.Core/Classes/EventStore/EventPersister.php index 3d3a9b018a0..1af59ff3ce9 100644 --- a/Neos.ContentRepository.Core/Classes/EventStore/EventPersister.php +++ b/Neos.ContentRepository.Core/Classes/EventStore/EventPersister.php @@ -30,7 +30,7 @@ public function __construct( */ public function publishEvents(ContentRepository $contentRepository, EventsToPublish $eventsToPublish): void { - $this->publishWithoutKetchup($eventsToPublish); + $this->publishWithoutCatchup($eventsToPublish); $contentRepository->catchUpProjections(); } @@ -38,7 +38,7 @@ public function publishEvents(ContentRepository $contentRepository, EventsToPubl * TODO Will be refactored via https://github.com/neos/neos-development-collection/pull/5321 * @throws ConcurrencyException in case the expectedVersion does not match */ - public function publishWithoutKetchup(EventsToPublish $eventsToPublish): CommitResult + public function publishWithoutCatchup(EventsToPublish $eventsToPublish): CommitResult { $normalizedEvents = Events::fromArray( $eventsToPublish->events->map($this->eventNormalizer->normalize(...))