Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FEATURE: Subscription Engine #5321

Open
wants to merge 158 commits into
base: 9.0
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
158 commits
Select commit Hold shift + click to select a range
2172f09
WIP
bwaidelich Oct 24, 2024
13b1073
Merge branch '9.0' into feature/4746-rework-catchup-mechanism-3
bwaidelich Oct 24, 2024
d75f174
Merge branch '9.0' into feature/4746-rework-catchup-mechanism-3
bwaidelich Oct 24, 2024
937f84d
Merge branch '9.0' into feature/4746-rework-catchup-mechanism-3
bwaidelich Oct 26, 2024
03fb7ca
Merge branch '9.0' into feature/4746-rework-catchup-mechanism-3
bwaidelich Oct 29, 2024
dcb4be2
Merge branch '9.0' into feature/4746-rework-catchup-mechanism-3
bwaidelich Nov 3, 2024
fafe080
Merge branch '9.0' into feature/4746-rework-catchup-mechanism-3
bwaidelich Nov 5, 2024
29ba908
WIP
bwaidelich Nov 5, 2024
207179b
Tweak type comments
bwaidelich Nov 6, 2024
a127706
Wiring...
bwaidelich Nov 13, 2024
765b84f
Merge branch '9.0' into feature/4746-rework-catchup-mechanism-3
bwaidelich Nov 13, 2024
5b035c1
first (almost) working version
bwaidelich Nov 14, 2024
aca0f9b
Fix CatchUpHooks
bwaidelich Nov 14, 2024
7fea53e
Replace ProjectionService
bwaidelich Nov 14, 2024
ec08d64
remove `EventPersister` and much more :)
bwaidelich Nov 14, 2024
9fef068
Remove `RunSubscriptionEventStore`
bwaidelich Nov 14, 2024
46ad9d1
Improve error handling (WIP)
bwaidelich Nov 14, 2024
5ea8fae
Merge branch '9.0' into feature/4746-rework-catchup-mechanism-3
bwaidelich Nov 17, 2024
88cc600
Fix `ContentRepositoryFactory` constructor
bwaidelich Nov 17, 2024
e3f85ac
Improve error handling during `SubscriptionEngine::setup()`
bwaidelich Nov 17, 2024
c43a2ea
Allow to reset subscriptions
bwaidelich Nov 18, 2024
40e8d35
Re-implement `cr:status` CLI command
bwaidelich Nov 18, 2024
3744fd5
Fix `test_parallel` cr settings
bwaidelich Nov 18, 2024
c0fbfe1
Fix `test_parallel` cr settings 2/2
bwaidelich Nov 18, 2024
f5ff7d6
Tweak subscription engine setup/reset from tests
bwaidelich Nov 18, 2024
f8a8b5b
Fix behat tests ?
bwaidelich Nov 18, 2024
1562435
Disable `EventExportProcessor` test
bwaidelich Nov 18, 2024
2902dc2
Revert "Disable `EventExportProcessor` test"
bwaidelich Nov 18, 2024
0fe05b5
TASK: Fix phpstan
mhsdesign Nov 20, 2024
6e5b565
Merge remote-tracking branch 'origin/9.0' into feature/4746-rework-ca…
mhsdesign Nov 20, 2024
8967ead
TASK: Add sanity check assertion after setting node properties a coup…
mhsdesign Nov 20, 2024
2b5d338
TASK: Introduce dedicated `contentRepositoryLogger`
mhsdesign Nov 20, 2024
2d3a136
Fix parallel tests and publish events on correct stream
mhsdesign Nov 20, 2024
46dc510
TASK: Simplify `normalizeEvents`
mhsdesign Nov 20, 2024
0680533
Merge remote-tracking branch 'origin/9.0' into feature/4746-rework-ca…
mhsdesign Nov 20, 2024
de92d79
TASK: Test that locking and concurrent writing works under heavy load
mhsdesign Nov 20, 2024
f58cefa
TASK: Improve exception thrown if subscriber failed
mhsdesign Nov 20, 2024
e15ae79
TASK: Fix of-by-one error in catchup
mhsdesign Nov 20, 2024
c7df820
TASK: Improve logging and simplify information on debug
mhsdesign Nov 20, 2024
0bd129a
TASK: Remove obsolete ContentRepositorySubscribersFactoryInterface
mhsdesign Nov 20, 2024
1dfaa2c
TASK: Remove generic `EventHandlerInterface` for now, everything is a…
mhsdesign Nov 20, 2024
441b035
TASK: Remove other generic subscription concepts not required for pro…
mhsdesign Nov 20, 2024
9c6ba75
TASK: Remove subscription groups and filtering except for status and ids
mhsdesign Nov 21, 2024
0ac8751
TASK: Remove `$skipBooting` because its an odd signature and unused
mhsdesign Nov 20, 2024
9be3308
TASK: Remove removal of subscriptions
mhsdesign Nov 20, 2024
53790b9
TASK: Remove sqlite support for `DoctrineSubscriptionStore` as db loc…
mhsdesign Nov 20, 2024
481f173
TASK: Inline `ProjectionEventHandler` to `Subscriber` and make it a `…
mhsdesign Nov 21, 2024
b9569c5
TASK: Introduce tests for subscription booting, active and error state
mhsdesign Nov 21, 2024
427829e
TASK: Allow to replace setting of cr registry via `injectSettings`
mhsdesign Nov 21, 2024
715ec2e
TASK: Subscription engine test new and detached status
mhsdesign Nov 21, 2024
512e3c4
TASK: Speedup tests by using truncate
mhsdesign Nov 21, 2024
408ceb2
TASK: Subscription engine test filtering by subscription id
mhsdesign Nov 21, 2024
b71165c
TASK: Subscription engine test `filteringReset`
mhsdesign Nov 22, 2024
c8f8c6a
TASK: introduce `DebugEventProjection` for testing to assert each eve…
mhsdesign Nov 22, 2024
91046e6
TASK: introduce test that projection is rollback'd in case of error
mhsdesign Nov 22, 2024
f624ac6
TASK: test catchup hooks on failure
mhsdesign Nov 22, 2024
0197017
TASK: Introduce test to assert that projection keeps events previousl…
mhsdesign Nov 23, 2024
3b6ca26
TASK: Improve catchup rollback test
mhsdesign Nov 23, 2024
794be11
TASK: Split up mighty `SubscriptionEngineTest`
mhsdesign Nov 23, 2024
752d434
TASK: Simplify AbstractSubscriptionEngineTestCase by moving out speci…
mhsdesign Nov 23, 2024
c4da7fe
TASK: Test error behaviour for onBeforeCatchUp and onAfterCatchUp
mhsdesign Nov 23, 2024
c8cb0a2
TASK: Add test for happy catchup hooks
mhsdesign Nov 23, 2024
dfe5eb3
TASK: Adjust subscription test exceptions to do no retry
mhsdesign Nov 23, 2024
a32bfb2
TASK: Remove `retry_attempt` from subscriptions
mhsdesign Nov 23, 2024
836c347
TASK: Rename factory back to `$additionalSubscriberFactories`
mhsdesign Nov 23, 2024
830b5cc
BUGFIX: `discoverDetachedSubscriptions` did not persist changes
mhsdesign Nov 23, 2024
3d43183
Merge pull request #5375 from mhsdesign/task/radical-cleanup-for-subs…
mhsdesign Nov 23, 2024
6f43825
TASK: Fix phpstan
mhsdesign Nov 23, 2024
60d4f8c
BUGFIX: Reintroduce catchup hooks for all projections
mhsdesign Nov 23, 2024
5ef4ab1
TASK: Adjust to doctrine deprecations in DoctrineSubscriptionStore
mhsdesign Nov 23, 2024
c7cb75b
TASK: Minor code adjustments
mhsdesign Nov 23, 2024
a80639b
TASK: Ensure that the content graph projection is not part of the gen…
mhsdesign Nov 23, 2024
e7acfaa
BUGFIX: ProjectionErrorTest::fixFailedProjection reset error on reset
mhsdesign Nov 23, 2024
fdeec75
TASK: Assertions that setup and boot do not retry failed projections
mhsdesign Nov 23, 2024
31913f5
BUGFIX: SubscriptionDetachedStatusTest::projectionIsDetachedIfConfigu…
mhsdesign Nov 23, 2024
c5ea757
TASK: Throw `CatchUpFailed` exception in case onBeforeCatchUp or onAf…
mhsdesign Nov 23, 2024
794eaf2
TASK: Use save points to rollback projections during transaction on f…
mhsdesign Nov 23, 2024
7852f61
TASK: Handle `TableNotFoundException` gracefully in `subscriptionStat…
mhsdesign Nov 23, 2024
d582766
TASK: Move back to `subscriptionStatuses` test. We need to make sure …
mhsdesign Nov 23, 2024
5dfa592
TASK: Inline `discoverDetachedSubscriptions`
mhsdesign Nov 23, 2024
d5715c7
TASK: Do not discover new subscriptions during catchup
mhsdesign Nov 23, 2024
37a4e47
TASK: Introduce further tests to assert behaviour for catchup and setup
mhsdesign Nov 24, 2024
a8f246b
BUGFIX: Setup must re-setup active projections for migrations
mhsdesign Nov 24, 2024
73e1097
BUGFIX: Setup should reattach detached projections if possible, and m…
mhsdesign Nov 24, 2024
6726d73
FEATURE: Setup marks failed projections to be booted again
mhsdesign Nov 24, 2024
f46077e
FEATURE: Introduce `ContentRepositoryMaintainer` and restore cr:proje…
mhsdesign Nov 24, 2024
bace8ff
TASK: Rename `ProjectionStatus` and introduce `ProjectionSubscription…
mhsdesign Nov 24, 2024
8ff0f61
TASK: Introduce `DetachedSubscriptionStatus` as the projection setup …
mhsdesign Nov 24, 2024
9675572
TASK: Inline `pruneAllWorkspacesAndContentStreamsFromEventStream` int…
mhsdesign Nov 24, 2024
655ac3c
TASK: Reimplement 40e8d35e09ee690406c6a9cfc823c775d4ee3b51
mhsdesign Nov 24, 2024
e235e69
TASK: Document new `ContentRepositoryMaintainer`
mhsdesign Nov 24, 2024
611ca37
TASK: Rename `ProjectionSetupStatus` back to `ProjectionStatus`
mhsdesign Nov 25, 2024
0b8a3b5
TASK: Rename `SubscriptionStatuses` to `SubscriptionStatusCollection`
mhsdesign Nov 25, 2024
51f0cf6
TASK: Leave warning hint for why we do a replay
mhsdesign Nov 25, 2024
a2a2411
TASK: Warn in `catchupProjection` if projection is not ready to be ca…
mhsdesign Nov 25, 2024
63d1589
TASK: Document `catchupProjection` correctly
mhsdesign Nov 25, 2024
2297c14
TASK: Reintroduce `ContentRepositoryStatus` object and expose current…
mhsdesign Nov 25, 2024
1220f82
TASK: Swap Projection and Setup in output so that Setup comes first
mhsdesign Nov 25, 2024
800fd53
WIP: Introduce `cr:reactivateSubscription`
mhsdesign Nov 26, 2024
a0c9f90
TASK: Dont crash on status when the event store is not setup
mhsdesign Nov 27, 2024
8c079d9
TASK: Split projection replay into separate SubscriptionCommandContro…
mhsdesign Nov 27, 2024
4c65d81
TASK: Status also shows new subscriptions even if they are not persis…
mhsdesign Nov 27, 2024
8c9c0e8
TASK: Refine todos
mhsdesign Nov 27, 2024
4424483
TASK: Declare SubscriptionEngine and friends as internal
mhsdesign Nov 27, 2024
baa5e4a
TASK: Add error code to `SubscriptionEngineAlreadyProcessingException`
mhsdesign Nov 27, 2024
66e54bc
TASK: Allow cr registry to implement internal subscription store beca…
mhsdesign Nov 27, 2024
51d39e5
TASK: Rename to `SubscriptionReplayProcessor`
mhsdesign Nov 27, 2024
b2c1a29
TASK: Improve legacy projectionReplayCommand stub
mhsdesign Nov 27, 2024
d84c2a4
TASK: Add test for Subscription & Cr Commands (and thus CRMaintainer)
mhsdesign Nov 27, 2024
fd768da
Merge pull request #5378 from mhsdesign/feature/content-repository-ma…
mhsdesign Nov 27, 2024
ac425ff
TASK: Remove `SubscriptionManager` and make subscriptions immutable
mhsdesign Nov 27, 2024
eb0d792
FEATURE: Implement `reactivateSubscription`
mhsdesign Nov 27, 2024
dc5ff10
TASK: Move transactional logic _on_ projection as it does not belong …
mhsdesign Nov 27, 2024
3448e21
SubscriptionEngineTest postgresql compatible
kitsunet Dec 1, 2024
5abdb0a
Add missing empty string check
kitsunet Dec 1, 2024
2b82f62
TASK: Declare `EventNormalizer` as internal
mhsdesign Dec 2, 2024
4353f0f
TASK: Simplify `SubscriberFactoryDependencies` to only contain api th…
mhsdesign Dec 2, 2024
ea3eaa6
TASK: Introduce `getPropertyConverter` to denote that this is really …
mhsdesign Dec 2, 2024
e750d93
TASK: Migrate checkpoints to subscriptions via `migrateevents:migrate…
mhsdesign Dec 2, 2024
33b717a
TASK: Disable running subscription test on PostgreSQL
mhsdesign Dec 2, 2024
9286afc
TASK: Remove obsolete todo
mhsdesign Dec 2, 2024
d13c153
Merge remote-tracking branch 'origin/9.0' into feature/4746-rework-ca…
mhsdesign Dec 2, 2024
34fd834
TASK: Dont use `new EventNormalizer()` in tests
mhsdesign Dec 2, 2024
caa70bf
TASK: Fix php cs
mhsdesign Dec 3, 2024
54b24b8
BUGFIX: Ensure `onAfterCatchUp` is always executed _after_ the projec…
mhsdesign Dec 2, 2024
4a7b058
TASK: Reactivate `ParallelWritingInWorkspacesTest`
mhsdesign Dec 3, 2024
c47d181
TASK: Test `ProjectionTransactionTrait` when using external projections
mhsdesign Dec 2, 2024
fd9faa4
TASK: Implement that `onAfterCatchUp` called _after_ everything is pe…
mhsdesign Dec 3, 2024
175ab4c
Move savepoint creation back on the subscription store
mhsdesign Dec 3, 2024
582c5e6
TASK: Adjust tests that exactly once delivery is not possible for ext…
mhsdesign Dec 3, 2024
bfb4655
TASK: Change that hooks are not executed in the same savepoint and do…
mhsdesign Dec 3, 2024
aa2e7b1
TASK: Prevent catchup hooks from halting the projections
mhsdesign Dec 3, 2024
bccea53
TASK: Introduce dedicated `CatchUpHadErrors` exception
mhsdesign Dec 3, 2024
4c41482
TASK: Introduce test to assert behaviour when catchup hooks use the p…
mhsdesign Dec 4, 2024
21318b2
TASK: Explain behaviour when handling multiple commands
mhsdesign Dec 7, 2024
47fc20b
FEATURE: Introduce batching in subscription engine
mhsdesign Dec 7, 2024
6b59fdb
FEATURE: Introduce `onAfterBatchCompleted` hook
mhsdesign Dec 7, 2024
d04b8f3
TASK: Trivial cosmetic changes
mhsdesign Dec 9, 2024
b852617
TASK: Update documentation of `CatchUpHookInterface`
mhsdesign Dec 9, 2024
8daa836
TASK: Introduce test that we cannot roll-back setup
mhsdesign Dec 10, 2024
8f55975
TASK: Remove use of save-points for projections
mhsdesign Dec 10, 2024
79d4ec7
Fix `DoctrineSubscriptionStore` compatibility with SQLite and PostgreSQL
bwaidelich Dec 10, 2024
6423dc6
Merge pull request #5392 from mhsdesign/task/subscription-engine-save…
mhsdesign Dec 11, 2024
42edc02
Merge remote-tracking branch 'origin/9.0' into feature/4746-rework-ca…
mhsdesign Dec 11, 2024
c3e3291
TASK: Improve errors for catch hooks to only the first error the exce…
mhsdesign Dec 13, 2024
542bd4e
TASK: Replace `transactional` function use with explicit start and stop
mhsdesign Dec 13, 2024
efa4165
TASK: Ensure that subscriptions are not catchup'd if "onBeforeCatchUp…
mhsdesign Dec 13, 2024
202519c
TASK: Add test to ensure transaction is not active during onAfterBatc…
mhsdesign Dec 13, 2024
cc299c3
TASK: Only collect first Throwable object instance during catchup
mhsdesign Dec 13, 2024
431732e
TASK: Improve message of `CatchUpHadErrors` by concatenating all erro…
mhsdesign Dec 13, 2024
3fa4122
TASK: Log catchup hook errors directly and improve error output durin…
mhsdesign Dec 13, 2024
2e76381
TASK: Add position of error to catchup error for better debug informa…
mhsdesign Dec 13, 2024
cd384e5
TASK: Credit patchlevel in SubscriptionEngine
mhsdesign Dec 13, 2024
4d65f9c
Merge remote-tracking branch 'origin/9.0' into feature/4746-rework-ca…
mhsdesign Dec 13, 2024
d53c361
TASK: Adjust commitOnConnection_onAfterEvent test to batching
mhsdesign Dec 13, 2024
24890b9
TASK: Ensure status is serialized by value
mhsdesign Dec 13, 2024
ca79907
BUGFIX: Ensure that replay does not reset new or detached projections
mhsdesign Dec 13, 2024
dc97232
TASK: Make phpcs happy
mhsdesign Dec 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
use Neos\ContentGraph\DoctrineDbalAdapter\Domain\Repository\DimensionSpacePointsRepository;
use Neos\ContentGraph\DoctrineDbalAdapter\Domain\Repository\NodeFactory;
use Neos\ContentGraph\DoctrineDbalAdapter\Domain\Repository\ProjectionContentGraph;
use Neos\ContentRepository\Core\Factory\ProjectionFactoryDependencies;
use Neos\ContentRepository\Core\Factory\SubscriberFactoryDependencies;
use Neos\ContentRepository\Core\Projection\ContentGraph\ContentGraphProjectionFactoryInterface;

