Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Appended events queue issue and broken specs #1698

Merged
merged 5 commits into from
Feb 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,6 @@
<PackageVersion Include="OpenTelemetry.Instrumentation.Http" Version="1.11.0" />
<PackageVersion Include="OpenTelemetry.Exporter.OpenTelemetryProtocol" Version="1.11.1" />
<PackageVersion Include="OpenTelemetry.Instrumentation.GrpcNetClient" Version="1.10.0-beta.1" />
<PackageVersion Include="OpenTelemetry.Exporter.InMemory" Version="1.11.0" />
<PackageVersion Include="OpenTelemetry.Exporter.InMemory" Version="1.11.1" />
</ItemGroup>
</Project>
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) Cratis. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System.Collections.Concurrent;
using Cratis.Chronicle.Concepts.Events;
using Cratis.Chronicle.Concepts.Keys;
using Cratis.Chronicle.Concepts.Observation;
Expand All @@ -14,7 +15,7 @@ public abstract class a_single_subscriber : all_dependencies
{
protected ObserverKey _observerKey;
protected IObserver _observer;
protected List<HandledEvents> _handledEvents = [];
protected ConcurrentDictionary<Key, List<HandledEvents>> _handledEventsPerPartition = [];
protected AppendedEventsQueue _queue;

async Task Establish()
Expand All @@ -27,7 +28,12 @@ async Task Establish()
{
var key = callInfo.Arg<Key>();
var events = callInfo.Arg<IEnumerable<AppendedEvent>>();
_handledEvents.Add(new(key, events));
if (!_handledEventsPerPartition.TryGetValue(key, out var handledEvents))
{
handledEvents = [];
_handledEventsPerPartition.TryAdd(key, handledEvents);
}
handledEvents.Add(new(key, events));
});
_grainFactory.GetGrain<IObserver>(_observerKey).Returns(_observer);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) Cratis. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System.Collections.Concurrent;
using Cratis.Chronicle.Concepts.Events;
using Cratis.Chronicle.Concepts.Keys;
using Cratis.Chronicle.Concepts.Observation;
Expand All @@ -16,8 +17,8 @@ public class two_subscribers : all_dependencies
protected ObserverKey _secondObserverKey;
protected IObserver _firstObserver;
protected IObserver _secondObserver;
protected List<HandledEvents> _firstObserverHandledEvents = [];
protected List<HandledEvents> _secondObserverHandledEvents = [];
protected ConcurrentDictionary<Key, List<HandledEvents>> _firstObserverHandledEventsPerPartition = [];
protected ConcurrentDictionary<Key, List<HandledEvents>> _secondObserverHandledEventsPerPartition = [];
protected AppendedEventsQueue _queue;

void Establish()
Expand All @@ -35,7 +36,12 @@ void Establish()
{
var key = callInfo.Arg<Key>();
var events = callInfo.Arg<IEnumerable<AppendedEvent>>();
_firstObserverHandledEvents.Add(new(key, events));
if (!_firstObserverHandledEventsPerPartition.TryGetValue(key, out var handledEvents))
{
handledEvents = [];
_firstObserverHandledEventsPerPartition.TryAdd(key, handledEvents);
}
handledEvents.Add(new(key, events));
});
_grainFactory.GetGrain<IObserver>(_firstObserverKey).Returns(_firstObserver);

Expand All @@ -47,7 +53,12 @@ void Establish()
{
var key = callInfo.Arg<Key>();
var events = callInfo.Arg<IEnumerable<AppendedEvent>>();
_secondObserverHandledEvents.Add(new(key, events));
if (!_secondObserverHandledEventsPerPartition.TryGetValue(key, out var handledEvents))
{
handledEvents = [];
_secondObserverHandledEventsPerPartition.TryAdd(key, handledEvents);
}
handledEvents.Add(new(key, events));
});
_grainFactory.GetGrain<IObserver>(_secondObserverKey).Returns(_secondObserver);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,14 @@ async Task Because()
await _queue.AwaitQueueDepletion();
}

