Skip to content

Commit

Permalink
Merge pull request #1693 from Cratis/fix/job-request-deiscriminator
Browse files Browse the repository at this point in the history
Fix/job request deiscriminator
  • Loading branch information
woksin authored Feb 10, 2025
2 parents 6b39b98 + c671c4c commit beb7ec1
Show file tree
Hide file tree
Showing 66 changed files with 715 additions and 225 deletions.
25 changes: 16 additions & 9 deletions Integration/Base/MongoDBDatabase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,16 +53,23 @@ public MongoDBDatabase(IContainer mongoDBContainer, string database)

foreach (var document in cursor.Current)
{
_changes.OnNext(document);
var changesCollection = changeDatabase.GetCollection<BsonDocument>(document.CollectionNamespace.CollectionName);
var changeDocument = new BsonDocument
try
{
{ "_id", ObjectId.GenerateNewId() },
{ "documentKey", document.DocumentKey },
{ "operationType", document.OperationType.ToString() },
{ "fullDocument", document.FullDocument }
};
await changesCollection.InsertOneAsync(changeDocument);
_changes.OnNext(document);
var changesCollection = changeDatabase.GetCollection<BsonDocument>(document.CollectionNamespace.CollectionName);
var changeDocument = new BsonDocument
{
{ "_id", ObjectId.GenerateNewId() },
{ "documentKey", (BsonValue)document.DocumentKey ?? "N/A" },
{ "operationType", document.OperationType.ToString() },
{ "fullDocument", document.FullDocument ?? new BsonDocument() }

Check warning on line 65 in Integration/Base/MongoDBDatabase.cs

View workflow job for this annotation

GitHub Actions / dotnet-build

Collection initialization can be simplified (https://learn.microsoft.com/dotnet/fundamentals/code-analysis/style-rules/ide0028)

Check warning on line 65 in Integration/Base/MongoDBDatabase.cs

View workflow job for this annotation

GitHub Actions / dotnet-build

Collection initialization can be simplified (https://learn.microsoft.com/dotnet/fundamentals/code-analysis/style-rules/ide0028)

Check warning on line 65 in Integration/Base/MongoDBDatabase.cs

View workflow job for this annotation

GitHub Actions / dotnet-build

Collection initialization can be simplified (https://learn.microsoft.com/dotnet/fundamentals/code-analysis/style-rules/ide0028)

Check warning on line 65 in Integration/Base/MongoDBDatabase.cs

View workflow job for this annotation

GitHub Actions / dotnet-build

Collection initialization can be simplified (https://learn.microsoft.com/dotnet/fundamentals/code-analysis/style-rules/ide0028)

Check warning on line 65 in Integration/Base/MongoDBDatabase.cs

View workflow job for this annotation

GitHub Actions / integration

Collection initialization can be simplified (https://learn.microsoft.com/dotnet/fundamentals/code-analysis/style-rules/ide0028)
};
await changesCollection.InsertOneAsync(changeDocument);
}
catch
{
// ignored
}
}
}
});
Expand Down
6 changes: 3 additions & 3 deletions Integration/Orleans.InProcess/OrleansFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,9 @@ protected override IHostBuilder CreateHostBuilder()
{
silo
.UseLocalhostClustering()
.AddCratisChronicle(_ => _.EventStoreName = Constants.EventStore)
.AddChronicleToSilo(_ => _
.WithMongoDB(chronicleOptions.Storage.ConnectionDetails, Constants.EventStore));
.AddCratisChronicle(
options => options.EventStoreName = Constants.EventStore,
chronicleBuilder => chronicleBuilder.WithMongoDB(chronicleOptions.Storage.ConnectionDetails, Constants.EventStore));
})
.UseConsoleLifetime();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,9 @@ async Task Because()
}

