Skip to content

Commit

Permalink
Introducing Replayed method for observer and calling this from the Re…
Browse files Browse the repository at this point in the history
…playObserver job
  • Loading branch information
einari committed Jan 16, 2025
1 parent b9abf68 commit 5bb662e
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 2 deletions.
7 changes: 7 additions & 0 deletions Source/Kernel/Grains.Interfaces/Observation/IObserver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,13 @@ Task Subscribe<TObserverSubscriber>(
/// <returns>Awaitable task.</returns>
Task ReplayPartitionTo(Key partition, EventSequenceNumber sequenceNumber);

/// <summary>
/// Notify that the observer has been replayed.
/// </summary>
/// <param name="lastHandledEventSequenceNumber">The <see cref="EventSequenceNumber"/> it has been replayed to.</param>
/// <returns>Awaitable task.</returns>
Task Replayed(EventSequenceNumber lastHandledEventSequenceNumber);

/// <summary>
/// Notify that the partition has been replayed.
/// </summary>
Expand Down
15 changes: 13 additions & 2 deletions Source/Kernel/Grains/Observation/Jobs/ReplayObserver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
using Cratis.Chronicle.Concepts.Events;
using Cratis.Chronicle.Concepts.Jobs;
using Cratis.Chronicle.Grains.Jobs;
using Cratis.Chronicle.Grains.Observation.States;
using Cratis.Chronicle.Storage;
using Microsoft.Extensions.Logging;

Expand All @@ -25,8 +24,20 @@ public class ReplayObserver(IStorage storage, ILogger<ReplayObserver> logger) :
public override async Task OnCompleted()
{
using var scope = logger.BeginJobScope(JobId, JobKey);
if (!AllStepsCompletedSuccessfully)
{
if (State.LastHandledEventSequenceNumber.IsActualValue)
{
logger.NotAllEventsWereHandled(nameof(CatchUpObserver), State.LastHandledEventSequenceNumber);
}
else
{
logger.NoneEventsWereHandled(nameof(CatchUpObserver));
}
}

var observer = GrainFactory.GetGrain<IObserver>(Request.ObserverKey);
await observer.TransitionTo<Routing>();
await observer.Replayed(State.LastHandledEventSequenceNumber);
}

/// <inheritdoc/>
Expand Down
9 changes: 9 additions & 0 deletions Source/Kernel/Grains/Observation/Observer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,15 @@ await _jobsManager.Start<IReplayObserverPartition, ReplayObserverPartitionReques
await WriteStateAsync();
}

/// <inheritdoc/>
public async Task Replayed(EventSequenceNumber lastHandledEventSequenceNumber)
{
using var scope = logger.BeginObserverScope(_observerId, _observerKey);
HandleNewLastHandledEvent(lastHandledEventSequenceNumber);
await WriteStateAsync();
await TransitionTo<Routing>();
}

/// <inheritdoc/>
public async Task PartitionReplayed(Key partition, EventSequenceNumber lastHandledEventSequenceNumber)
{
Expand Down

0 comments on commit 5bb662e

Please sign in to comment.