[Fact] void should_call_handle_on_first_observer_twice() => _firstObserverHandledEvents.Count.ShouldEqual(2);
[Fact] void should_call_handle_on_first_observer_with_correct_event_source_id_for_first_event() => _firstObserverHandledEvents[0].Partition.Value.ShouldEqual(_firstEventSourceId.Value);
[Fact] void should_call_handle_on_first_observer_with_correct_event_source_id_for_second_event() => _firstObserverHandledEvents[1].Partition.Value.ShouldEqual(_secondEventSourceId.Value);
[Fact] void should_call_handle_on_first_observer_with_correct_event_for_first_event() => _firstObserverHandledEvents[0].Events.ShouldContainOnly(_firstAppendedEvent);
[Fact] void should_call_handle_on_first_observer_with_correct_event_for_second_event() => _firstObserverHandledEvents[1].Events.ShouldContainOnly(_secondAppendedEvent);
[Fact] void should_call_handle_on_second_observer_twice() => _secondObserverHandledEvents.Count.ShouldEqual(2);
[Fact] void should_call_handle_on_second_observer_with_correct_event_source_id_for_first_event() => _secondObserverHandledEvents[0].Partition.Value.ShouldEqual(_firstEventSourceId.Value);
[Fact] void should_call_handle_on_second_observer_with_correct_event_source_id_for_second_event() => _secondObserverHandledEvents[1].Partition.Value.ShouldEqual(_secondEventSourceId.Value);
[Fact] void should_call_handle_on_second_observer_with_correct_event_for_first_event() => _secondObserverHandledEvents[0].Events.ShouldContainOnly(_firstAppendedEvent);
[Fact] void should_call_handle_on_second_observer_with_correct_event_for_second_event() => _secondObserverHandledEvents[1].Events.ShouldContainOnly(_secondAppendedEvent);
[Fact] void should_call_handle_on_first_observer_twice() => _firstObserverHandledEventsPerPartition.Count.ShouldEqual(2);
[Fact] void should_call_handle_on_first_observer_with_correct_event_source_id_for_first_event() => _firstObserverHandledEventsPerPartition[_firstEventSourceId][0].Partition.Value.ShouldEqual(_firstEventSourceId.Value);
[Fact] void should_call_handle_on_first_observer_with_correct_event_source_id_for_second_event() => _firstObserverHandledEventsPerPartition[_secondEventSourceId][0].Partition.Value.ShouldEqual(_secondEventSourceId.Value);
[Fact] void should_call_handle_on_first_observer_with_correct_event_for_first_event() => _firstObserverHandledEventsPerPartition[_firstEventSourceId][0].Events.ShouldContainOnly(_firstAppendedEvent);
[Fact] void should_call_handle_on_first_observer_with_correct_event_for_second_event() => _firstObserverHandledEventsPerPartition[_secondEventSourceId][0].Events.ShouldContainOnly(_secondAppendedEvent);
[Fact] void should_call_handle_on_second_observer_twice() => _secondObserverHandledEventsPerPartition.Sum(_ => _.Value.Count).ShouldEqual(2);
[Fact] void should_call_handle_on_second_observer_with_correct_event_source_id_for_first_event() => _secondObserverHandledEventsPerPartition[_firstEventSourceId][0].Partition.Value.ShouldEqual(_firstEventSourceId.Value);
[Fact] void should_call_handle_on_second_observer_with_correct_event_source_id_for_second_event() => _secondObserverHandledEventsPerPartition[_secondEventSourceId][0].Partition.Value.ShouldEqual(_secondEventSourceId.Value);
[Fact] void should_call_handle_on_second_observer_with_correct_event_for_first_event() => _secondObserverHandledEventsPerPartition[_firstEventSourceId][0].Events.ShouldContainOnly(_firstAppendedEvent);
[Fact] void should_call_handle_on_second_observer_with_correct_event_for_second_event() => _secondObserverHandledEventsPerPartition[_secondEventSourceId][0].Events.ShouldContainOnly(_secondAppendedEvent);
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ async Task Because()
await _queue.AwaitQueueDepletion();
}