[Fact] void should_fail_one_partition() => Context.FailedPartitionsBeforeRetry.Count().ShouldEqual(1);
[Fact] void should_start_replaying_job() => Context.Jobs.First().Type.ShouldContain(nameof(RetryFailedPartitionJob));
[Fact] void should_start_replaying_job() => Context.Jobs.First().Type.ShouldContain(nameof(RetryFailedPartition));
[Fact] void should_recover_failed_partition() => Context.FailedPartitionsAfterRetry.ShouldBeEmpty();
[Fact] void should_start_catchup_for_partition_job() => Context.JobsWithCatchUp.SingleOrDefault(_ => _.Name == nameof(CatchUpObserverPartition)).ShouldNotBeNull();
[Fact] void should_start_catchup_for_partition_job() => Context.JobsWithCatchUp.SingleOrDefault(_ => _.Type == nameof(CatchUpObserverPartition)).ShouldNotBeNull();
[Fact] void should_have_completed_all_jobs_at_the_end() => Context.JobsAfterCompleted.ShouldBeEmpty();
[Fact] void should_have_the_active_observer_running_state() => Context.ObserverState.RunningState.ShouldEqual(ObserverRunningState.Active);
[Fact] void should_have_correct_last_handled_event_sequence_number() => Context.ObserverState.LastHandledEventSequenceNumber.Value.ShouldEqual(1ul);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ async Task Because()
// Wait for the first event to have been handled
await Tcs[0].Task.WaitAsync(waitTime);

FailedPartitionsBeforeRetry = await EventStore.WaitForThereToBeFailedPartitions(ObserverId);
FailedPartitionsBeforeRetry = await EventStore.WaitForThereToBeFailedPartitions(ObserverId, waitTime);
Jobs = await EventStore.WaitForThereToBeJobs(waitTime);

// Wait for the second event to have been handled
Expand All @@ -56,7 +56,7 @@ async Task Because()
}

[Fact] void should_fail_one_partition() => Context.FailedPartitionsBeforeRetry.Count().ShouldEqual(1);
[Fact] void should_start_replaying_job() => Context.Jobs.First().Type.ShouldContain("RetryFailedPartitionJob");
[Fact] void should_start_replaying_job() => Context.Jobs.First().Type.ShouldContain("RetryFailedPartition");
[Fact] void should_recover_failed_partition() => Context.FailedPartitionsAfterRetry.ShouldBeEmpty();
[Fact] void should_have_the_active_observer_running_state() => Context.ObserverState.RunningState.ShouldEqual(ObserverRunningState.Active);
[Fact] void should_have_correct_last_handled_event_sequence_number() => Context.ObserverState.LastHandledEventSequenceNumber.Value.ShouldEqual(0ul);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ async Task Because()
}

[Fact] void should_fail_one_partition() => Context.FailedPartitionsBeforeRetry.Count().ShouldEqual(1);
[Fact] void should_start_replaying_job() => Context.Jobs.First().Type.ShouldContain("RetryFailedPartitionJob");
[Fact] void should_start_replaying_job() => Context.Jobs.First().Type.ShouldContain("RetryFailedPartition");
[Fact] void should_recover_failed_partition() => Context.FailedPartitionsAfterRetry.ShouldBeEmpty();
[Fact] void should_have_the_active_observer_running_state() => Context.ObserverState.RunningState.ShouldEqual(ObserverRunningState.Active);
[Fact] void should_have_correct_last_handled_event_sequence_number() => Context.ObserverState.LastHandledEventSequenceNumber.Value.ShouldEqual(0ul);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,9 @@ async Task Because()
}

[Fact] void should_fail_one_partition() => Context.FailedPartitionsBeforeRetry.Count().ShouldEqual(1);
[Fact] void should_start_replaying_job() => Context.Jobs.First().Type.ShouldContain(nameof(RetryFailedPartitionJob));
[Fact] void should_start_replaying_job() => Context.Jobs.First().Type.ShouldContain(nameof(RetryFailedPartition));
[Fact] void should_recover_failed_partition() => Context.FailedPartitionsAfterRetry.ShouldBeEmpty();
[Fact] void should_start_catchup_for_partition_job() => Context.JobsWithCatchUp.SingleOrDefault(_ => _.Name == nameof(CatchUpObserverPartition)).ShouldNotBeNull();
[Fact] void should_start_catchup_for_partition_job() => Context.JobsWithCatchUp.SingleOrDefault(_ => _.Type == nameof(CatchUpObserverPartition)).ShouldNotBeNull();
[Fact] void should_have_completed_all_jobs_at_the_end() => Context.JobsAfterCompleted.ShouldBeEmpty();
[Fact] void should_have_the_active_observer_running_state() => Context.ObserverState.RunningState.ShouldEqual(ObserverRunningState.Active);
[Fact] void should_have_correct_last_handled_event_sequence_number() => Context.ObserverState.LastHandledEventSequenceNumber.Value.ShouldEqual(1ul);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ async Task Because()
}

