Skip to content

Commit

Permalink
Threating every activation of the observer uniquely
Browse files Browse the repository at this point in the history
  • Loading branch information
einari committed Dec 9, 2024
1 parent 6acbdc1 commit 25b5349
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,21 @@

namespace Cratis.Chronicle.Integration.Orleans.InProcess.for_Reactors.given;

public class a_reactor_observing_an_event_that_can_fail(GlobalFixture globalFixture) : IntegrationSpecificationContext(globalFixture)
public class a_reactor_observing_an_event_that_can_fail(GlobalFixture globalFixture, int numberOfObservations) : IntegrationSpecificationContext(globalFixture)
{
public TaskCompletionSource Tcs;
public ReactorThatCanFail Observer;
public TaskCompletionSource[] Tcs;
public ReactorThatCanFail[] Observers;
public IObserver ReactorObserver;
public override IEnumerable<Type> Reactors => [typeof(ReactorThatCanFail)];
public override IEnumerable<Type> EventTypes => [typeof(SomeEvent)];
public Concepts.Observation.ObserverId ObserverId;
int _activationCount;

protected override void ConfigureServices(IServiceCollection services)
{
Tcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
Observer = new ReactorThatCanFail(Tcs);
services.AddSingleton(Observer);
Tcs = Enumerable.Range(0, numberOfObservations).Select(_ => new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously)).ToArray();
Observers = Enumerable.Range(0, numberOfObservations).Select(index => new ReactorThatCanFail(Tcs[index])).ToArray();
services.AddTransient(_ => Observers[_activationCount++]);
}

async Task Establish()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace Cratis.Chronicle.Integration.Orleans.InProcess.for_Reactors.when_handl
[Collection(GlobalCollection.Name)]
public class and_it_fails_first_time_but_not_second_time(context context) : Given<context>(context)
{
public class context(GlobalFixture globalFixture) : given.a_reactor_observing_an_event_that_can_fail(globalFixture)
public class context(GlobalFixture globalFixture) : given.a_reactor_observing_an_event_that_can_fail(globalFixture, 2)
{
public IEnumerable<FailedPartition> FailedPartitionsBeforeRetry;
public IEnumerable<FailedPartition> FailedPartitionsAfterRetry;
Expand All @@ -29,12 +29,14 @@ void Establish()
async Task Because()
{
await ReactorObserver.WaitTillActive();
Observer.ShouldFail = true;
Observers[0].ShouldFail = true;
Observers[1].ShouldFail = false;
await EventStore.EventLog.Append(EventSourceId, Event);
await Tcs.Task.WaitAsync(TimeSpan.FromSeconds(5));
await Tcs[0].Task.WaitAsync(TimeSpan.FromSeconds(5));

FailedPartitionsBeforeRetry = await EventStore.WaitForThereToBeFailedPartitions(ObserverId);
Jobs = await EventStore.WaitForThereToBeJobs();
await Tcs[1].Task.WaitAsync(TimeSpan.FromSeconds(5));
await EventStore.WaitForThereToBeNoJobs();

FailedPartitionsAfterRetry = await GetFailedPartitions();
Expand Down

0 comments on commit 25b5349

Please sign in to comment.