[Fact] void should_call_handle_on_first_observer_once() => _firstObserverHandledEvents.Count.ShouldEqual(1);
[Fact] void should_call_handle_on_first_observer_with_correct_event_source_id_for_first_event() => _firstObserverHandledEvents[0].Partition.Value.ShouldEqual(_firstEventSourceId.Value);
[Fact] void should_call_handle_on_first_observer_with_correct_event_for_first_event() => _firstObserverHandledEvents[0].Events.ShouldContainOnly(_firstAppendedEvent);
[Fact] void should_call_handle_on_second_observer_once() => _secondObserverHandledEvents.Count.ShouldEqual(1);
[Fact] void should_call_handle_on_second_observer_with_correct_event_source_id_for_second_event() => _secondObserverHandledEvents[0].Partition.Value.ShouldEqual(_secondEventSourceId.Value);
[Fact] void should_call_handle_on_second_observer_with_correct_event_for_second_event() => _secondObserverHandledEvents[0].Events.ShouldContainOnly(_secondAppendedEvent);
[Fact] void should_call_handle_on_first_observer_once() => _firstObserverHandledEventsPerPartition.Sum(_ => _.Value.Count).ShouldEqual(1);
[Fact] void should_call_handle_on_first_observer_with_correct_event_source_id_for_first_event() => _firstObserverHandledEventsPerPartition[_firstEventSourceId][0].Partition.Value.ShouldEqual(_firstEventSourceId.Value);
[Fact] void should_call_handle_on_first_observer_with_correct_event_for_first_event() => _firstObserverHandledEventsPerPartition[_firstEventSourceId][0].Events.ShouldContainOnly(_firstAppendedEvent);
[Fact] void should_call_handle_on_second_observer_once() => _secondObserverHandledEventsPerPartition.Sum(_ => _.Value.Count).ShouldEqual(1);
[Fact] void should_call_handle_on_second_observer_with_correct_event_source_id_for_second_event() => _secondObserverHandledEventsPerPartition[_secondEventSourceId][0].Partition.Value.ShouldEqual(_secondEventSourceId.Value);
[Fact] void should_call_handle_on_second_observer_with_correct_event_for_second_event() => _secondObserverHandledEventsPerPartition[_secondEventSourceId][0].Events.ShouldContainOnly(_secondAppendedEvent);
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,10 @@ async Task Because()
await _queue.AwaitQueueDepletion();
}

[Fact] void should_call_handle_on_observer_twice() => _handledEvents.Count.ShouldEqual(2);
[Fact] void should_call_handle_on_observer_with_correct_event_source_id_for_first_event() => _handledEvents[0].Partition.Value.ShouldEqual(_firstEventSourceId.Value);
[Fact] void should_call_handle_on_observer_with_correct_event_source_id_for_second_event() => _handledEvents[1].Partition.Value.ShouldEqual(_secondEventSourceId.Value);
[Fact] void should_call_handle_on_observer_with_correct_event_for_first_event() => _handledEvents[0].Events.ShouldContainOnly(_firstAppendedEvent);
[Fact] void should_call_handle_on_observer_with_correct_event_for_second_event() => _handledEvents[1].Events.ShouldContainOnly(_secondAppendedEvent);
[Fact] void should_call_handle_on_observer_for_two_different_partitions() => _handledEventsPerPartition.Count.ShouldEqual(2);
[Fact] void should_call_handle_on_observer_for_two_events() => _handledEventsPerPartition.Sum(_ => _.Value.Count).ShouldEqual(2);
[Fact] void should_call_handle_on_observer_with_correct_event_source_id_for_first_event() => _handledEventsPerPartition[_firstEventSourceId].SingleOrDefault(e => e.Partition == _firstEventSourceId.Value).ShouldNotBeNull();
[Fact] void should_call_handle_on_observer_with_correct_event_source_id_for_second_event() => _handledEventsPerPartition[_secondEventSourceId].SingleOrDefault(e => e.Partition == _secondEventSourceId.Value).ShouldNotBeNull();
[Fact] void should_call_handle_on_observer_with_correct_event_for_first_event() => _handledEventsPerPartition[_firstEventSourceId].SingleOrDefault(e => e.Events.SingleOrDefault(e => e == _firstAppendedEvent) is not null).ShouldNotBeNull();
[Fact] void should_call_handle_on_observer_with_correct_event_for_second_event() => _handledEventsPerPartition[_secondEventSourceId].SingleOrDefault(e => e.Events.SingleOrDefault(e => e == _secondAppendedEvent) is not null).ShouldNotBeNull();
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ async Task Because()
await _queue.AwaitQueueDepletion();
}

[Fact] void should_call_handle_on_observer_once() => _handledEvents.Count.ShouldEqual(1);
[Fact] void should_call_handle_on_observer_with_correct_event_source_id() => _handledEvents[0].Partition.Value.ShouldEqual(_eventSourceId.Value);
[Fact] void should_call_handle_on_observer_with_correct_event() => _handledEvents[0].Events.ShouldContainOnly(_appendedEvent);
[Fact] void should_call_handle_on_observer_once() => _handledEventsPerPartition[_eventSourceId].Count.ShouldEqual(1);
[Fact] void should_call_handle_on_observer_with_correct_event_source_id() => _handledEventsPerPartition[_eventSourceId][0].Partition.Value.ShouldEqual(_eventSourceId.Value);
[Fact] void should_call_handle_on_observer_with_correct_event() => _handledEventsPerPartition[_eventSourceId][0].Events.ShouldContainOnly(_appendedEvent);
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,5 @@ async Task Because()
await _queue.AwaitQueueDepletion();
}