[Fact] void should_fail_one_partition() => Context.FailedPartitionsBeforeRetry.Count().ShouldEqual(1);
[Fact] void should_start_replaying_job() => Context.Jobs.First().Type.ShouldContain("RetryFailedPartitionJob");
[Fact] void should_start_replaying_job() => Context.Jobs.First().Type.ShouldContain("RetryFailedPartition");
[Fact] void should_recover_failed_partition() => Context.FailedPartitionsAfterRetry.ShouldBeEmpty();
[Fact] void should_have_the_active_observer_running_state() => Context.ObserverState.RunningState.ShouldEqual(ObserverRunningState.Active);
[Fact] void should_have_correct_last_handled_event_sequence_number() => Context.ObserverState.LastHandledEventSequenceNumber.Value.ShouldEqual(0ul);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ async Task Because()
}

[Fact] void should_fail_one_partition() => Context.FailedPartitionsBeforeRetry.Count().ShouldEqual(1);
[Fact] void should_start_replaying_job() => Context.Jobs.First().Type.ShouldContain("RetryFailedPartitionJob");
[Fact] void should_start_replaying_job() => Context.Jobs.First().Type.ShouldContain("RetryFailedPartition");
[Fact] void should_recover_failed_partition() => Context.FailedPartitionsAfterRetry.ShouldBeEmpty();
[Fact] void should_have_the_active_observer_running_state() => Context.ObserverState.RunningState.ShouldEqual(ObserverRunningState.Active);
[Fact] void should_have_correct_last_handled_event_sequence_number() => Context.ObserverState.LastHandledEventSequenceNumber.Value.ShouldEqual(0ul);
Expand Down
9 changes: 9 additions & 0 deletions Source/Kernel/Concepts/Jobs/IJobRequest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
// Copyright (c) Cratis. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

namespace Cratis.Chronicle.Concepts.Jobs;

/// <summary>
/// Defines a job request.
/// </summary>
public interface IJobRequest;
58 changes: 58 additions & 0 deletions Source/Kernel/Concepts/Jobs/IJobTypes.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright (c) Cratis. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using OneOf.Types;
namespace Cratis.Chronicle.Concepts.Jobs;

/// <summary>
/// Defines a system that knows about <see cref="JobType"/> job types and how to correlate it to CLR <see cref="Type"/>.
/// </summary>
public interface IJobTypes
{
/// <summary>
/// Error types for <see cref="IJobTypes.GetRequestClrTypeFor"/>.
/// </summary>
enum GetRequestClrTypeForError
{
JobTypeIsNotSet = 0,
CouldNotFindType = 1
}

/// <summary>
/// Error types for <see cref="IJobTypes.GetClrTypeFor"/>.
/// </summary>
enum GetClrTypeForError
{
JobTypeIsNotSet = 0,
CouldNotFindType = 1
}

/// <summary>
/// Error types for <see cref="IJobTypes.GetFor"/>.
/// </summary>
enum GetForError
{
NoAssociatedJobType = 0
}

/// <summary>
/// Gets the <see cref="JobType"/> associated with the CLR <see cref="Type"/> or <see cref="None"/>.
/// </summary>
/// <param name="type">The <see cref="Type"/>.</param>
/// <returns><see cref="Result{T0, T1}"/> <see cref="JobType"/> or <see cref="None"/>.</returns>
Result<JobType, GetForError> GetFor(Type type);

/// <summary>
/// Gets the job <see cref="Type"/> associated with the <see cref="JobType"/>.
/// </summary>
/// <param name="type">The <see cref="JobType"/>.</param>
/// <returns><see cref="Result{T0, T1}"/> <see cref="Type"/> or <see cref="GetClrTypeForError"/>.</returns>
Result<Type, GetClrTypeForError> GetClrTypeFor(JobType type);

/// <summary>
/// Gets the job request <see cref="Type"/> associated with the <see cref="JobType"/>.
/// </summary>
/// <param name="type">The <see cref="JobType"/>.</param>
/// <returns><see cref="Result{T0, T1}"/> <see cref="Type"/> or <see cref="GetRequestClrTypeForError"/>.</returns>
Result<Type, GetRequestClrTypeForError> GetRequestClrTypeFor(JobType type);
}
12 changes: 3 additions & 9 deletions Source/Kernel/Concepts/Jobs/JobType.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
namespace Cratis.Chronicle.Concepts.Jobs;

