Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/error with restarting #1668

Merged
merged 31 commits into from
Jan 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
e0782ce
Make some Polly resilience event serverities Debug
woksin Jan 20, 2025
0a5ba70
Add integration test to prove that observer catchup does not work as …
woksin Jan 20, 2025
55c39b1
Fix TailSequenceNumberPerEventType to be correct value
woksin Jan 20, 2025
2fba337
Try do a temporary fix for some simple scenarios
woksin Jan 20, 2025
cb29ce7
Remove unneeded states from ObserverRunningState and remove NextEvent…
woksin Jan 21, 2025
f8f6e9b
Start on adding more integration tests
woksin Jan 21, 2025
e050179
Change Observer state
woksin Jan 21, 2025
e99d8a6
Remove Indexing state
woksin Jan 21, 2025
78e9f09
WIP fix state machine for Observer
woksin Jan 21, 2025
98d3376
Removing references to Indexing and CatchUp
einari Jan 21, 2025
a5b8de4
Removing CatchUp as an allowed transition
einari Jan 21, 2025
971e5fa
Removing explicit CatchUp state
einari Jan 21, 2025
bf04755
Downgrading Xunit
einari Jan 21, 2025
724a9f1
Up and running with the new simplified routing and state management
einari Jan 21, 2025
1572752
Removing traits
einari Jan 22, 2025
592eb6e
Removing specs for Catchup, since this state doesn't exist anymore
einari Jan 22, 2025
8ec8824
Updated states
einari Jan 22, 2025
930cb51
Fixing specs after refactoring
einari Jan 22, 2025
b105e67
Fixed compilation issue with spec
woksin Jan 22, 2025
9d0ba2b
Fixing path for Workbench build
einari Jan 22, 2025
a6b21be
Merge branch 'fix/error-with-restarting' of github.com:Cratis/Chronic…
einari Jan 22, 2025
2016905
Upgrading node version
einari Jan 22, 2025
f2b585e
Stop falling through state management
einari Jan 22, 2025
8dac053
Don't switch to running after failed for job
einari Jan 22, 2025
7a59a3f
Fixing comments
einari Jan 22, 2025
fe3c01b
Return from ReceiveReminder if observer is not subscribed, no point i…
einari Jan 22, 2025
6776f29
Merge remote-tracking branch 'origin/main' into fix/error-with-restar…
einari Jan 22, 2025
3b780ed
Making sure we are not processing events while waiting for catchup to…
einari Jan 22, 2025
28a1630
Adding OnBeforePrepareSteps() for Jobs to override which is called in…
einari Jan 22, 2025
d08444d
Making sure we don't handle events while preparing partitions for cat…
einari Jan 22, 2025
ef6b33c
Await the PrepareAllSteps()
einari Jan 22, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 4 additions & 52 deletions .github/workflows/pull-requests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,10 @@ jobs:
id: yarn-cache-dir-path
run: echo "::set-output name=dir::$(yarn cache dir)"

- name: Setup node v16
- name: Setup node
uses: actions/setup-node@v4
with:
node-version: 16.x
node-version: 20.x
registry-url: "https://registry.npmjs.org"

- uses: actions/cache@v3
Expand All @@ -99,16 +99,13 @@ jobs:
${{ runner.os }}-yarn-

- name: Yarn install
working-directory: ./Source/Workbench/Web
run: yarn

- name: Build JS/TS
run: |
export NODE_OPTIONS="--max-old-space-size=4096"
yarn build

- name: Build Workbench
working-directory: ./Source/Workbench/Web
run: |
export NODE_OPTIONS="--max-old-space-size=4096"
yarn build

publish-nuget-packages:
Expand Down Expand Up @@ -152,51 +149,6 @@ jobs:
https://github.com/cratis/chronicle/packages/1655206?version=${{ needs.release.outputs.version }}
allow-repeats: false

publish-npm-packages:
if: needs.release.outputs.publish == 'true'
runs-on: ubuntu-latest
needs: [release]

steps:
- name: Checkout code
uses: actions/checkout@v4

- name: Setup node v16
uses: actions/setup-node@v4
with:
node-version: 16.x
registry-url: "https://registry.npmjs.org"