[Fact] void should_not_call_handle_on_observer_twice() => _handledEvents.Count.ShouldEqual(0);
[Fact] void should_not_handle_any_events() => _handledEventsPerPartition.Count.ShouldEqual(0);
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,5 @@ async Task Because()
await _queue.AwaitQueueDepletion();
}

[Fact] void should_call_handle_on_observer_twice() => _handledEvents.Count.ShouldEqual(2);
[Fact] void should_call_handle_on_observer_twice() => _handledEventsPerPartition[_eventSourceId].Count.ShouldEqual(2);
}
29 changes: 14 additions & 15 deletions Source/Kernel/Grains/EventSequences/AppendedEventsQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ public class AppendedEventsQueue : Grain, IAppendedEventsQueue, IDisposable
readonly ConcurrentQueue<IEnumerable<AppendedEvent>> _queue = new();
readonly AsyncManualResetEvent _queueEvent = new();
readonly AsyncManualResetEvent _queueEmptyEvent = new();
readonly TaskCompletionSource _queueTaskCompletionSource = new(TaskCreationOptions.RunContinuationsAsynchronously);
Task _queueTask = Task.CompletedTask;
bool _isDisposed;
ConcurrentBag<AppendedEventsQueueObserverSubscription> _subscriptions = [];
Expand Down Expand Up @@ -101,7 +100,6 @@ public Task Unsubscribe(ObserverKey observerKey)
public void Dispose()
{
_isDisposed = true;
_queueTaskCompletionSource.SetCanceled();

if (!_queueTask.IsCompleted)
{
Expand All @@ -126,29 +124,31 @@ TaskStatus.Canceled or
/// <summary>
/// Await the queue to be depleted.
/// </summary>
/// <param name="periodNum">Optional amount of times it will check the queue.</param>
/// <param name="periodDelay">Optional time in ms it will wait after each check.</param>
/// <returns>Awaitable task.</returns>
/// <remarks>
/// This method will block until the queue is depleted. This is useful for testing purposes.
/// It is not exposed on the interface as it is not intended for production use.
/// </remarks>
public async Task AwaitQueueDepletion()
public async Task AwaitQueueDepletion(int periodNum = 10, int periodDelay = 10)
{
await Task.Run(async () =>
{
if (Debugger.IsAttached)
{
while (!_queue.IsEmpty)
{
await Task.Delay(10);
await Task.Delay(periodDelay);
}
await _queueEmptyEvent.WaitAsync();
}
else
{
var count = 10;
var count = periodNum;
while (!_queue.IsEmpty)
{
await Task.Delay(10);
await Task.Delay(periodDelay);
if (--count == 0)
{
break;
Expand All @@ -168,20 +168,17 @@ void StartQueueHandler()
}

_queueTask = _taskFactory.Run(QueueHandler);

// The queue task should never stop, then the queue will be stopped. We restart it if it stops. A sort of "watch dog".
_queueTask.ContinueWith(_ => StartQueueHandler(), TaskScheduler.Default);
}

async Task QueueHandler()
{
while (!_queueTaskCompletionSource.Task.IsCanceled)
while (!_isDisposed)
{
try
{
await _queueEvent.WaitAsync();
_queueEmptyEvent.Reset();
if (_queueTaskCompletionSource.Task.IsCanceled)
if (_isDisposed)
{
return;
}
Expand Down Expand Up @@ -220,19 +217,21 @@ async Task QueueHandler()
_logger.QueueHandlerFailed(exception);
}
}
StartQueueHandler();
}

async Task HandleSingle(IEnumerable<AppendedEvent> events)
{
var @event = events.First();
foreach (var subscription in _subscriptions)
{
if (subscription.EventTypeIds.Contains(@event.Metadata.Type.Id))
if (!subscription.EventTypeIds.Contains(@event.Metadata.Type.Id))
{
var observer = _grainFactory.GetGrain<IObserver>(subscription.ObserverKey);
var eventToHandle = new List<AppendedEvent> { @event };
await observer.Handle(@event.Context.EventSourceId, eventToHandle);
continue;
}
var observer = _grainFactory.GetGrain<IObserver>(subscription.ObserverKey);
var eventToHandle = new List<AppendedEvent> { @event };
await observer.Handle(@event.Context.EventSourceId, eventToHandle);
}
}

Expand Down
5 changes: 5 additions & 0 deletions Source/Kernel/Grains/Observation/Observer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,11 @@ public async Task Handle(Key partition, IEnumerable<AppendedEvent> events)
{
using var scope = logger.BeginObserverScope(_observerId, _observerKey);

if (!events.Any())
{
return;
}

if (!ShouldHandleEvent(partition))
{
return;
Expand Down
Loading