/**
Expand All @@ -24,8 +24,7 @@ public function __construct(
}

public function build(
ProjectionFactoryDependencies $projectionFactoryDependencies,
array $options,
SubscriberFactoryDependencies $projectionFactoryDependencies,
): DoctrineDbalContentGraphProjection {
$tableNames = ContentGraphTableNames::create(
$projectionFactoryDependencies->contentRepositoryId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
use Doctrine\DBAL\Connection;
use Neos\ContentGraph\PostgreSQLAdapter\Domain\Projection\HypergraphProjection;
use Neos\ContentGraph\PostgreSQLAdapter\Domain\Repository\NodeFactory;
use Neos\ContentRepository\Core\Factory\ProjectionFactoryDependencies;
use Neos\ContentRepository\Core\Factory\SubscriberFactoryDependencies;
use Neos\ContentRepository\Core\Projection\ContentGraph\ContentGraphProjectionFactoryInterface;
use Neos\ContentRepository\Core\SharedModel\ContentRepository\ContentRepositoryId;

Expand All @@ -28,8 +28,7 @@ public static function graphProjectionTableNamePrefix(
}

public function build(
ProjectionFactoryDependencies $projectionFactoryDependencies,
array $options,
SubscriberFactoryDependencies $projectionFactoryDependencies,
): HypergraphProjection {
$tableNamePrefix = self::graphProjectionTableNamePrefix(
$projectionFactoryDependencies->contentRepositoryId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
use Neos\ContentRepository\BehavioralTests\ProjectionRaceConditionTester\Dto\TraceEntries;
use Neos\ContentRepository\BehavioralTests\ProjectionRaceConditionTester\Dto\TraceEntryType;
use Neos\ContentRepository\Core\EventStore\EventInterface;
use Neos\ContentRepository\Core\Projection\CatchUpHookInterface;
use Neos\ContentRepository\Core\Projection\CatchUpHook\CatchUpHookInterface;
use Neos\ContentRepository\Core\Subscription\SubscriptionStatus;
use Neos\EventStore\Model\EventEnvelope;
use Neos\Flow\Annotations as Flow;

Expand Down Expand Up @@ -107,7 +108,7 @@ final class RaceTrackerCatchUpHook implements CatchUpHookInterface
protected $configuration;
private bool $inCriticalSection = false;

public function onBeforeCatchUp(): void
public function onBeforeCatchUp(SubscriptionStatus $subscriptionStatus): void
{
RedisInterleavingLogger::connect($this->configuration['redis']['host'], $this->configuration['redis']['port']);
}
Expand All @@ -126,16 +127,12 @@ public function onAfterEvent(EventInterface $eventInstance, EventEnvelope $event
{
}

public function onBeforeBatchCompleted(): void
public function onAfterCatchUp(): void
{
// we only want to track relevant lock release calls (i.e. if we were in the event processing loop before)
if ($this->inCriticalSection) {
$this->inCriticalSection = false;
RedisInterleavingLogger::trace(TraceEntryType::LockWillBeReleasedIfItWasAcquiredBefore);
}
}

public function onAfterCatchUp(): void
{
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@

namespace Neos\ContentRepository\BehavioralTests\ProjectionRaceConditionTester;

use Neos\ContentRepository\Core\Projection\CatchUpHookFactoryDependencies;
use Neos\ContentRepository\Core\Projection\CatchUpHookFactoryInterface;
use Neos\ContentRepository\Core\Projection\CatchUpHookInterface;
use Neos\ContentRepository\Core\Projection\CatchUpHook\CatchUpHookFactoryDependencies;
use Neos\ContentRepository\Core\Projection\CatchUpHook\CatchUpHookFactoryInterface;
use Neos\ContentRepository\Core\Projection\CatchUpHook\CatchUpHookInterface;
use Neos\ContentRepository\Core\Projection\ContentGraph\ContentGraphReadModelInterface;

/**
Expand Down
236 changes: 98 additions & 138 deletions Neos.ContentRepository.Core/Classes/ContentRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,16 @@
use Neos\ContentRepository\Core\CommandHandler\CommandInterface;
use Neos\ContentRepository\Core\Dimension\ContentDimensionSourceInterface;
use Neos\ContentRepository\Core\DimensionSpace\InterDimensionalVariationGraph;
use Neos\ContentRepository\Core\EventStore\EventNormalizer;
use Neos\ContentRepository\Core\EventStore\EventPersister;
use Neos\ContentRepository\Core\EventStore\EventsToPublish;
use Neos\ContentRepository\Core\EventStore\InitiatingEventMetadata;
use Neos\ContentRepository\Core\Factory\ContentRepositoryFactory;
use Neos\ContentRepository\Core\NodeType\NodeTypeManager;
use Neos\ContentRepository\Core\Projection\CatchUp;
use Neos\ContentRepository\Core\Projection\CatchUpHookFactoryDependencies;
use Neos\ContentRepository\Core\Projection\CatchUpOptions;
use Neos\ContentRepository\Core\Projection\ContentGraph\ContentGraphInterface;
use Neos\ContentRepository\Core\Projection\ContentGraph\ContentGraphProjectionInterface;
use Neos\ContentRepository\Core\Projection\ContentGraph\ContentGraphReadModelInterface;
use Neos\ContentRepository\Core\Projection\ProjectionInterface;
use Neos\ContentRepository\Core\Projection\ProjectionsAndCatchUpHooks;
use Neos\ContentRepository\Core\Projection\ProjectionStateInterface;
use Neos\ContentRepository\Core\Projection\ProjectionStatuses;
use Neos\ContentRepository\Core\Projection\WithMarkStaleInterface;
use Neos\ContentRepository\Core\Projection\ProjectionStates;
use Neos\ContentRepository\Core\SharedModel\ContentRepository\ContentRepositoryId;
use Neos\ContentRepository\Core\SharedModel\ContentRepository\ContentRepositoryStatus;
use Neos\ContentRepository\Core\SharedModel\Exception\WorkspaceDoesNotExist;
use Neos\ContentRepository\Core\SharedModel\User\UserIdProviderInterface;
use Neos\ContentRepository\Core\SharedModel\Workspace\ContentStream;
Expand All @@ -45,45 +36,34 @@
use Neos\ContentRepository\Core\SharedModel\Workspace\Workspace;
use Neos\ContentRepository\Core\SharedModel\Workspace\WorkspaceName;
use Neos\ContentRepository\Core\SharedModel\Workspace\Workspaces;
use Neos\EventStore\EventStoreInterface;
use Neos\EventStore\Model\EventEnvelope;
use Neos\EventStore\Model\EventStream\VirtualStreamName;
use Psr\Clock\ClockInterface;

/**
* Main Entry Point to the system. Encapsulates the full event-sourced Content Repository.
*
* Use this to:
* - set up the necessary database tables and contents via {@see ContentRepository::setUp()}
* - send commands to the system (to mutate state) via {@see ContentRepository::handle()}
* - access projection state (to read state) via {@see ContentRepository::projectionState()}
* - catch up projections via {@see ContentRepository::catchUpProjection()}
* - send commands to the system (to mutate state) via {@see self::handle()}
* - access the content graph read model
* - access 3rd party read models via {@see self::projectionState()}
*
* @api
*/
final class ContentRepository
final readonly class ContentRepository
{
/**
* @var array<class-string<ProjectionStateInterface>, ProjectionStateInterface>
*/
private array $projectionStateCache;

/**
* @internal use the {@see ContentRepositoryFactory::getOrBuild()} to instantiate
*/
public function __construct(
public readonly ContentRepositoryId $id,
private readonly CommandBus $commandBus,
private readonly EventStoreInterface $eventStore,
private readonly ProjectionsAndCatchUpHooks $projectionsAndCatchUpHooks,
private readonly EventNormalizer $eventNormalizer,
private readonly EventPersister $eventPersister,
private readonly NodeTypeManager $nodeTypeManager,
private readonly InterDimensionalVariationGraph $variationGraph,
private readonly ContentDimensionSourceInterface $contentDimensionSource,
private readonly UserIdProviderInterface $userIdProvider,
private readonly ClockInterface $clock,
private readonly ContentGraphReadModelInterface $contentGraphReadModel
private readonly ContentGraphReadModelInterface $contentGraphReadModel,
private readonly ProjectionStates $projectionStates,
bwaidelich marked this conversation as resolved.
Show resolved Hide resolved
) {
}

Expand Down Expand Up @@ -118,120 +98,100 @@ public function handle(CommandInterface $command): void
*/
public function projectionState(string $projectionStateClassName): ProjectionStateInterface
{
if (!isset($this->projectionStateCache)) {
foreach ($this->projectionsAndCatchUpHooks->projections as $projection) {
if ($projection instanceof ContentGraphProjectionInterface) {
continue;
}
$projectionState = $projection->getState();
$this->projectionStateCache[$projectionState::class] = $projectionState;
}
}
if (isset($this->projectionStateCache[$projectionStateClassName])) {
/** @var T $projectionState */
$projectionState = $this->projectionStateCache[$projectionStateClassName];
return $projectionState;
}
if (in_array(ContentGraphReadModelInterface::class, class_implements($projectionStateClassName), true)) {
throw new \InvalidArgumentException(sprintf('Accessing the internal content repository projection state via %s(%s) is not allowed. Please use the API on the content repository instead.', __FUNCTION__, $projectionStateClassName), 1729338679);
}

throw new \InvalidArgumentException(sprintf('A projection state of type "%s" is not registered in this content repository instance.', $projectionStateClassName), 1662033650);
}

/**
* @param class-string<ProjectionInterface<ProjectionStateInterface>> $projectionClassName
*/
public function catchUpProjection(string $projectionClassName, CatchUpOptions $options): void
{
$projection = $this->projectionsAndCatchUpHooks->projections->get($projectionClassName);

$catchUpHookFactory = $this->projectionsAndCatchUpHooks->getCatchUpHookFactoryForProjection($projection);
$catchUpHook = $catchUpHookFactory?->build(new CatchUpHookFactoryDependencies(
$this->id,
$projection->getState(),
$this->nodeTypeManager,
$this->contentDimensionSource,
$this->variationGraph
));

// TODO allow custom stream name per projection
$streamName = VirtualStreamName::all();
$eventStream = $this->eventStore->load($streamName);
if ($options->maximumSequenceNumber !== null) {
$eventStream = $eventStream->withMaximumSequenceNumber($options->maximumSequenceNumber);
}

$eventApplier = function (EventEnvelope $eventEnvelope) use ($projection, $catchUpHook, $options) {
$event = $this->eventNormalizer->denormalize($eventEnvelope->event);
if ($options->progressCallback !== null) {
($options->progressCallback)($event, $eventEnvelope);
}
if (!$projection->canHandle($event)) {
return;
}
$catchUpHook?->onBeforeEvent($event, $eventEnvelope);
$projection->apply($event, $eventEnvelope);
if ($projection instanceof WithMarkStaleInterface) {
$projection->markStale();
}
$catchUpHook?->onAfterEvent($event, $eventEnvelope);
};

$catchUp = CatchUp::create($eventApplier, $projection->getCheckpointStorage());

if ($catchUpHook !== null) {
$catchUpHook->onBeforeCatchUp();
$catchUp = $catchUp->withOnBeforeBatchCompleted(fn() => $catchUpHook->onBeforeBatchCompleted());
try {
return $this->projectionStates->get($projectionStateClassName);
} catch (\InvalidArgumentException $e) {
throw new \InvalidArgumentException(sprintf('A projection state of type "%s" is not registered in this content repository instance: %s', $projectionStateClassName, $e->getMessage()), 1662033650, $e);
}
$catchUp->run($eventStream);
$catchUpHook?->onAfterCatchUp();
}

public function catchupProjections(): void
{
foreach ($this->projectionsAndCatchUpHooks->projections as $projection) {
// FIXME optimise by only loading required events once and not per projection
// see https://github.com/neos/neos-development-collection/pull/4988/
$this->catchUpProjection($projection::class, CatchUpOptions::create());
}
}

public function setUp(): void
{
$this->eventStore->setup();
foreach ($this->projectionsAndCatchUpHooks->projections as $projection) {
$projection->setUp();
}
}

public function status(): ContentRepositoryStatus
{
$projectionStatuses = ProjectionStatuses::createEmpty();
foreach ($this->projectionsAndCatchUpHooks->projections as $projectionClassName => $projection) {
$projectionStatuses = $projectionStatuses->with($projectionClassName, $projection->status());
}
return new ContentRepositoryStatus(
$this->eventStore->status(),
$projectionStatuses,
);
}

public function resetProjectionStates(): void
{
foreach ($this->projectionsAndCatchUpHooks->projections as $projection) {
$projection->reset();
}
}

/**
* @param class-string<ProjectionInterface<ProjectionStateInterface>> $projectionClassName
*/
public function resetProjectionState(string $projectionClassName): void
{
$projection = $this->projectionsAndCatchUpHooks->projections->get($projectionClassName);
$projection->reset();
}
// /**
// * @param class-string<ProjectionInterface<ProjectionStateInterface>> $projectionClassName
// */
// public function catchUpProjection(string $projectionClassName, CatchUpOptions $options): void
// {
// $projection = $this->projectionsAndCatchUpHooks->projections->get($projectionClassName);
//
// $catchUpHookFactory = $this->projectionsAndCatchUpHooks->getCatchUpHookFactoryForProjection($projection);
// $catchUpHook = $catchUpHookFactory?->build($this);
//
// // TODO allow custom stream name per projection
// $streamName = VirtualStreamName::all();
// $eventStream = $this->eventStore->load($streamName);
// if ($options->maximumSequenceNumber !== null) {
// $eventStream = $eventStream->withMaximumSequenceNumber($options->maximumSequenceNumber);
// }
//
// $eventApplier = function (EventEnvelope $eventEnvelope) use ($projection, $catchUpHook, $options) {
// $event = $this->eventNormalizer->denormalize($eventEnvelope->event);
// if ($options->progressCallback !== null) {
// ($options->progressCallback)($event, $eventEnvelope);
// }
// if (!$projection->canHandle($event)) {
// return;
// }
// $catchUpHook?->onBeforeEvent($event, $eventEnvelope);
// $projection->apply($event, $eventEnvelope);
// if ($projection instanceof WithMarkStaleInterface) {
// $projection->markStale();
// }
// $catchUpHook?->onAfterEvent($event, $eventEnvelope);
// };
//
// $catchUp = CatchUp::create($eventApplier, $projection->getCheckpointStorage());
//
// if ($catchUpHook !== null) {
// $catchUpHook->onBeforeCatchUp();
// $catchUp = $catchUp->withOnBeforeBatchCompleted(fn() => $catchUpHook->onBeforeBatchCompleted());
// }
// $catchUp->run($eventStream);
// $catchUpHook?->onAfterCatchUp();
// }

// public function catchupProjections(): void
// {
// foreach ($this->projectionsAndCatchUpHooks->projections as $projection) {
// // FIXME optimise by only loading required events once and not per projection
// // see https://github.com/neos/neos-development-collection/pull/4988/
// $this->catchUpProjection($projection::class, CatchUpOptions::create());
// }
// }

// public function setUp(): void
// {
// $this->eventStore->setup();
// foreach ($this->projectionsAndCatchUpHooks->projections as $projection) {
// $projection->setUp();
// }
// }

// public function status(): ContentRepositoryStatus
// {
// $projectionStatuses = ProjectionStatuses::createEmpty();
// foreach ($this->projectionsAndCatchUpHooks->projections as $projectionClassName => $projection) {
// $projectionStatuses = $projectionStatuses->with($projectionClassName, $projection->status());
// }
// return new ContentRepositoryStatus(
// $this->eventStore->status(),
// $projectionStatuses,
// );
// }

// public function resetProjectionStates(): void
// {
// foreach ($this->projectionsAndCatchUpHooks->projections as $projection) {
// $projection->reset();
// }
// }

// /**
// * @param class-string<ProjectionInterface<ProjectionStateInterface>> $projectionClassName
// */
// public function resetProjectionState(string $projectionClassName): void
// {
// $projection = $this->projectionsAndCatchUpHooks->projections->get($projectionClassName);
// $projection->reset();
// }

/**
* @throws WorkspaceDoesNotExist if the workspace does not exist
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,5 @@ public function publishEvents(ContentRepository $contentRepository, EventsToPubl
$normalizedEvents,
$eventsToPublish->expectedVersion
);

$contentRepository->catchUpProjections();
}
}
Loading
Loading