# - name: Get yarn cache directory path
# id: yarn-cache-dir-path
# run: echo "::set-output name=dir::$(yarn cache dir)"
# - uses: actions/cache@v3
# id: yarn-cache
# with:
# path: |
# **/node_modules
# **/.eslintcache
# **/yarn.lock
# ${{ steps.yarn-cache-dir-path.outputs.dir }}
# key: ${{ runner.os }}-yarn-${{ hashFiles('**/yarn.lock') }}
# restore-keys: |
# ${{ runner.os }}-yarn-

# - name: Yarn install
# run: yarn

# - name: Publish NPM packages
# env:
# NPM_AUTH_TOKEN: ${{ secrets.NPM_TOKEN }}
# NODE_AUTH_TOKEN: ${{ secrets.NPM_TOKEN }}
# run: |
# yarn publish-version ${{ needs.release.outputs.version }}

- name: Git reset (package.json files changed)
run: |
git reset --hard

publish-docker:
if: needs.release.outputs.publish == 'true'
runs-on: ubuntu-latest
Expand Down
2 changes: 1 addition & 1 deletion Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@
<PackageVersion Include="Cratis.Specifications.XUnit" Version="3.0.4" />
<PackageVersion Include="Testcontainers" Version="4.1.0" />
<PackageVersion Include="xunit" Version="2.9.3" />
<PackageVersion Include="xunit.runner.visualstudio" Version="3.0.0" />
<PackageVersion Include="xunit.runner.visualstudio" Version="2.8.2" />
<PackageVersion Include="NSubstitute" Version="5.3.0" />
<PackageVersion Include="Microsoft.NET.Test.SDK" Version="17.12.0" />
</ItemGroup>
Expand Down
2 changes: 1 addition & 1 deletion Integration/Orleans.InProcess/ObserverHelpers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public static class ObserverHelpers
public static async Task WaitForState(this IObserver observer, ObserverRunningState runningState, TimeSpan? timeout = default)
{
timeout ??= TimeSpan.FromSeconds(5);
var currentRunningState = ObserverRunningState.New;
var currentRunningState = ObserverRunningState.Unknown;
using var cts = new CancellationTokenSource(timeout.Value);
while (currentRunningState != runningState && !cts.IsCancellationRequested)
{
Expand Down
2 changes: 1 addition & 1 deletion Integration/Orleans.InProcess/for_Reactors/SomeEvent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@
namespace Cratis.Chronicle.Integration.Orleans.InProcess.for_Reactors;

[EventType]
public record SomeEvent(int Number);
public record SomeEvent(int Number);
8 changes: 8 additions & 0 deletions Integration/Orleans.InProcess/for_Reactors/SomeOtherEvent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
// Copyright (c) Cratis. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using Cratis.Chronicle.Events;
namespace Cratis.Chronicle.Integration.Orleans.InProcess.for_Reactors;

[EventType]
public record SomeOtherEvent(int Number);
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public class a_disconnected_reactor_observing_an_event(GlobalFixture globalFixtu
public TaskCompletionSource Tcs;
public ReactorWithoutDelay Reactor;
public IObserver ReactorObserver;
public override IEnumerable<Type> EventTypes => [typeof(SomeEvent)];
public override IEnumerable<Type> EventTypes => [typeof(SomeEvent), typeof(SomeOtherEvent)];

protected override void ConfigureServices(IServiceCollection services)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,6 @@ async Task Because()
[Fact]
void should_have_correct_observer_state_next_event_sequence_number() => Context.ReactorObserverState.NextEventSequenceNumber.Value.ShouldEqual(1ul);

[Fact]
void should_have_correct_observer_state_next_event_sequence_number_for_event_types() => Context.ReactorObserverState.NextEventSequenceNumberForEventTypes.Value.ShouldEqual(EventSequenceNumber.Unavailable.Value);

[Fact]
void should_not_have_failed_partitions() => Context.FailedPartitions.ShouldBeEmpty();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright (c) Cratis. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using Cratis.Chronicle.Events;
using Cratis.Chronicle.EventSequences;
using Cratis.Chronicle.Integration.Base;
using Cratis.Chronicle.Storage.Observation;
using context = Cratis.Chronicle.Integration.Orleans.InProcess.for_Reactors.when_connecting.existing.with_multiple_partitions.and_reactor_has_observed_events_previously_and_is_not_behind.context;
using ObserverRunningState = Cratis.Chronicle.Concepts.Observation.ObserverRunningState;

namespace Cratis.Chronicle.Integration.Orleans.InProcess.for_Reactors.when_connecting.existing.with_multiple_partitions;

[Collection(GlobalCollection.Name)]
public class and_reactor_has_observed_events_previously_and_is_not_behind(context context) : Given<context>(context)
{
public class context(GlobalFixture globalFixture) : given.a_disconnected_reactor_observing_an_event(globalFixture)
{
public List<EventForEventSourceId> EventsToHandle;
public List<EventForEventSourceId> NewEvents;
public EventSequenceNumber LastHandledEventSequenceNumber;

public ObserverState ReactorObserverState;

public EventSequenceNumber LastEventSequenceNumberAfterDisconnect;

async Task Establish()
{
var reactor = await EventStore.Reactors.Register<ReactorWithoutDelay>();
ReactorObserver = GetObserverForReactor<ReactorWithoutDelay>();
await ReactorObserver.WaitTillActive();

EventsToHandle = EventForEventSourceIdHelpers.CreateMultiple(i => new SomeEvent(42), 10).ToList();
var result = await EventStore.EventLog.AppendMany(EventsToHandle);
LastHandledEventSequenceNumber = result.SequenceNumbers.Last();

await ReactorObserver.WaitTillReachesEventSequenceNumber(LastHandledEventSequenceNumber);
reactor.Disconnect();

NewEvents = EventForEventSourceIdHelpers.CreateMultiple(i => new SomeOtherEvent(42), 10).ToList();
result = await EventStore.EventLog.AppendMany(NewEvents);
LastEventSequenceNumberAfterDisconnect = result.SequenceNumbers.Last();
}

async Task Because()
{
await EventStore.Reactors.Register<ReactorWithoutDelay>();
await ReactorObserver.WaitTillActive();
ReactorObserverState = await ReactorObserver.GetState();
}
}

[Fact]
void should_have_reactor_observer_be_in_running_state() => Context.ReactorObserverState.RunningState.ShouldEqual(ObserverRunningState.Active);

[Fact]
void should_not_catch_up_any_new_events_added_while_disconnected() => Context.ReactorObserverState.LastHandledEventSequenceNumber.Value.ShouldEqual(Context.LastHandledEventSequenceNumber.Value);

[Fact]
void should_set_correct_next_event_sequence_number() => Context.ReactorObserverState.NextEventSequenceNumber.Value.ShouldEqual(Context.LastEventSequenceNumberAfterDisconnect.Next().Value);

[Fact]
void should_only_process_first_events() => Context.Reactor.HandledEvents.ShouldEqual(Context.EventsToHandle.Count);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright (c) Cratis. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using Cratis.Chronicle.Events;
using Cratis.Chronicle.EventSequences;
using Cratis.Chronicle.Integration.Base;
using Cratis.Chronicle.Storage.Observation;
using context = Cratis.Chronicle.Integration.Orleans.InProcess.for_Reactors.when_connecting.existing.with_single_partition.and_reactor_has_observed_events_previously_and_is_not_behind.context;
using ObserverRunningState = Cratis.Chronicle.Concepts.Observation.ObserverRunningState;

namespace Cratis.Chronicle.Integration.Orleans.InProcess.for_Reactors.when_connecting.existing.with_single_partition;

[Collection(GlobalCollection.Name)]
public class and_reactor_has_observed_events_previously_and_is_not_behind(context context) : Given<context>(context)
{
public class context(GlobalFixture globalFixture) : given.a_disconnected_reactor_observing_an_event(globalFixture)
{
public List<EventForEventSourceId> EventsToHandle;
public List<EventForEventSourceId> NewEvents;
public EventSequenceNumber LastHandledEventSequenceNumber;

public ObserverState ReactorObserverState;

public EventSequenceNumber LastEventSequenceNumberAfterDisconnect;

async Task Establish()
{
var reactor = await EventStore.Reactors.Register<ReactorWithoutDelay>();
ReactorObserver = GetObserverForReactor<ReactorWithoutDelay>();
await ReactorObserver.WaitTillActive();

EventsToHandle = EventForEventSourceIdHelpers.CreateMultiple(i => new SomeEvent(42), 10, "Partition").ToList();
var result = await EventStore.EventLog.AppendMany(EventsToHandle);
LastHandledEventSequenceNumber = result.SequenceNumbers.Last();

await ReactorObserver.WaitTillReachesEventSequenceNumber(LastHandledEventSequenceNumber);
reactor.Disconnect();

NewEvents = EventForEventSourceIdHelpers.CreateMultiple(i => new SomeOtherEvent(42), 10, "Partition").ToList();
result = await EventStore.EventLog.AppendMany(NewEvents);
LastEventSequenceNumberAfterDisconnect = result.SequenceNumbers.Last();
}

async Task Because()
{
await EventStore.Reactors.Register<ReactorWithoutDelay>();
await ReactorObserver.WaitTillActive();
ReactorObserverState = await ReactorObserver.GetState();
}
}

[Fact]
void should_have_reactor_observer_be_in_running_state() => Context.ReactorObserverState.RunningState.ShouldEqual(ObserverRunningState.Active);

[Fact]
void should_not_catch_up_any_new_events_added_while_disconnected() => Context.ReactorObserverState.LastHandledEventSequenceNumber.Value.ShouldEqual(Context.LastHandledEventSequenceNumber.Value);

[Fact]
void should_set_correct_next_event_sequence_number() => Context.ReactorObserverState.NextEventSequenceNumber.Value.ShouldEqual(Context.LastEventSequenceNumberAfterDisconnect.Next().Value);

[Fact]
void should_only_process_first_events() => Context.Reactor.HandledEvents.ShouldEqual(Context.EventsToHandle.Count);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright (c) Cratis. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using Cratis.Chronicle.Events;
using Cratis.Chronicle.Integration.Base;
using Cratis.Chronicle.Storage.Observation;
using context = Cratis.Chronicle.Integration.Orleans.InProcess.for_Reactors.when_connecting.non_existent.with_multiple_partition.and_reactor_is_registered_while_there_are_no_events_to_handle.context;
using ObserverRunningState = Cratis.Chronicle.Concepts.Observation.ObserverRunningState;

namespace Cratis.Chronicle.Integration.Orleans.InProcess.for_Reactors.when_connecting.non_existent.with_multiple_partition;

[Collection(GlobalCollection.Name)]
public class and_reactor_is_registered_while_there_are_no_events_to_handle(context context) : Given<context>(context)
{
public class context(GlobalFixture globalFixture) : given.a_disconnected_reactor_observing_an_event(globalFixture)
{
public ObserverState ReactorObserverState;
public EventSequenceNumber LastEventSequenceNumber;

async Task Establish()
{
var events = EventForEventSourceIdHelpers.CreateMultiple(i => new SomeOtherEvent(42), 10).ToList();
var result = await EventStore.EventLog.AppendMany(events);
LastEventSequenceNumber = result.SequenceNumbers.Last().Next();
}

async Task Because()
{
ReactorObserver = GetObserverForReactor<ReactorWithoutDelay>();
await EventStore.Reactors.Register<ReactorWithoutDelay>();
await ReactorObserver.WaitTillActive();
ReactorObserverState = await ReactorObserver.GetState();
}
}

[Fact]
void should_have_reactor_observer_be_in_running_state() => Context.ReactorObserverState.RunningState.ShouldEqual(ObserverRunningState.Active);

[Fact]
void should_not_catch_up_any_events() => Context.ReactorObserverState.LastHandledEventSequenceNumber.Value.ShouldEqual(Concepts.Events.EventSequenceNumber.Unavailable.Value);

[Fact]
void should_set_next_event_sequence_number_to_next_after_last_sequence_number_of_appended_events() => Context.ReactorObserverState.NextEventSequenceNumber.Value.ShouldEqual(Context.LastEventSequenceNumber.Value);

[Fact]
void should_process_no_events() => Context.Reactor.HandledEvents.ShouldEqual(0);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright (c) Cratis. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using Cratis.Chronicle.Events;
using Cratis.Chronicle.Integration.Base;
using Cratis.Chronicle.Storage.Observation;
using context = Cratis.Chronicle.Integration.Orleans.InProcess.for_Reactors.when_connecting.non_existent.with_single_partition.and_reactor_is_registered_while_there_are_no_events_to_handle.context;
using ObserverRunningState = Cratis.Chronicle.Concepts.Observation.ObserverRunningState;

namespace Cratis.Chronicle.Integration.Orleans.InProcess.for_Reactors.when_connecting.non_existent.with_single_partition;

[Collection(GlobalCollection.Name)]
public class and_reactor_is_registered_while_there_are_no_events_to_handle(context context) : Given<context>(context)
{
public class context(GlobalFixture globalFixture) : given.a_disconnected_reactor_observing_an_event(globalFixture)
{
public ObserverState ReactorObserverState;
public EventSequenceNumber LastEventSequenceNumber;

async Task Establish()
{
var events = EventForEventSourceIdHelpers.CreateMultiple(i => new SomeOtherEvent(42), 10, "Partition").ToList();
var result = await EventStore.EventLog.AppendMany(events);
LastEventSequenceNumber = result.SequenceNumbers.Last().Next();
}

async Task Because()
{
ReactorObserver = GetObserverForReactor<ReactorWithoutDelay>();
await EventStore.Reactors.Register<ReactorWithoutDelay>();
await ReactorObserver.WaitTillActive();
ReactorObserverState = await ReactorObserver.GetState();
}
}

[Fact]
void should_have_reactor_observer_be_in_running_state() => Context.ReactorObserverState.RunningState.ShouldEqual(ObserverRunningState.Active);

[Fact]
void should_not_catch_up_any_events() => Context.ReactorObserverState.LastHandledEventSequenceNumber.Value.ShouldEqual(Concepts.Events.EventSequenceNumber.Unavailable.Value);

[Fact]
void should_set_next_event_sequence_number_to_first() => Context.ReactorObserverState.NextEventSequenceNumber.Value.ShouldEqual(Context.LastEventSequenceNumber.Value);

[Fact]
void should_process_no_events() => Context.Reactor.HandledEvents.ShouldEqual(0);
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
namespace Cratis.Chronicle.Integration.Orleans.InProcess.for_Reactors.when_handling_event.and_it_fails;

[Collection(GlobalCollection.Name)]
[Trait("Category", "Output")]
public class and_needs_to_catch_up(context context) : Given<context>(context)
{
public class context(GlobalFixture globalFixture) : given.a_reactor_observing_an_event_that_can_fail(globalFixture, 3)
Expand Down Expand Up @@ -51,13 +50,13 @@ async Task Because()
FailedPartitionsBeforeRetry = await EventStore.WaitForThereToBeFailedPartitions(ObserverId);
Jobs = await EventStore.WaitForThereToBeJobs(waitTime);

// Wait for the second event to have been handled
// Wait for the first event to be handled a second time (retry)
await Tcs[1].Task.WaitAsync(waitTime);

// Append an event to be caught up
await EventStore.EventLog.Append(EventSourceId, Event);

// Wait for the third event to have been handled
// Wait for the second event to have been handled for the third time after retry
await Tcs[2].Task.WaitAsync(waitTime);
JobsWithCatchUp = await EventStore.WaitForThereToBeJobs(waitTime);
JobsAfterCompleted = await EventStore.WaitForThereToBeNoJobs(waitTime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,6 @@ async Task Because()
[Fact]
void should_have_correct_observer_state_next_event_sequence_number() => Context.ReducerObserverState.NextEventSequenceNumber.Value.ShouldEqual(1ul);

[Fact]
void should_have_correct_observer_state_next_event_sequence_number_for_event_types() => Context.ReducerObserverState.NextEventSequenceNumberForEventTypes.Value.ShouldEqual(EventSequenceNumber.Unavailable.Value);

[Fact]
void should_not_have_failed_partitions() => Context.FailedPartitions.ShouldBeEmpty();
}
Loading
Loading