Skip to content

Commit

Permalink
Making sure we are not processing events while waiting for catchup to…
Browse files Browse the repository at this point in the history
… start
  • Loading branch information
einari committed Jan 22, 2025
1 parent 6776f29 commit 3b780ed
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 1 deletion.
9 changes: 9 additions & 0 deletions Source/Kernel/Grains.Interfaces/Observation/IObserver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using Cratis.Chronicle.Concepts.Keys;
using Cratis.Chronicle.Concepts.Observation;
using Cratis.Chronicle.Storage.Observation;
using Orleans.Concurrency;

namespace Cratis.Chronicle.Grains.Observation;

Expand Down Expand Up @@ -141,6 +142,14 @@ Task Subscribe<TObserverSubscriber>(
/// <returns>Awaitable task.</returns>
Task CatchUp();

/// <summary>
/// Register partitions that the observer is catching up.
/// </summary>
/// <param name="partitions">Collection of <see cref="Key">partitions</see>.</param>
/// <returns>Awaitable task.</returns>
[AlwaysInterleave]
Task RegisterCatchingUpPartitions(IEnumerable<Key> partitions);

/// <summary>
/// Notify that the observer has been caught up.
/// </summary>
Expand Down
7 changes: 6 additions & 1 deletion Source/Kernel/Grains/Observation/Jobs/CatchUpObserver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.Collections.Immutable;
using Cratis.Chronicle.Concepts.Events;
using Cratis.Chronicle.Concepts.Jobs;
using Cratis.Chronicle.Concepts.Keys;
using Cratis.Chronicle.Grains.Jobs;
using Cratis.Chronicle.Storage;
using Microsoft.Extensions.Logging;
Expand Down Expand Up @@ -62,10 +63,10 @@ protected override async Task<IImmutableList<JobStepDetails>> PrepareSteps(Catch
{
var observerKeyIndexes = storage.GetEventStore(JobKey.EventStore).GetNamespace(JobKey.Namespace).ObserverKeyIndexes;
var index = await observerKeyIndexes.GetFor(request.ObserverKey);

var keys = index.GetKeys(request.FromEventSequenceNumber);

var steps = new List<JobStepDetails>();
var keysForSteps = new List<Key>();

await foreach (var key in keys)
{
Expand All @@ -78,8 +79,12 @@ protected override async Task<IImmutableList<JobStepDetails>> PrepareSteps(Catch
EventSequenceNumber.Max,
EventObservationState.None,
request.EventTypes)));
keysForSteps.Add(key);
}

var observer = GrainFactory.GetGrain<IObserver>(Request.ObserverKey);
await observer.RegisterCatchingUpPartitions(keysForSteps);

return steps.ToImmutableList();
}
}
14 changes: 14 additions & 0 deletions Source/Kernel/Grains/Observation/Observer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,7 @@ public async Task FailedPartitionPartiallyRecovered(Key partition, EventSequence
public async Task CatchUp()
{
using var scope = logger.BeginObserverScope(State.Id, _observerKey);

var subscription = await GetSubscription();

var jobs = await _jobsManager.GetJobsOfType<ICatchUpObserver, CatchUpObserverRequest>();
Expand Down Expand Up @@ -311,6 +312,19 @@ await _jobsManager.Start<ICatchUpObserver, CatchUpObserverRequest>(
}
}

/// <inheritdoc/>
public async Task RegisterCatchingUpPartitions(IEnumerable<Key> partitions)
{
using var scope = logger.BeginObserverScope(State.Id, _observerKey);
logger.RegisteringCatchingUpPartitions();
foreach (var partition in partitions)
{
State.CatchingUpPartitions.Add(partition);
}

await WriteStateAsync();
}

/// <inheritdoc/>
public async Task CaughtUp(EventSequenceNumber lastHandledEventSequenceNumber)
{
Expand Down
3 changes: 3 additions & 0 deletions Source/Kernel/Grains/Observation/ObserverLogging.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ internal static partial class ObserverLogMessages

[LoggerMessage(LogLevel.Trace, "Start new catch up job from event sequence number {EventSequenceNumber}")]
internal static partial void StartCatchUpJob(this ILogger<Observer> logger, EventSequenceNumber eventSequenceNumber);

[LoggerMessage(LogLevel.Trace, "Registering partitions that are catching up")]
internal static partial void RegisteringCatchingUpPartitions(this ILogger<Observer> logger);
}

internal static class ObserverScopes
Expand Down

0 comments on commit 3b780ed

Please sign in to comment.