/// <summary>
/// Represents type of a job.
/// Represents type of a Job.
/// </summary>
/// <param name="Value">String representation of the job type.</param>
/// <remarks>
Expand All @@ -21,11 +21,5 @@ public record JobType(string Value) : ConceptAs<string>(Value)
/// Implicitly convert from <see cref="Type"/> to <see cref="JobType"/>.
/// </summary>
/// <param name="type"><see cref="Type"/> to convert from.</param>
public static implicit operator JobType(Type type) => new(type.AssemblyQualifiedName ?? type.Name);

/// <summary>
/// Implicitly convert from <see cref="JobType"/> to <see cref="Type"/>.
/// </summary>
/// <param name="type"><see cref="JobType"/> to convert from.</param>
public static implicit operator Type(JobType type) => Type.GetType(type.Value) ?? throw new UnknownClrTypeForJobType(type);
}
public static implicit operator JobType(Type type) => new(type.Name);
}
17 changes: 17 additions & 0 deletions Source/Kernel/Concepts/Jobs/JobTypeAttribute.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright (c) Cratis. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

namespace Cratis.Chronicle.Concepts.Jobs;

/// <summary>
/// Represents the JobType attribute that can be placed on job type classes to specify the <see cref="JobType"/> name.
/// </summary>
/// <param name="jobType">The job type name to use.</param>
[AttributeUsage(AttributeTargets.Class)]
public sealed class JobTypeAttribute(string jobType) : Attribute
{
/// <summary>
/// Gets the <see cref="JobType"/>.
/// </summary>
public JobType JobType { get; } = new(jobType);
}
17 changes: 9 additions & 8 deletions Source/Kernel/Contracts/Jobs/Job.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) Cratis. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using Cratis.Chronicle.Contracts.Primitives;
using ProtoBuf;

namespace Cratis.Chronicle.Contracts.Jobs;
Expand All @@ -18,28 +19,28 @@ public class Job
public Guid Id { get; set; }

/// <summary>
/// Gets or sets the name of the job.
/// Gets or sets the details for a job.
/// </summary>
[ProtoMember(2)]
public string Name { get; set; }
public string Details { get; set; }

/// <summary>
/// Gets or sets the details for a job.
/// Gets or sets the type of the job.
/// </summary>
[ProtoMember(3)]
public string Details { get; set; }
public string Type { get; set; }

/// <summary>
/// Gets or sets the type of the job.
/// Gets or sets the status of the job.
/// </summary>
[ProtoMember(4)]
public string Type { get; set; }
public JobStatus Status { get; set; }

/// <summary>
/// Gets or sets the status of the job.
/// Gets or sets when job was created.
/// </summary>
[ProtoMember(5)]
public JobStatus Status { get; set; }
public SerializableDateTimeOffset Created { get; set; }

/// <summary>
/// Gets or sets collection of status changes that happened to the job.
Expand Down
4 changes: 2 additions & 2 deletions Source/Kernel/Grains.Interfaces/Jobs/IJob.cs
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,12 @@ public interface IJob : IGrainWithGuidCompoundKey
/// </summary>
/// <typeparam name="TRequest">Type of request object that gets passed to job.</typeparam>
public interface IJob<in TRequest> : IJob
where TRequest : class
where TRequest : class, IJobRequest
{
/// <summary>
/// Start the job.
/// </summary>
/// <param name="request">The request object for the job.</param>
/// <returns>Awaitable task.</returns>
Task<Result<JobError>> Start(TRequest request);
}
}
6 changes: 3 additions & 3 deletions Source/Kernel/Grains.Interfaces/Jobs/IJobsManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public interface IJobsManager : IGrainWithIntegerCompoundKey
Task Rehydrate();

/// <summary>
/// Start a job.
/// Start a new job.
/// </summary>
/// <param name="jobId">The <see cref="JobId"/> uniquely identifying the job.</param>
/// <param name="request">The request parameter being passed to the job.</param>
Expand All @@ -28,7 +28,7 @@ public interface IJobsManager : IGrainWithIntegerCompoundKey
/// <returns>Awaitable task.</returns>
Task Start<TJob, TRequest>(JobId jobId, TRequest request)
where TJob : IJob<TRequest>
where TRequest : class;
where TRequest : class, IJobRequest;

/// <summary>
/// Resume a job.
Expand Down Expand Up @@ -59,7 +59,7 @@ Task Start<TJob, TRequest>(JobId jobId, TRequest request)
/// <returns>Collection of job states.</returns>
Task<IImmutableList<JobState>> GetJobsOfType<TJob, TRequest>()
where TJob : IJob<TRequest>
where TRequest : class;
where TRequest : class, IJobRequest;

/// <summary>
/// Get all jobs.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
// Copyright (c) Cratis. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using Cratis.Chronicle.Concepts.Jobs;
using Cratis.Chronicle.Concepts.Observation;

namespace Cratis.Chronicle.Grains.Observation.Jobs;

/// <summary>
/// Defines the basis for a request to an observer.
/// </summary>
public interface IObserverJobRequest
public interface IObserverJobRequest : IJobRequest
{
/// <summary>
/// Gets the <see cref="ObserverKey"/> for the request.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@ namespace Cratis.Chronicle.Grains.Observation.Jobs;
/// <summary>
/// Defines the job for retrying a failed partition for an observer.
/// </summary>
public interface IRetryFailedPartitionJob : IJob<RetryFailedPartitionRequest>;
public interface IRetryFailedPartition : IJob<RetryFailedPartitionRequest>;
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
namespace Cratis.Chronicle.Grains.Observation.Jobs;

/// <summary>
/// Represents the request for a <see cref="IRetryFailedPartitionJob"/>.
/// Represents the request for a <see cref="IRetryFailedPartition"/>.
/// </summary>
/// <param name="ObserverKey">The additional <see cref="ObserverKey"/> for the observer to replay.</param>
/// <param name="ObserverSubscription">The <see cref="ObserverSubscription"/> for the observer.</param>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ async Task Because()
{
await _queue.Enqueue([_firstAppendedEvent, _secondAppendedEvent]);
await _queue.AwaitQueueDepletion();

// waiting for queue depletion does not guarantee that the event was actually handled
await Task.Delay(100);
}

[Fact] void should_call_handle_on_first_observer_twice() => _firstObserverHandledEventsPerPartition.Count.ShouldEqual(2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ async Task Because()
{
await _queue.Enqueue([_firstAppendedEvent, _secondAppendedEvent]);
await _queue.AwaitQueueDepletion();

// waiting for queue depletion does not guarantee that the event was actually handled
await Task.Delay(100);
}

[Fact] void should_call_handle_on_first_observer_once() => _firstObserverHandledEventsPerPartition.Sum(_ => _.Value.Count).ShouldEqual(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ async Task Because()
{
await _queue.Enqueue([_firstAppendedEvent, _secondAppendedEvent]);
await _queue.AwaitQueueDepletion();

// waiting for queue depletion does not guarantee that the event was actually handled
await Task.Delay(100);
}

[Fact] void should_call_handle_on_observer_for_two_different_partitions() => _handledEventsPerPartition.Count.ShouldEqual(2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ async Task Because()
{
await _queue.Enqueue([_appendedEvent]);
await _queue.AwaitQueueDepletion();

// waiting for queue depletion does not guarantee that the event was actually handled
await Task.Delay(100);
}

[Fact] void should_call_handle_on_observer_once() => _handledEventsPerPartition[_eventSourceId].Count.ShouldEqual(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ async Task Because()
{
await _queue.Enqueue([_appendedEvent]);
await _queue.AwaitQueueDepletion();

// waiting for queue depletion does not guarantee that the event was actually handled
await Task.Delay(100);
}

[Fact] void should_not_handle_any_events() => _handledEventsPerPartition.Count.ShouldEqual(0);
Expand Down
Loading

0 comments on commit beb7ec1

Please sign in to comment.