From 334392f99a9b6f2ecc966b8df9f4488ab1ca2974 Mon Sep 17 00:00:00 2001 From: woksin Date: Mon, 3 Feb 2025 09:22:24 +0100 Subject: [PATCH 01/38] Add Created timestamp to Job --- Source/Kernel/Contracts/Jobs/Job.cs | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/Source/Kernel/Contracts/Jobs/Job.cs b/Source/Kernel/Contracts/Jobs/Job.cs index 6237ed07b..17cdf7532 100644 --- a/Source/Kernel/Contracts/Jobs/Job.cs +++ b/Source/Kernel/Contracts/Jobs/Job.cs @@ -1,6 +1,7 @@ // Copyright (c) Cratis. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. +using Cratis.Chronicle.Contracts.Primitives; using ProtoBuf; namespace Cratis.Chronicle.Contracts.Jobs; @@ -41,15 +42,21 @@ public class Job [ProtoMember(5)] public JobStatus Status { get; set; } + /// + /// Gets or sets when job was created. + /// + [ProtoMember(6)] + public SerializableDateTimeOffset Created { get; set; } + /// /// Gets or sets collection of status changes that happened to the job. /// - [ProtoMember(6, IsRequired = true)] + [ProtoMember(7, IsRequired = true)] public IList StatusChanges { get; set; } = []; /// /// Gets or sets the . /// - [ProtoMember(7, IsRequired = true)] + [ProtoMember(8, IsRequired = true)] public JobProgress Progress { get; set; } = new(); } From 48955c1768eebcead9745b7dc8580d9253dc3581 Mon Sep 17 00:00:00 2001 From: woksin Date: Mon, 3 Feb 2025 09:23:12 +0100 Subject: [PATCH 02/38] Add Created timestamp to JobState --- Source/Kernel/Storage/Jobs/JobState.cs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/Source/Kernel/Storage/Jobs/JobState.cs b/Source/Kernel/Storage/Jobs/JobState.cs index 8a31f72c1..ad311b6fe 100644 --- a/Source/Kernel/Storage/Jobs/JobState.cs +++ b/Source/Kernel/Storage/Jobs/JobState.cs @@ -15,11 +15,6 @@ public class JobState /// public JobId Id { get; set; } = JobId.NotSet; - /// - /// Gets or sets the name of the job. - /// - public JobName Name { get; set; } = JobName.NotSet; - /// /// Gets or sets the details for a job. /// @@ -35,6 +30,11 @@ public class JobState /// public JobStatus Status { get; set; } + /// + /// Gets or sets the time that this job was created. + /// + public DateTimeOffset Created { get; set; } = DateTimeOffset.MinValue; + /// /// Gets or sets collection of status changes that happened to the job. /// @@ -48,7 +48,7 @@ public class JobState /// /// Gets or sets the request associated with the job. /// - public object Request { get; set; } = default!; + public IJobRequest Request { get; set; } = default!; /// /// Gets whether the job is resumable. From dc6d1208e39ad4b1dbda9dc6421b5a058adaa7cc Mon Sep 17 00:00:00 2001 From: woksin Date: Mon, 3 Feb 2025 09:24:20 +0100 Subject: [PATCH 03/38] Add Created and remove Name from Job contract --- Source/Kernel/Services/Jobs/JobsConverters.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Source/Kernel/Services/Jobs/JobsConverters.cs b/Source/Kernel/Services/Jobs/JobsConverters.cs index 8def0dd3c..3ed113bcc 100644 --- a/Source/Kernel/Services/Jobs/JobsConverters.cs +++ b/Source/Kernel/Services/Jobs/JobsConverters.cs @@ -20,10 +20,10 @@ public static Job ToContract(this JobState job) => new() { Id = job.Id, - Name = job.Name, Details = job.Details, Type = job.Type, Status = (JobStatus)(int)job.Status, + Created = job.Created!, StatusChanges = job.StatusChanges.ToContract(), Progress = job.Progress.ToContract() }; From 4543bcafb8d69f88dd86a98a3262b2fc1f5eab11 Mon Sep 17 00:00:00 2001 From: woksin Date: Mon, 3 Feb 2025 09:30:22 +0100 Subject: [PATCH 04/38] Create IJobRequest as a marker interface for job request types and use it --- Source/Kernel/Concepts/Jobs/IJobRequest.cs | 9 +++++++++ Source/Kernel/Grains.Interfaces/Jobs/IJob.cs | 4 ++-- .../Observation/Jobs/IObserverJobRequest.cs | 3 ++- Source/Kernel/Grains/Jobs/Job.cs | 6 ++---- 4 files changed, 15 insertions(+), 7 deletions(-) create mode 100644 Source/Kernel/Concepts/Jobs/IJobRequest.cs diff --git a/Source/Kernel/Concepts/Jobs/IJobRequest.cs b/Source/Kernel/Concepts/Jobs/IJobRequest.cs new file mode 100644 index 000000000..b58e43d68 --- /dev/null +++ b/Source/Kernel/Concepts/Jobs/IJobRequest.cs @@ -0,0 +1,9 @@ +// Copyright (c) Cratis. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +namespace Cratis.Chronicle.Concepts.Jobs; + +/// +/// Defines a job request. +/// +public interface IJobRequest; diff --git a/Source/Kernel/Grains.Interfaces/Jobs/IJob.cs b/Source/Kernel/Grains.Interfaces/Jobs/IJob.cs index 7f7ddf514..381ab6ba2 100644 --- a/Source/Kernel/Grains.Interfaces/Jobs/IJob.cs +++ b/Source/Kernel/Grains.Interfaces/Jobs/IJob.cs @@ -107,7 +107,7 @@ public interface IJob : IGrainWithGuidCompoundKey /// /// Type of request object that gets passed to job. public interface IJob : IJob - where TRequest : class + where TRequest : class, IJobRequest { /// /// Start the job. @@ -115,4 +115,4 @@ public interface IJob : IJob /// The request object for the job. /// Awaitable task. Task> Start(TRequest request); -} +} \ No newline at end of file diff --git a/Source/Kernel/Grains.Interfaces/Observation/Jobs/IObserverJobRequest.cs b/Source/Kernel/Grains.Interfaces/Observation/Jobs/IObserverJobRequest.cs index f85c1d324..68fda0aaf 100644 --- a/Source/Kernel/Grains.Interfaces/Observation/Jobs/IObserverJobRequest.cs +++ b/Source/Kernel/Grains.Interfaces/Observation/Jobs/IObserverJobRequest.cs @@ -1,6 +1,7 @@ // Copyright (c) Cratis. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. +using Cratis.Chronicle.Concepts.Jobs; using Cratis.Chronicle.Concepts.Observation; namespace Cratis.Chronicle.Grains.Observation.Jobs; @@ -8,7 +9,7 @@ namespace Cratis.Chronicle.Grains.Observation.Jobs; /// /// Defines the basis for a request to an observer. /// -public interface IObserverJobRequest +public interface IObserverJobRequest : IJobRequest { /// /// Gets the for the request. diff --git a/Source/Kernel/Grains/Jobs/Job.cs b/Source/Kernel/Grains/Jobs/Job.cs index 98c0214a6..22cfad1a9 100644 --- a/Source/Kernel/Grains/Jobs/Job.cs +++ b/Source/Kernel/Grains/Jobs/Job.cs @@ -22,7 +22,7 @@ namespace Cratis.Chronicle.Grains.Jobs; /// Type of state for the job. [StorageProvider(ProviderName = WellKnownGrainStorageProviders.Jobs)] public abstract class Job : Grain, IJob - where TRequest : class + where TRequest : class, IJobRequest where TJobState : JobState { Dictionary _jobStepGrains = []; @@ -94,9 +94,7 @@ public override Task OnActivateAsync(CancellationToken cancellationToken) JobKey = keyExtension; ThisJob = GrainFactory.GetReference>(this); - - State.Name = GetType().Name; - State.Type = this.GetGrainType(); + State.Type = GetType(); Storage = ServiceProvider.GetRequiredService() .GetEventStore(JobKey.EventStore) .GetNamespace(JobKey.Namespace); From 77c340a07c9361d6dbcb2847d41b729700003bbf Mon Sep 17 00:00:00 2001 From: woksin Date: Mon, 3 Feb 2025 09:31:49 +0100 Subject: [PATCH 05/38] Add IJobRequest to IJobsManager as well --- Source/Kernel/Grains.Interfaces/Jobs/IJobsManager.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Source/Kernel/Grains.Interfaces/Jobs/IJobsManager.cs b/Source/Kernel/Grains.Interfaces/Jobs/IJobsManager.cs index 7a13e7867..4df545f2b 100644 --- a/Source/Kernel/Grains.Interfaces/Jobs/IJobsManager.cs +++ b/Source/Kernel/Grains.Interfaces/Jobs/IJobsManager.cs @@ -28,7 +28,7 @@ public interface IJobsManager : IGrainWithIntegerCompoundKey /// Awaitable task. Task Start(JobId jobId, TRequest request) where TJob : IJob - where TRequest : class; + where TRequest : class, IJobRequest; /// /// Resume a job. @@ -59,7 +59,7 @@ Task Start(JobId jobId, TRequest request) /// Collection of job states. Task> GetJobsOfType() where TJob : IJob - where TRequest : class; + where TRequest : class, IJobRequest; /// /// Get all jobs. From 24a62fe537d782060d4c9b389cf328dde069f1a3 Mon Sep 17 00:00:00 2001 From: woksin Date: Mon, 3 Feb 2025 09:32:02 +0100 Subject: [PATCH 06/38] JobType is now just the Type name --- Source/Kernel/Concepts/Jobs/JobType.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Source/Kernel/Concepts/Jobs/JobType.cs b/Source/Kernel/Concepts/Jobs/JobType.cs index 028389ca9..9cb7d96b1 100644 --- a/Source/Kernel/Concepts/Jobs/JobType.cs +++ b/Source/Kernel/Concepts/Jobs/JobType.cs @@ -21,11 +21,11 @@ public record JobType(string Value) : ConceptAs(Value) /// Implicitly convert from to . /// /// to convert from. - public static implicit operator JobType(Type type) => new(type.AssemblyQualifiedName ?? type.Name); + public static implicit operator JobType(Type type) => new(type.Name); /// /// Implicitly convert from to . /// /// to convert from. public static implicit operator Type(JobType type) => Type.GetType(type.Value) ?? throw new UnknownClrTypeForJobType(type); -} +} \ No newline at end of file From a686fabeef7fb83bccfa5b2c5ef68107002e41a3 Mon Sep 17 00:00:00 2001 From: woksin Date: Mon, 3 Feb 2025 10:22:33 +0100 Subject: [PATCH 07/38] Introduce JobTypes that knows about the association between a Job and a JobType --- Source/Kernel/Concepts/Jobs/IJobTypes.cs | 58 ++++++++++ Source/Kernel/Concepts/Jobs/JobType.cs | 2 +- .../Kernel/Concepts/Jobs/JobTypeAttribute.cs | 17 +++ .../Grains/Jobs/JobTypeAlreadyExists.cs | 14 +++ .../Jobs/JobTypeCanOnlyHaveOneRequestType.cs | 10 ++ .../Jobs/JobTypeMustHaveARequestType.cs | 10 ++ Source/Kernel/Grains/Jobs/JobTypes.cs | 103 ++++++++++++++++++ 7 files changed, 213 insertions(+), 1 deletion(-) create mode 100644 Source/Kernel/Concepts/Jobs/IJobTypes.cs create mode 100644 Source/Kernel/Concepts/Jobs/JobTypeAttribute.cs create mode 100644 Source/Kernel/Grains/Jobs/JobTypeAlreadyExists.cs create mode 100644 Source/Kernel/Grains/Jobs/JobTypeCanOnlyHaveOneRequestType.cs create mode 100644 Source/Kernel/Grains/Jobs/JobTypeMustHaveARequestType.cs create mode 100644 Source/Kernel/Grains/Jobs/JobTypes.cs diff --git a/Source/Kernel/Concepts/Jobs/IJobTypes.cs b/Source/Kernel/Concepts/Jobs/IJobTypes.cs new file mode 100644 index 000000000..1d2b44bf0 --- /dev/null +++ b/Source/Kernel/Concepts/Jobs/IJobTypes.cs @@ -0,0 +1,58 @@ +// Copyright (c) Cratis. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using OneOf.Types; +namespace Cratis.Chronicle.Concepts.Jobs; + +/// +/// Defines a system that knows about job types and how to correlate it to CLR . +/// +public interface IJobTypes +{ + /// + /// Error types for . + /// + enum GetRequestClrTypeForError + { + JobTypeIsNotSet = 0, + CouldNotFindType = 1 + } + + /// + /// Error types for . + /// + enum GetClrTypeForError + { + JobTypeIsNotSet = 0, + CouldNotFindType = 1 + } + + /// + /// Error types for . + /// + enum GetForError + { + NoAssociatedJobType = 0 + } + + /// + /// Gets the associated with the CLR or . + /// + /// The . + /// or . + Result GetFor(Type type); + + /// + /// Gets the job associated with the . + /// + /// The . + /// or . + Result GetClrTypeFor(JobType type); + + /// + /// Gets the job request associated with the . + /// + /// The . + /// or . + Result GetRequestClrTypeFor(JobType type); +} diff --git a/Source/Kernel/Concepts/Jobs/JobType.cs b/Source/Kernel/Concepts/Jobs/JobType.cs index 9cb7d96b1..4927547d1 100644 --- a/Source/Kernel/Concepts/Jobs/JobType.cs +++ b/Source/Kernel/Concepts/Jobs/JobType.cs @@ -4,7 +4,7 @@ namespace Cratis.Chronicle.Concepts.Jobs; /// -/// Represents type of a job. +/// Represents type of a Job. /// /// String representation of the job type. /// diff --git a/Source/Kernel/Concepts/Jobs/JobTypeAttribute.cs b/Source/Kernel/Concepts/Jobs/JobTypeAttribute.cs new file mode 100644 index 000000000..29038003f --- /dev/null +++ b/Source/Kernel/Concepts/Jobs/JobTypeAttribute.cs @@ -0,0 +1,17 @@ +// Copyright (c) Cratis. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +namespace Cratis.Chronicle.Concepts.Jobs; + +/// +/// Represents the JobType attribute that can be placed on job type classes to specify the name. +/// +/// The job type name to use. +[AttributeUsage(AttributeTargets.Class)] +public sealed class JobTypeAttribute(string jobType) : Attribute +{ + /// + /// Gets the . + /// + public JobType JobType { get; } = new(jobType); +} diff --git a/Source/Kernel/Grains/Jobs/JobTypeAlreadyExists.cs b/Source/Kernel/Grains/Jobs/JobTypeAlreadyExists.cs new file mode 100644 index 000000000..76f9d841d --- /dev/null +++ b/Source/Kernel/Grains/Jobs/JobTypeAlreadyExists.cs @@ -0,0 +1,14 @@ +// Copyright (c) Cratis. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using Cratis.Chronicle.Concepts.Jobs; +namespace Cratis.Chronicle.Grains.Jobs; + +/// +/// Exception that gets thrown when a is associated with multiple clr types. +/// +/// The job type. +/// The already associated clr type. +/// The other clr type. +public class JobTypeAlreadyExists(JobType jobType, Type existingClrType, Type newClrType) + : Exception($"JobType {jobType} is already associated with {existingClrType} so it cannot be associated with {newClrType}"); \ No newline at end of file diff --git a/Source/Kernel/Grains/Jobs/JobTypeCanOnlyHaveOneRequestType.cs b/Source/Kernel/Grains/Jobs/JobTypeCanOnlyHaveOneRequestType.cs new file mode 100644 index 000000000..e042aa5e9 --- /dev/null +++ b/Source/Kernel/Grains/Jobs/JobTypeCanOnlyHaveOneRequestType.cs @@ -0,0 +1,10 @@ +using Cratis.Chronicle.Concepts.Jobs; +namespace Cratis.Chronicle.Grains.Jobs; + +/// +/// Exception that gets thrown when a job has multiple types. +/// +/// The job type. +/// The job clr . +public class JobTypeCanOnlyHaveOneRequestType(JobType jobType, Type jobClrType) + : Exception($"Job {jobType} associated with clr type {jobClrType} has multiple Job Request types"); \ No newline at end of file diff --git a/Source/Kernel/Grains/Jobs/JobTypeMustHaveARequestType.cs b/Source/Kernel/Grains/Jobs/JobTypeMustHaveARequestType.cs new file mode 100644 index 000000000..3453bb7f3 --- /dev/null +++ b/Source/Kernel/Grains/Jobs/JobTypeMustHaveARequestType.cs @@ -0,0 +1,10 @@ +using Cratis.Chronicle.Concepts.Jobs; +namespace Cratis.Chronicle.Grains.Jobs; + +/// +/// Exception that gets thrown when a job does not have any type. +/// +/// The job type. +/// The job clr . +public class JobTypeMustHaveARequestType(JobType jobType, Type jobClrType) + : Exception($"Job {jobType} associated with clr type {jobClrType} must have one Job Request type"); \ No newline at end of file diff --git a/Source/Kernel/Grains/Jobs/JobTypes.cs b/Source/Kernel/Grains/Jobs/JobTypes.cs new file mode 100644 index 000000000..bd62ae88f --- /dev/null +++ b/Source/Kernel/Grains/Jobs/JobTypes.cs @@ -0,0 +1,103 @@ +// Copyright (c) Cratis. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using System.Reflection; +using Cratis.Chronicle.Concepts; +using Cratis.Chronicle.Concepts.Jobs; +using Cratis.DependencyInjection; +using Cratis.Types; +namespace Cratis.Chronicle.Grains.Jobs; + +/// +/// Represents an implementation of . +/// +[Singleton] +public class JobTypes : IJobTypes +{ + readonly Dictionary _jobTypes = []; + readonly Dictionary _jobTypePerType = []; + readonly Dictionary _jobRequestTypes = []; + + /// + /// Initializes an instance of the class. + /// + /// The . + public JobTypes(ITypes types) => InitializeMap(types); + + /// + public Result GetFor(Type type) => + _jobTypePerType.TryGetValue(type, out var jobType) + ? jobType + : IJobTypes.GetForError.NoAssociatedJobType; + + /// + public Result GetClrTypeFor(JobType type) + { + if (type == JobType.NotSet) + { + return IJobTypes.GetClrTypeForError.JobTypeIsNotSet; + } + + return _jobTypes.TryGetValue(type, out var jobClrType) + ? jobClrType + : IJobTypes.GetClrTypeForError.CouldNotFindType; + } + + /// + public Result GetRequestClrTypeFor(JobType type) + { + if (type == JobType.NotSet) + { + return IJobTypes.GetRequestClrTypeForError.JobTypeIsNotSet; + } + + return _jobRequestTypes.TryGetValue(type, out var jobRequestClrType) + ? jobRequestClrType + : IJobTypes.GetRequestClrTypeForError.CouldNotFindType; + } + + void InitializeMap(ITypes types) + { + PopulateJobTypes(types); + PopulateJobRequestTypes(); + } + + void PopulateJobTypes(ITypes types) + { + foreach (var jobClrType in types.FindMultiple().Where(type => type is { IsClass: true, IsAbstract: false } && type != typeof(NullJob))) + { + var jobTypeAttribute = jobClrType.GetCustomAttribute(); + var jobType = jobTypeAttribute?.JobType ?? jobClrType; + if (jobType == JobType.NotSet) + { + throw new ArgumentException($"JobType for type {jobClrType} is not set"); + } + if (!_jobTypes.TryAdd(jobType, jobClrType)) + { + throw new JobTypeAlreadyExists(jobType, _jobTypes[jobType], jobClrType); + } + _jobTypePerType.Add(jobClrType, jobType); + } + } + void PopulateJobRequestTypes() + { + foreach (var (jobType, jobClrType) in _jobTypes) + { + var jobInterfaces = jobClrType.GetInterfaces() + .Where(interfaceType => interfaceType.IsGenericType && interfaceType.GetGenericTypeDefinition() == typeof(IJob<>)) + .ToList(); + + switch (jobInterfaces.Count) + { + case > 1: + throw new JobTypeCanOnlyHaveOneRequestType(jobType, jobClrType); + case 0: + throw new JobTypeMustHaveARequestType(jobType, jobClrType); + default: + // First generic argument of IJob is the type of the request + _jobRequestTypes.Add(jobType, jobInterfaces[0].GetGenericArguments()[0]); + break; + } + } + } +} From 07aa75fbc502544e915d96412aadb0ce7f278a93 Mon Sep 17 00:00:00 2001 From: woksin Date: Mon, 3 Feb 2025 10:28:58 +0100 Subject: [PATCH 08/38] Use JobTypes in Storage.MongoDB JobStorage GetJobs --- Source/Kernel/Storage.MongoDB/Jobs/JobStorage.cs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/Source/Kernel/Storage.MongoDB/Jobs/JobStorage.cs b/Source/Kernel/Storage.MongoDB/Jobs/JobStorage.cs index 4053ce077..356963327 100644 --- a/Source/Kernel/Storage.MongoDB/Jobs/JobStorage.cs +++ b/Source/Kernel/Storage.MongoDB/Jobs/JobStorage.cs @@ -20,7 +20,8 @@ namespace Cratis.Chronicle.Storage.MongoDB.Jobs; /// Initializes a new instance of the class. /// /// for persistence. -public class JobStorage(IEventStoreNamespaceDatabase database) : IJobStorage +/// that knows about . +public class JobStorage(IEventStoreNamespaceDatabase database, IJobTypes jobTypes) : IJobStorage { IMongoCollection Collection => database.GetCollection(WellKnownCollectionNames.Jobs); @@ -140,7 +141,7 @@ public async Task, JobError>> GetJobs.Filter.Eq(_ => _.Type, jobType); var statusFilters = statuses.Select(status => Builders.Filter.Eq(_ => _.Status, status)); From c52ccfbaa5cab026adda8a606e17286626c169d5 Mon Sep 17 00:00:00 2001 From: woksin Date: Mon, 3 Feb 2025 14:01:53 +0100 Subject: [PATCH 09/38] Remove implicit converter for JobType to Type --- Source/Kernel/Concepts/Jobs/JobType.cs | 6 ------ 1 file changed, 6 deletions(-) diff --git a/Source/Kernel/Concepts/Jobs/JobType.cs b/Source/Kernel/Concepts/Jobs/JobType.cs index 4927547d1..62c7c1b1a 100644 --- a/Source/Kernel/Concepts/Jobs/JobType.cs +++ b/Source/Kernel/Concepts/Jobs/JobType.cs @@ -22,10 +22,4 @@ public record JobType(string Value) : ConceptAs(Value) /// /// to convert from. public static implicit operator JobType(Type type) => new(type.Name); - - /// - /// Implicitly convert from to . - /// - /// to convert from. - public static implicit operator Type(JobType type) => Type.GetType(type.Value) ?? throw new UnknownClrTypeForJobType(type); } \ No newline at end of file From cf67d5fb0d439b46a1e926e25d104112379d9f35 Mon Sep 17 00:00:00 2001 From: woksin Date: Mon, 3 Feb 2025 15:00:14 +0100 Subject: [PATCH 10/38] Use JobTypes correctly in JobsManager --- .../Jobs/JobTypeNotAssociatedWithType.cs | 12 ++ Source/Kernel/Grains/Jobs/JobsManager.cs | 129 +++++++++++++----- .../Kernel/Grains/Jobs/JobsManagerLogging.cs | 12 ++ Source/Kernel/Storage/Jobs/JobError.cs | 8 +- 4 files changed, 126 insertions(+), 35 deletions(-) create mode 100644 Source/Kernel/Grains/Jobs/JobTypeNotAssociatedWithType.cs diff --git a/Source/Kernel/Grains/Jobs/JobTypeNotAssociatedWithType.cs b/Source/Kernel/Grains/Jobs/JobTypeNotAssociatedWithType.cs new file mode 100644 index 000000000..2cd5f6fbb --- /dev/null +++ b/Source/Kernel/Grains/Jobs/JobTypeNotAssociatedWithType.cs @@ -0,0 +1,12 @@ +// Copyright (c) Cratis. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using Cratis.Chronicle.Concepts.Jobs; +namespace Cratis.Chronicle.Grains.Jobs; + +/// +/// Exception that gets thrown when a is not associated with the given . +/// +/// The type. +public class JobTypeNotAssociatedWithType(Type type) + : Exception($"There is no JobType associated with {type}"); diff --git a/Source/Kernel/Grains/Jobs/JobsManager.cs b/Source/Kernel/Grains/Jobs/JobsManager.cs index e45685b27..d7f339cc2 100644 --- a/Source/Kernel/Grains/Jobs/JobsManager.cs +++ b/Source/Kernel/Grains/Jobs/JobsManager.cs @@ -7,6 +7,7 @@ using Cratis.Chronicle.Storage; using Cratis.Chronicle.Storage.Jobs; using Microsoft.Extensions.Logging; +using OneOf; using OneOf.Types; namespace Cratis.Chronicle.Grains.Jobs; @@ -18,9 +19,11 @@ namespace Cratis.Chronicle.Grains.Jobs; /// Initializes a new instance of the class. /// /// for working with underlying storage. +/// that knows about job type associations. /// Logger for logging. public class JobsManager( IStorage storage, + IJobTypes jobTypes, ILogger logger) : Grain, IJobsManager { IEventStoreNamespaceStorage? _namespaceStorage; @@ -53,16 +56,15 @@ public async Task Rehydrate() Task RehydrateJobs(IEnumerable runningJobs) { - var tasks = runningJobs.Select(_ => (_.Id, GetJobGrain(_))).Select(async idAndJob => + var tasks = runningJobs.Select(_ => _.Id).Select(async jobId => { - var (id, job) = idAndJob; try { - await job.Resume(); + await Resume(jobId); } catch (Exception ex) { - logger.ErrorResumingJob(ex, id); + logger.ErrorResumingJob(ex, jobId); } }); return Task.WhenAll(tasks); @@ -72,7 +74,7 @@ Task RehydrateJobs(IEnumerable runningJobs) /// public async Task Start(JobId jobId, TRequest request) where TJob : IJob - where TRequest : class + where TRequest : class, IJobRequest { using var scope = logger.BeginJobsManagerScope(_key); @@ -95,14 +97,52 @@ public async Task Resume(JobId jobId) logger.ResumingJob(jobId); var jobStateResult = await _jobStorage!.GetJob(jobId); - await jobStateResult.Match(ResumeJob, error => HandleError(jobId, error), error => HandleUnknownFailure(jobId, error)); + await jobStateResult.Match( + async jobState => + { + var resumeJobResult = await ResumeJob(jobState); + await HandleResumeJobResult(resumeJobResult); + }, + error => HandleError(jobId, error), + error => HandleUnknownFailure(jobId, error)); return; - Task ResumeJob(JobState jobState) + async Task HandleResumeJobResult(OneOf, Storage.Jobs.JobError> resumeJobResult) { - var job = GetJobGrain(jobState); - return job.Resume(); + await resumeJobResult.Match( + resumeResult => resumeResult.Match( + success => + { + switch (success) + { + case ResumeJobSuccess.JobAlreadyRunning: + logger.CannotResumeJobBecauseAlreadyRunning(jobId); + break; + case ResumeJobSuccess.JobCannotBeResumed: + logger.CannotResumeJob(jobId); + break; + } + return Task.CompletedTask; + }, + resumeError => resumeError.Match( + jobError => HandleError(jobId, jobError), + failedResumingSteps => + { + logger.FailedToResumeJobSteps(jobId, failedResumingSteps.FailedJobSteps); + return Task.CompletedTask; + })), + jobError => HandleError(jobId, jobError)); } + Task, Storage.Jobs.JobError>> ResumeJob(JobState jobState) => GetJobGrain(jobState) + .Match, Storage.Jobs.JobError>>>( + async job => + { + var result = await job.Resume(); + return result.Match, Storage.Jobs.JobError>>( + success => Result.Success(success), + error => Result.Failed(error)); + }, + _ => Task.FromResult, Storage.Jobs.JobError>>(Storage.Jobs.JobError.TypeIsNotAssociatedWithAJobType)); } /// @@ -111,7 +151,11 @@ public async Task Stop(JobId jobId) using var scope = logger.BeginJobsManagerScope(_key); var stopJobResult = await TryStopJob(jobId); - await stopJobResult.Match(_ => Task.CompletedTask, error => HandleError(jobId, error), error => HandleUnknownFailure(jobId, error)); + await stopJobResult.Match( + _ => Task.CompletedTask, + jobError => HandleError(jobId, jobError), + jobStorageError => HandleError(jobId, jobStorageError), + error => HandleUnknownFailure(jobId, error)); } /// @@ -124,9 +168,14 @@ public async Task Delete(JobId jobId) var stopJobResult = await TryStopJob(jobId); var stoppedJob = await stopJobResult.Match( _ => Task.FromResult(true), - async error => + async jobError => { - await HandleError(jobId, error); + await HandleError(jobId, jobError); + return false; + }, + async jobStorageError => + { + await HandleError(jobId, jobStorageError); return false; }, async error => @@ -146,7 +195,7 @@ public async Task Delete(JobId jobId) /// public async Task> GetJobsOfType() where TJob : IJob - where TRequest : class + where TRequest : class, IJobRequest { var getJobs = await _namespaceStorage!.Jobs.GetJobs(); return await getJobs.Match( @@ -176,12 +225,12 @@ public async Task> GetAllJobs() }); } - IJob GetJobGrain(JobState jobState) => (GrainFactory.GetGrain( - jobState.Type, - jobState.Id, - new JobKey(_key.EventStore, _key.Namespace)) as IJob)!; + Result GetJobGrain(JobState jobState) => jobTypes.GetClrTypeFor(jobState.Type) + .Match>( + jobType => Result.Success((IJob)GrainFactory.GetGrain(jobType, jobState.Id, new JobKey(_key.EventStore, _key.Namespace))), + error => error)!; - async Task> TryStopJob(JobId jobId) + async Task> TryStopJob(JobId jobId) { logger.StoppingJob(jobId); @@ -189,24 +238,25 @@ IJob GetJobGrain(JobState jobState) => (GrainFactory.GetGrain( return await jobStateResult.Match( async jobState => { - try - { - await StopJob(jobState); - return Catch.Success(default); - } - catch (Exception ex) - { - return ex; - } + var stopJobResult = await StopJob(jobState); + return stopJobResult.Match>( + none => none, + jobError => jobError, + jobStorageError => jobStorageError); }, - error => Task.FromResult>(error), - error => Task.FromResult>(error)); + error => Task.FromResult>(error), + ex => Task.FromResult>(ex)); - Task StopJob(JobState jobState) - { - var job = GetJobGrain(jobState); - return job.Stop(); - } + Task> StopJob(JobState jobState) => GetJobGrain(jobState) + .Match>>( + async job => + { + var result = await job.Stop(); + return result.Match>( + none => none, + error => error); + }, + _ => Task.FromResult>(Storage.Jobs.JobError.TypeIsNotAssociatedWithAJobType)); } async Task HandleCatch(Task doCatch, JobId jobId) @@ -231,6 +281,17 @@ Task HandleError(JobId jobId, Storage.Jobs.JobError jobError) return Task.CompletedTask; } + Task HandleError(JobId jobId, JobError jobError) + { + switch (jobError) + { + default: + logger.JobErrorOccurred(jobId, jobError); + break; + } + return Task.CompletedTask; + } + Task HandleUnknownFailure(JobId jobId, Exception ex) { logger.UnknownError(ex, jobId); diff --git a/Source/Kernel/Grains/Jobs/JobsManagerLogging.cs b/Source/Kernel/Grains/Jobs/JobsManagerLogging.cs index 9634734de..4c47b10f6 100644 --- a/Source/Kernel/Grains/Jobs/JobsManagerLogging.cs +++ b/Source/Kernel/Grains/Jobs/JobsManagerLogging.cs @@ -42,6 +42,18 @@ internal static partial class JobsManagerLogMessages [LoggerMessage(LogLevel.Warning, "Job {JobId} could not be found")] internal static partial void JobCouldNotBeFound(this ILogger logger, JobId jobId); + [LoggerMessage(LogLevel.Warning, "Job {JobId} an error occurred while performing action. {JobError}")] + internal static partial void JobErrorOccurred(this ILogger logger, JobId jobId, JobError jobError); + + [LoggerMessage(LogLevel.Warning, "Job {JobId} an error occurred while resuming job steps. {JobSteps}")] + internal static partial void FailedToResumeJobSteps(this ILogger logger, JobId jobId, IEnumerable jobSteps); + + [LoggerMessage(LogLevel.Warning, "Job {JobId} cannot be resumed because it is running")] + internal static partial void CannotResumeJobBecauseAlreadyRunning(this ILogger logger, JobId jobId); + + [LoggerMessage(LogLevel.Warning, "Job {JobId} cannot be resumed")] + internal static partial void CannotResumeJob(this ILogger logger, JobId jobId); + [LoggerMessage(LogLevel.Warning, "Job {JobId} encountered error : {Error}")] internal static partial void JobErrorOccurred(this ILogger logger, JobId jobId, Storage.Jobs.JobError error); diff --git a/Source/Kernel/Storage/Jobs/JobError.cs b/Source/Kernel/Storage/Jobs/JobError.cs index 963c41853..3f55f3423 100644 --- a/Source/Kernel/Storage/Jobs/JobError.cs +++ b/Source/Kernel/Storage/Jobs/JobError.cs @@ -1,6 +1,7 @@ // Copyright (c) Cratis. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. +using Cratis.Chronicle.Concepts.Jobs; namespace Cratis.Chronicle.Storage.Jobs; /// @@ -18,5 +19,10 @@ public enum JobError /// /// The of the job is derived from . /// - TypeIsNotAJobStateType = 2 + TypeIsNotAJobStateType = 2, + + /// + /// The of the job does not have an associated . + /// + TypeIsNotAssociatedWithAJobType = 3, } \ No newline at end of file From 32e1c34c5f50bed1d169ffbe54534e2a23da0fcb Mon Sep 17 00:00:00 2001 From: woksin Date: Mon, 3 Feb 2025 15:00:53 +0100 Subject: [PATCH 11/38] Use JobTypes in JobStorage --- Source/Kernel/Storage.MongoDB/Jobs/JobStorage.cs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/Source/Kernel/Storage.MongoDB/Jobs/JobStorage.cs b/Source/Kernel/Storage.MongoDB/Jobs/JobStorage.cs index 356963327..973fb1448 100644 --- a/Source/Kernel/Storage.MongoDB/Jobs/JobStorage.cs +++ b/Source/Kernel/Storage.MongoDB/Jobs/JobStorage.cs @@ -141,10 +141,13 @@ public async Task, JobError>> GetJobs.Filter.Eq(_ => _.Type, jobType); var statusFilters = statuses.Select(status => Builders.Filter.Eq(_ => _.Status, status)); - var filter = statuses.Length == 0 ? jobTypeFilter : Builders.Filter.And(jobTypeFilter, Builders.Filter.Or(statusFilters)); From 93607919dd49700ad134cf9263441478ae5f8e58 Mon Sep 17 00:00:00 2001 From: woksin Date: Mon, 3 Feb 2025 15:02:02 +0100 Subject: [PATCH 12/38] Use JobTypes in Job --- Source/Kernel/Grains/Jobs/Job.cs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/Source/Kernel/Grains/Jobs/Job.cs b/Source/Kernel/Grains/Jobs/Job.cs index 22cfad1a9..e9bcbb9dc 100644 --- a/Source/Kernel/Grains/Jobs/Job.cs +++ b/Source/Kernel/Grains/Jobs/Job.cs @@ -85,16 +85,20 @@ enum HandleCompletionSuccess public override Task OnActivateAsync(CancellationToken cancellationToken) { _logger = ServiceProvider.GetService>>() ?? new NullLogger>(); - _observers = new( TimeSpan.FromMinutes(1), ServiceProvider.GetService>>() ?? new NullLogger>()); JobId = this.GetPrimaryKey(out var keyExtension); JobKey = keyExtension; - ThisJob = GrainFactory.GetReference>(this); - State.Type = GetType(); + State.Type = ServiceProvider.GetRequiredService().GetFor(GetType()).Match( + jobType => jobType, + error => error switch + { + IJobTypes.GetForError.NoAssociatedJobType => throw new JobTypeNotAssociatedWithType(GetType()), + _ => throw new ArgumentOutOfRangeException(nameof(error), error, null) + }); Storage = ServiceProvider.GetRequiredService() .GetEventStore(JobKey.EventStore) .GetNamespace(JobKey.Namespace); From bbc8865a183fc243c8505051ca353f734cfaf428 Mon Sep 17 00:00:00 2001 From: woksin Date: Tue, 4 Feb 2025 09:35:00 +0100 Subject: [PATCH 13/38] Add some TODOs --- Source/Kernel/Storage/Jobs/JobStepState.cs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/Source/Kernel/Storage/Jobs/JobStepState.cs b/Source/Kernel/Storage/Jobs/JobStepState.cs index b6f7551a2..49c15acab 100644 --- a/Source/Kernel/Storage/Jobs/JobStepState.cs +++ b/Source/Kernel/Storage/Jobs/JobStepState.cs @@ -18,6 +18,7 @@ public class JobStepState /// /// Gets or sets the . /// + // TODO: We probably have to do the same thing here as for JobType public JobStepType Type { get; set; } = JobStepType.NotSet; /// @@ -43,5 +44,6 @@ public class JobStepState /// /// Gets or sets the request associated with the job step. /// + // TODO: We probably have to remove this. It's at least not necessary public object Request { get; set; } = null!; } From 47b66718fa9426b70dae46e79a6f2e6b8f4477cf Mon Sep 17 00:00:00 2001 From: woksin Date: Tue, 4 Feb 2025 09:35:31 +0100 Subject: [PATCH 14/38] Create serializer --- .../Jobs/JobStateSerializer.cs | 35 +++++++++++++++++++ .../Kernel/Storage.MongoDB/Jobs/JobStorage.cs | 2 +- 2 files changed, 36 insertions(+), 1 deletion(-) create mode 100644 Source/Kernel/Storage.MongoDB/Jobs/JobStateSerializer.cs diff --git a/Source/Kernel/Storage.MongoDB/Jobs/JobStateSerializer.cs b/Source/Kernel/Storage.MongoDB/Jobs/JobStateSerializer.cs new file mode 100644 index 000000000..3a5199890 --- /dev/null +++ b/Source/Kernel/Storage.MongoDB/Jobs/JobStateSerializer.cs @@ -0,0 +1,35 @@ +// Copyright (c) Cratis. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using Cratis.Chronicle.Concepts.Jobs; +using Cratis.Chronicle.Storage.Jobs; +using Cratis.Strings; +using MongoDB.Bson; +using MongoDB.Bson.Serialization; +using MongoDB.Bson.Serialization.Serializers; +namespace Cratis.Chronicle.Storage.MongoDB.Jobs; + +/// +/// Represents a for . +/// +/// The . +public class JobStateSerializer(IJobTypes jobTypes) : SerializerBase +{ + /// + public override JobState Deserialize(BsonDeserializationContext context, BsonDeserializationArgs args) + { + var rawBsonDocument = context.Reader.ReadRawBsonDocument(); + using var rawDocument = new RawBsonDocument(rawBsonDocument); + var bsonDocument = rawDocument.ToBsonDocument(); + var jobTypeString = bsonDocument.GetValue(nameof(JobState.Type).ToCamelCase()).AsString; + var jobRequestType = jobTypes.GetRequestClrTypeFor(new(jobTypeString)).AsT0; + var jobRequestElementName = nameof(JobState.Request).ToCamelCase(); + var request = (IJobRequest)BsonSerializer.Deserialize( + bsonDocument.GetElement(jobRequestElementName).ToBsonDocument(), + jobRequestType); + bsonDocument.Remove(jobRequestElementName); + var jobState = BsonSerializer.Deserialize(bsonDocument); + jobState.Request = request; + return jobState; + } +} \ No newline at end of file diff --git a/Source/Kernel/Storage.MongoDB/Jobs/JobStorage.cs b/Source/Kernel/Storage.MongoDB/Jobs/JobStorage.cs index 973fb1448..fdf528a36 100644 --- a/Source/Kernel/Storage.MongoDB/Jobs/JobStorage.cs +++ b/Source/Kernel/Storage.MongoDB/Jobs/JobStorage.cs @@ -196,4 +196,4 @@ async Task> GetJobsRaw(params JobStatus[] statuses) var aggregation = Collection.Aggregate().Match(filter); return await aggregation.ToListAsync().ConfigureAwait(false); } -} +} \ No newline at end of file From d0a94177d114ebc35e04bf1ddf5eb376d2d8f240 Mon Sep 17 00:00:00 2001 From: woksin Date: Tue, 4 Feb 2025 10:47:23 +0100 Subject: [PATCH 15/38] Fix Job logging --- Source/Kernel/Grains/Jobs/Job.cs | 2 +- Source/Kernel/Grains/Jobs/JobLogging.cs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Source/Kernel/Grains/Jobs/Job.cs b/Source/Kernel/Grains/Jobs/Job.cs index e9bcbb9dc..e77d475f6 100644 --- a/Source/Kernel/Grains/Jobs/Job.cs +++ b/Source/Kernel/Grains/Jobs/Job.cs @@ -675,7 +675,7 @@ async Task> Complete() } catch (Exception e) { - _logger.FailedOnCompleted(e, State.Name); + _logger.FailedOnCompleted(e, State.Id, State.Type); return Result.Failed(JobError.UnknownError); } } diff --git a/Source/Kernel/Grains/Jobs/JobLogging.cs b/Source/Kernel/Grains/Jobs/JobLogging.cs index d8667fa27..2e68a0de9 100644 --- a/Source/Kernel/Grains/Jobs/JobLogging.cs +++ b/Source/Kernel/Grains/Jobs/JobLogging.cs @@ -78,8 +78,8 @@ internal static partial class JobLogMessages [LoggerMessage(LogLevel.Warning, "Job failed on completed")] internal static partial void FailedOnCompleted(this ILogger logger, Exception error); - [LoggerMessage(LogLevel.Warning, "Job {JobName} failed unexpectedly on OnCompleted")] - internal static partial void FailedOnCompleted(this ILogger logger, Exception ex, string jobName); + [LoggerMessage(LogLevel.Warning, "Job {JobId} {JobType} failed unexpectedly on OnCompleted")] + internal static partial void FailedOnCompleted(this ILogger logger, Exception ex, JobId jobId, JobType jobType); [LoggerMessage(LogLevel.Warning, "Job failed on completed while there are no job steps to run, but Job will still clear state. Error: {JobError}")] internal static partial void FailedOnCompletedWhileNoJobSteps(this ILogger logger, JobError jobError); From a96c95a24d12b03fcb2f7160d7e69017e4d7ec52 Mon Sep 17 00:00:00 2001 From: woksin Date: Tue, 4 Feb 2025 10:49:04 +0100 Subject: [PATCH 16/38] Fix build issues and create a hosted service that discovers and registers bson serializers --- .../CustomSerializersRegistrationService.cs | 31 +++++++++++++++++++ .../EventStoreNamespaceStorage.cs | 4 ++- .../Storage.MongoDB/EventStoreStorage.cs | 4 +++ .../MongoDBChronicleBuilderExtensions.cs | 16 ++-------- Source/Kernel/Storage.MongoDB/Storage.cs | 4 +++ 5 files changed, 44 insertions(+), 15 deletions(-) create mode 100644 Source/Kernel/Storage.MongoDB/CustomSerializersRegistrationService.cs diff --git a/Source/Kernel/Storage.MongoDB/CustomSerializersRegistrationService.cs b/Source/Kernel/Storage.MongoDB/CustomSerializersRegistrationService.cs new file mode 100644 index 000000000..87978dab6 --- /dev/null +++ b/Source/Kernel/Storage.MongoDB/CustomSerializersRegistrationService.cs @@ -0,0 +1,31 @@ +// Copyright (c) Cratis. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using Cratis.Types; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using MongoDB.Bson.Serialization; +namespace Cratis.Chronicle.Storage.MongoDB; + +/// +/// Represents an that registers our custom bson serializers. +/// +/// The . +/// The . +public class CustomSerializersRegistrationService(IServiceProvider serviceProvider, ITypes types) : IHostedService +{ + /// + public Task StartAsync(CancellationToken cancellationToken) + { + foreach (var type in types.FindMultiple() + .Where(type => type.Assembly.FullName!.Contains("Chronicle") && !type.IsGenericType)) + { + var serializer = (IBsonSerializer)ActivatorUtilities.CreateInstance(serviceProvider, type); + BsonSerializer.TryRegisterSerializer(serializer.ValueType, serializer); + } + return Task.CompletedTask; + } + + /// + public Task StopAsync(CancellationToken cancellationToken) => Task.CompletedTask; +} diff --git a/Source/Kernel/Storage.MongoDB/EventStoreNamespaceStorage.cs b/Source/Kernel/Storage.MongoDB/EventStoreNamespaceStorage.cs index 16bfba071..dd8403be9 100644 --- a/Source/Kernel/Storage.MongoDB/EventStoreNamespaceStorage.cs +++ b/Source/Kernel/Storage.MongoDB/EventStoreNamespaceStorage.cs @@ -57,6 +57,7 @@ public class EventStoreNamespaceStorage : IEventStoreNamespaceStorage /// for converting between expando object and json objects. /// The global . /// for getting all instances. + /// . /// for creating loggers. public EventStoreNamespaceStorage( EventStoreName eventStore, @@ -67,6 +68,7 @@ public EventStoreNamespaceStorage( Json.IExpandoObjectConverter expandoObjectConverter, JsonSerializerOptions jsonSerializerOptions, IInstancesOf sinkFactories, + IJobTypes jobTypes, ILoggerFactory loggerFactory) { _eventStore = eventStore; @@ -78,7 +80,7 @@ public EventStoreNamespaceStorage( _loggerFactory = loggerFactory; Changesets = new ChangesetStorage(eventStoreNamespaceDatabase); Identities = new IdentityStorage(eventStoreNamespaceDatabase, loggerFactory.CreateLogger()); - Jobs = new JobStorage(eventStoreNamespaceDatabase); + Jobs = new JobStorage(eventStoreNamespaceDatabase, jobTypes); JobSteps = new JobStepStorage(eventStoreNamespaceDatabase); Observers = new ObserverStorage(eventStoreNamespaceDatabase); FailedPartitions = new FailedPartitionStorage(eventStoreNamespaceDatabase); diff --git a/Source/Kernel/Storage.MongoDB/EventStoreStorage.cs b/Source/Kernel/Storage.MongoDB/EventStoreStorage.cs index 96f11357f..4cdc805fd 100644 --- a/Source/Kernel/Storage.MongoDB/EventStoreStorage.cs +++ b/Source/Kernel/Storage.MongoDB/EventStoreStorage.cs @@ -10,6 +10,7 @@ using Cratis.Chronicle.Concepts.Projections.Json; using Cratis.Chronicle.Storage.Events.Constraints; using Cratis.Chronicle.Storage.EventTypes; +using Cratis.Chronicle.Storage.Jobs; using Cratis.Chronicle.Storage.MongoDB.Events.Constraints; using Cratis.Chronicle.Storage.MongoDB.Namespaces; using Cratis.Chronicle.Storage.MongoDB.Observation.Reactors; @@ -41,6 +42,7 @@ namespace Cratis.Chronicle.Storage.MongoDB; /// for conversions. /// The global . /// for getting all instances. +/// . /// for creating loggers. public class EventStoreStorage( EventStoreName eventStore, @@ -52,6 +54,7 @@ public class EventStoreStorage( Json.IExpandoObjectConverter expandoObjectConverter, JsonSerializerOptions jsonSerializerOptions, IInstancesOf sinkFactories, + IJobTypes jobTypes, ILoggerFactory loggerFactory) : IEventStoreStorage { readonly ConcurrentDictionary _namespaces = new(); @@ -95,6 +98,7 @@ public IEventStoreNamespaceStorage GetNamespace(EventStoreNamespaceName @namespa expandoObjectConverter, jsonSerializerOptions, sinkFactories, + jobTypes, loggerFactory); } } diff --git a/Source/Kernel/Storage.MongoDB/MongoDBChronicleBuilderExtensions.cs b/Source/Kernel/Storage.MongoDB/MongoDBChronicleBuilderExtensions.cs index 49096a97c..c457fb24d 100644 --- a/Source/Kernel/Storage.MongoDB/MongoDBChronicleBuilderExtensions.cs +++ b/Source/Kernel/Storage.MongoDB/MongoDBChronicleBuilderExtensions.cs @@ -5,10 +5,8 @@ using Cratis.Chronicle.Storage; using Cratis.Chronicle.Storage.Compliance; using Cratis.Chronicle.Storage.MongoDB; -using Cratis.Chronicle.Storage.MongoDB.Events.Constraints; using Cratis.Compliance.MongoDB; using Microsoft.Extensions.DependencyInjection; -using MongoDB.Bson.Serialization; namespace Cratis.Chronicle.Setup; @@ -17,8 +15,6 @@ namespace Cratis.Chronicle.Setup; /// public static class MongoDBChronicleBuilderExtensions { - static bool _mongoDBArtifactsInitialized; - /// /// Configure Chronicle to use MongoDB, based on the . /// @@ -55,15 +51,7 @@ public static IChronicleBuilder WithMongoDB(this IChronicleBuilder builder, stri services.AddSingleton(); }); - if (!_mongoDBArtifactsInitialized) - { - BsonSerializer.TryRegisterSerializer(new JsonElementSerializer()); - BsonSerializer.TryRegisterSerializer(new UriSerializer()); - BsonSerializer.TryRegisterSerializer(new ConstraintDefinitionSerializer()); - BsonSerializer.TryRegisterSerializer(new SiloAddressSerializer()); - _mongoDBArtifactsInitialized = true; - } - + builder.Services.AddHostedService(); return builder; } -} +} \ No newline at end of file diff --git a/Source/Kernel/Storage.MongoDB/Storage.cs b/Source/Kernel/Storage.MongoDB/Storage.cs index c63769e2e..bd97918d2 100644 --- a/Source/Kernel/Storage.MongoDB/Storage.cs +++ b/Source/Kernel/Storage.MongoDB/Storage.cs @@ -10,6 +10,7 @@ using Cratis.Chronicle.Concepts.Observation.Reducers.Json; using Cratis.Chronicle.Concepts.Projections.Json; using Cratis.Chronicle.Reactive; +using Cratis.Chronicle.Storage.Jobs; using Cratis.Chronicle.Storage.Sinks; using Cratis.Types; using Microsoft.Extensions.Logging; @@ -31,6 +32,7 @@ namespace Cratis.Chronicle.Storage.MongoDB; /// for conversions. /// The global . /// for getting all instances. +/// . /// for creating loggers. public class Storage( IDatabase database, @@ -41,6 +43,7 @@ public class Storage( Json.IExpandoObjectConverter expandoObjectConverter, JsonSerializerOptions jsonSerializerOptions, IInstancesOf sinkFactories, + IJobTypes jobTypes, ILoggerFactory loggerFactory) : IStorage { readonly ConcurrentDictionary _eventStores = []; @@ -83,6 +86,7 @@ public IEventStoreStorage GetEventStore(EventStoreName eventStore) expandoObjectConverter, jsonSerializerOptions, sinkFactories, + jobTypes, loggerFactory); return _eventStores[eventStore] = eventStoreStorage; From 5333e2b3bc6aef119114893ce3b5ba76f9d65e4d Mon Sep 17 00:00:00 2001 From: woksin Date: Tue, 4 Feb 2025 13:16:38 +0100 Subject: [PATCH 17/38] Make it build --- Source/Kernel/Storage.MongoDB/EventStoreNamespaceStorage.cs | 1 + Source/Kernel/Storage.MongoDB/EventStoreStorage.cs | 2 +- Source/Kernel/Storage.MongoDB/Storage.cs | 2 +- 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/Source/Kernel/Storage.MongoDB/EventStoreNamespaceStorage.cs b/Source/Kernel/Storage.MongoDB/EventStoreNamespaceStorage.cs index dd8403be9..f48e0cf95 100644 --- a/Source/Kernel/Storage.MongoDB/EventStoreNamespaceStorage.cs +++ b/Source/Kernel/Storage.MongoDB/EventStoreNamespaceStorage.cs @@ -6,6 +6,7 @@ using Cratis.Chronicle.Compliance; using Cratis.Chronicle.Concepts; using Cratis.Chronicle.Concepts.EventSequences; +using Cratis.Chronicle.Concepts.Jobs; using Cratis.Chronicle.Storage.Changes; using Cratis.Chronicle.Storage.Events.Constraints; using Cratis.Chronicle.Storage.EventSequences; diff --git a/Source/Kernel/Storage.MongoDB/EventStoreStorage.cs b/Source/Kernel/Storage.MongoDB/EventStoreStorage.cs index 4cdc805fd..b3f099e2c 100644 --- a/Source/Kernel/Storage.MongoDB/EventStoreStorage.cs +++ b/Source/Kernel/Storage.MongoDB/EventStoreStorage.cs @@ -5,12 +5,12 @@ using System.Text.Json; using Cratis.Chronicle.Compliance; using Cratis.Chronicle.Concepts; +using Cratis.Chronicle.Concepts.Jobs; using Cratis.Chronicle.Concepts.Observation.Reactors.Json; using Cratis.Chronicle.Concepts.Observation.Reducers.Json; using Cratis.Chronicle.Concepts.Projections.Json; using Cratis.Chronicle.Storage.Events.Constraints; using Cratis.Chronicle.Storage.EventTypes; -using Cratis.Chronicle.Storage.Jobs; using Cratis.Chronicle.Storage.MongoDB.Events.Constraints; using Cratis.Chronicle.Storage.MongoDB.Namespaces; using Cratis.Chronicle.Storage.MongoDB.Observation.Reactors; diff --git a/Source/Kernel/Storage.MongoDB/Storage.cs b/Source/Kernel/Storage.MongoDB/Storage.cs index bd97918d2..28494c005 100644 --- a/Source/Kernel/Storage.MongoDB/Storage.cs +++ b/Source/Kernel/Storage.MongoDB/Storage.cs @@ -6,11 +6,11 @@ using System.Text.Json; using Cratis.Chronicle.Compliance; using Cratis.Chronicle.Concepts; +using Cratis.Chronicle.Concepts.Jobs; using Cratis.Chronicle.Concepts.Observation.Reactors.Json; using Cratis.Chronicle.Concepts.Observation.Reducers.Json; using Cratis.Chronicle.Concepts.Projections.Json; using Cratis.Chronicle.Reactive; -using Cratis.Chronicle.Storage.Jobs; using Cratis.Chronicle.Storage.Sinks; using Cratis.Types; using Microsoft.Extensions.Logging; From 535db3311d5732428f614c6d6af6e1ecb8d32d2d Mon Sep 17 00:00:00 2001 From: woksin Date: Tue, 4 Feb 2025 13:55:40 +0100 Subject: [PATCH 18/38] Rename RetryFailedPartition job --- .../and_it_fails/and_needs_to_catch_up.cs | 2 +- .../and_it_fails/and_needs_to_catch_up.cs | 2 +- ...IRetryFailedPartitionJob.cs => IRetryFailedPartition.cs} | 2 +- .../Observation/Jobs/RetryFailedPartitionRequest.cs | 2 +- .../when_subscribing/and_there_are_failing_partitions.cs | 4 ++-- .../when_subscribing/and_there_are_no_failing_partitions.cs | 2 +- .../{RetryFailedPartitionJob.cs => RetryFailedPartition.cs} | 6 +++--- Source/Kernel/Grains/Observation/Observer.cs | 2 +- 8 files changed, 11 insertions(+), 11 deletions(-) rename Source/Kernel/Grains.Interfaces/Observation/Jobs/{IRetryFailedPartitionJob.cs => IRetryFailedPartition.cs} (81%) rename Source/Kernel/Grains/Observation/Jobs/{RetryFailedPartitionJob.cs => RetryFailedPartition.cs} (91%) diff --git a/Integration/Orleans.InProcess/for_Reactors/when_handling_event/and_it_fails/and_needs_to_catch_up.cs b/Integration/Orleans.InProcess/for_Reactors/when_handling_event/and_it_fails/and_needs_to_catch_up.cs index 86aa25323..25fcac7b0 100644 --- a/Integration/Orleans.InProcess/for_Reactors/when_handling_event/and_it_fails/and_needs_to_catch_up.cs +++ b/Integration/Orleans.InProcess/for_Reactors/when_handling_event/and_it_fails/and_needs_to_catch_up.cs @@ -68,7 +68,7 @@ async Task Because() } [Fact] void should_fail_one_partition() => Context.FailedPartitionsBeforeRetry.Count().ShouldEqual(1); - [Fact] void should_start_replaying_job() => Context.Jobs.First().Type.ShouldContain(nameof(RetryFailedPartitionJob)); + [Fact] void should_start_replaying_job() => Context.Jobs.First().Type.ShouldContain(nameof(RetryFailedPartition)); [Fact] void should_recover_failed_partition() => Context.FailedPartitionsAfterRetry.ShouldBeEmpty(); [Fact] void should_start_catchup_for_partition_job() => Context.JobsWithCatchUp.SingleOrDefault(_ => _.Name == nameof(CatchUpObserverPartition)).ShouldNotBeNull(); [Fact] void should_have_completed_all_jobs_at_the_end() => Context.JobsAfterCompleted.ShouldBeEmpty(); diff --git a/Integration/Orleans.InProcess/for_Reducers/when_handling_event/and_it_fails/and_needs_to_catch_up.cs b/Integration/Orleans.InProcess/for_Reducers/when_handling_event/and_it_fails/and_needs_to_catch_up.cs index 0c5c97a5f..1c6526745 100644 --- a/Integration/Orleans.InProcess/for_Reducers/when_handling_event/and_it_fails/and_needs_to_catch_up.cs +++ b/Integration/Orleans.InProcess/for_Reducers/when_handling_event/and_it_fails/and_needs_to_catch_up.cs @@ -68,7 +68,7 @@ async Task Because() } [Fact] void should_fail_one_partition() => Context.FailedPartitionsBeforeRetry.Count().ShouldEqual(1); - [Fact] void should_start_replaying_job() => Context.Jobs.First().Type.ShouldContain(nameof(RetryFailedPartitionJob)); + [Fact] void should_start_replaying_job() => Context.Jobs.First().Type.ShouldContain(nameof(RetryFailedPartition)); [Fact] void should_recover_failed_partition() => Context.FailedPartitionsAfterRetry.ShouldBeEmpty(); [Fact] void should_start_catchup_for_partition_job() => Context.JobsWithCatchUp.SingleOrDefault(_ => _.Name == nameof(CatchUpObserverPartition)).ShouldNotBeNull(); [Fact] void should_have_completed_all_jobs_at_the_end() => Context.JobsAfterCompleted.ShouldBeEmpty(); diff --git a/Source/Kernel/Grains.Interfaces/Observation/Jobs/IRetryFailedPartitionJob.cs b/Source/Kernel/Grains.Interfaces/Observation/Jobs/IRetryFailedPartition.cs similarity index 81% rename from Source/Kernel/Grains.Interfaces/Observation/Jobs/IRetryFailedPartitionJob.cs rename to Source/Kernel/Grains.Interfaces/Observation/Jobs/IRetryFailedPartition.cs index 5d5c404b3..73a2c64df 100644 --- a/Source/Kernel/Grains.Interfaces/Observation/Jobs/IRetryFailedPartitionJob.cs +++ b/Source/Kernel/Grains.Interfaces/Observation/Jobs/IRetryFailedPartition.cs @@ -8,4 +8,4 @@ namespace Cratis.Chronicle.Grains.Observation.Jobs; /// /// Defines the job for retrying a failed partition for an observer. /// -public interface IRetryFailedPartitionJob : IJob; +public interface IRetryFailedPartition : IJob; diff --git a/Source/Kernel/Grains.Interfaces/Observation/Jobs/RetryFailedPartitionRequest.cs b/Source/Kernel/Grains.Interfaces/Observation/Jobs/RetryFailedPartitionRequest.cs index 549c9f18f..af8cd5567 100644 --- a/Source/Kernel/Grains.Interfaces/Observation/Jobs/RetryFailedPartitionRequest.cs +++ b/Source/Kernel/Grains.Interfaces/Observation/Jobs/RetryFailedPartitionRequest.cs @@ -8,7 +8,7 @@ namespace Cratis.Chronicle.Grains.Observation.Jobs; /// -/// Represents the request for a . +/// Represents the request for a . /// /// The additional for the observer to replay. /// The for the observer. diff --git a/Source/Kernel/Grains.Specs/Observation/for_Observer/when_subscribing/and_there_are_failing_partitions.cs b/Source/Kernel/Grains.Specs/Observation/for_Observer/when_subscribing/and_there_are_failing_partitions.cs index 3de0bf251..d6f405661 100644 --- a/Source/Kernel/Grains.Specs/Observation/for_Observer/when_subscribing/and_there_are_failing_partitions.cs +++ b/Source/Kernel/Grains.Specs/Observation/for_Observer/when_subscribing/and_there_are_failing_partitions.cs @@ -39,10 +39,10 @@ [Fact] async Task should_get_the_subscription() [Fact] void should_be_in_running_state() => _stateStorage.State.RunningState.ShouldEqual(ObserverRunningState.Active); [Fact] void should_start_retry_failed_partition_jobs_for_first_partition() => _jobsManager .Received(1) - .Start(Arg.Any(), Arg.Is(_ => _.Key == firstFailedPartition.Partition && + .Start(Arg.Any(), Arg.Is(_ => _.Key == firstFailedPartition.Partition && _.FromSequenceNumber == firstFailedPartition.LastAttempt.SequenceNumber)); [Fact] void should_start_retry_failed_partition_jobs_for_second_partition() => _jobsManager .Received(1) - .Start(Arg.Any(), Arg.Is(_ => _.Key == secondFailedPartition.Partition && + .Start(Arg.Any(), Arg.Is(_ => _.Key == secondFailedPartition.Partition && _.FromSequenceNumber == secondFailedPartition.LastAttempt.SequenceNumber)); } diff --git a/Source/Kernel/Grains.Specs/Observation/for_Observer/when_subscribing/and_there_are_no_failing_partitions.cs b/Source/Kernel/Grains.Specs/Observation/for_Observer/when_subscribing/and_there_are_no_failing_partitions.cs index 7f85a995d..354e8a5f5 100644 --- a/Source/Kernel/Grains.Specs/Observation/for_Observer/when_subscribing/and_there_are_no_failing_partitions.cs +++ b/Source/Kernel/Grains.Specs/Observation/for_Observer/when_subscribing/and_there_are_no_failing_partitions.cs @@ -31,5 +31,5 @@ [Fact] async Task should_get_the_subscription() } [Fact] void should_be_in_running_state() => _stateStorage.State.RunningState.ShouldEqual(ObserverRunningState.Active); - [Fact] void should_not_start_retry_failed_partition_jobs() => _jobsManager.DidNotReceive().Start(Arg.Any(), Arg.Any()); + [Fact] void should_not_start_retry_failed_partition_jobs() => _jobsManager.DidNotReceive().Start(Arg.Any(), Arg.Any()); } diff --git a/Source/Kernel/Grains/Observation/Jobs/RetryFailedPartitionJob.cs b/Source/Kernel/Grains/Observation/Jobs/RetryFailedPartition.cs similarity index 91% rename from Source/Kernel/Grains/Observation/Jobs/RetryFailedPartitionJob.cs rename to Source/Kernel/Grains/Observation/Jobs/RetryFailedPartition.cs index 0c2b97ac4..6a2c375e0 100644 --- a/Source/Kernel/Grains/Observation/Jobs/RetryFailedPartitionJob.cs +++ b/Source/Kernel/Grains/Observation/Jobs/RetryFailedPartition.cs @@ -13,7 +13,7 @@ namespace Cratis.Chronicle.Grains.Observation.Jobs; /// Represents a job for retrying a failed partition. /// /// The logger. -public class RetryFailedPartitionJob(ILogger logger) : Job, IRetryFailedPartitionJob +public class RetryFailedPartition(ILogger logger) : Job, IRetryFailedPartition { /// public override async Task OnCompleted() @@ -23,13 +23,13 @@ public override async Task OnCompleted() if (State is { HandledAllEvents: false, LastHandledEventSequenceNumber.IsActualValue: true }) { - logger.NotAllEventsWereHandled(nameof(RetryFailedPartitionJob), State.LastHandledEventSequenceNumber); + logger.NotAllEventsWereHandled(nameof(RetryFailedPartition), State.LastHandledEventSequenceNumber); await observer.FailedPartitionPartiallyRecovered(Request.Key, State.LastHandledEventSequenceNumber); } if (!State.LastHandledEventSequenceNumber.IsActualValue) { - logger.NoneEventsWereHandled(nameof(RetryFailedPartitionJob)); + logger.NoneEventsWereHandled(nameof(RetryFailedPartition)); return; } await observer.FailedPartitionRecovered(Request.Key, State.LastHandledEventSequenceNumber); diff --git a/Source/Kernel/Grains/Observation/Observer.cs b/Source/Kernel/Grains/Observation/Observer.cs index 0987f9b7a..f38444401 100644 --- a/Source/Kernel/Grains/Observation/Observer.cs +++ b/Source/Kernel/Grains/Observation/Observer.cs @@ -616,7 +616,7 @@ async Task StartRecoverJobForFailedPartition(FailedPartition failedPartition) using var scope = logger.BeginObserverScope(_observerId, _observerKey); logger.TryingToRecoverFailedPartition(failedPartition.Partition); await RemoveReminder(failedPartition.Partition.ToString()); - await _jobsManager.Start( + await _jobsManager.Start( JobId.New(), new( _observerKey, From 44fee599017a914c3766a1e370fdfb5e2595cdb4 Mon Sep 17 00:00:00 2001 From: woksin Date: Tue, 4 Feb 2025 13:55:55 +0100 Subject: [PATCH 19/38] Simplify OrleansFixture --- Integration/Orleans.InProcess/OrleansFixture.cs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Integration/Orleans.InProcess/OrleansFixture.cs b/Integration/Orleans.InProcess/OrleansFixture.cs index c3172b135..c3a878b16 100644 --- a/Integration/Orleans.InProcess/OrleansFixture.cs +++ b/Integration/Orleans.InProcess/OrleansFixture.cs @@ -66,9 +66,9 @@ protected override IHostBuilder CreateHostBuilder() { silo .UseLocalhostClustering() - .AddCratisChronicle(_ => _.EventStoreName = Constants.EventStore) - .AddChronicleToSilo(_ => _ - .WithMongoDB(chronicleOptions.Storage.ConnectionDetails, Constants.EventStore)); + .AddCratisChronicle( + options => options.EventStoreName = Constants.EventStore, + chronicleBuilder => chronicleBuilder.WithMongoDB(chronicleOptions.Storage.ConnectionDetails, Constants.EventStore)); }) .UseConsoleLifetime(); From 05914de40c74a6ef16b23c69551ef93ceee0d6fb Mon Sep 17 00:00:00 2001 From: woksin Date: Tue, 4 Feb 2025 13:58:59 +0100 Subject: [PATCH 20/38] Remove GrainIdSerializer because it was never used --- .../Grains/GrainIdSerializer.cs | 27 ------------------- 1 file changed, 27 deletions(-) delete mode 100644 Source/Kernel/Storage.MongoDB/Grains/GrainIdSerializer.cs diff --git a/Source/Kernel/Storage.MongoDB/Grains/GrainIdSerializer.cs b/Source/Kernel/Storage.MongoDB/Grains/GrainIdSerializer.cs deleted file mode 100644 index abd56940e..000000000 --- a/Source/Kernel/Storage.MongoDB/Grains/GrainIdSerializer.cs +++ /dev/null @@ -1,27 +0,0 @@ -// Copyright (c) Cratis. All rights reserved. -// Licensed under the MIT license. See LICENSE file in the project root for full license information. - -using MongoDB.Bson.Serialization; - -namespace Cratis.Chronicle.Storage.MongoDB.Grains; - -/// -/// Represents a for handling serialization of . -/// -public class GrainIdSerializer : IBsonSerializer -{ - /// - public Type ValueType => typeof(GrainId); - - /// - public GrainId Deserialize(BsonDeserializationContext context, BsonDeserializationArgs args) => GrainId.Parse(context.Reader.ReadString()); - - /// - public void Serialize(BsonSerializationContext context, BsonSerializationArgs args, GrainId value) => context.Writer.WriteString(value.ToString()); - - /// - public void Serialize(BsonSerializationContext context, BsonSerializationArgs args, object value) => Serialize(context, args, (GrainId)value); - - /// - object IBsonSerializer.Deserialize(BsonDeserializationContext context, BsonDeserializationArgs args) => Deserialize(context, args); -} From c3133c6bc2b5c0098d21318942b504efcb4837bd Mon Sep 17 00:00:00 2001 From: woksin Date: Tue, 4 Feb 2025 15:05:15 +0100 Subject: [PATCH 21/38] Add startup task for automatically registering bson serialization providers and bson serializers --- .../ChronicleServerSiloBuilderExtensions.cs | 4 ++++ .../CustomSerializersRegistrationService.cs | 18 ++++++++++++------ .../MongoDBChronicleBuilderExtensions.cs | 2 +- 3 files changed, 17 insertions(+), 7 deletions(-) diff --git a/Source/Kernel/Setup/ChronicleServerSiloBuilderExtensions.cs b/Source/Kernel/Setup/ChronicleServerSiloBuilderExtensions.cs index 4bd1eae38..9c28d6e63 100644 --- a/Source/Kernel/Setup/ChronicleServerSiloBuilderExtensions.cs +++ b/Source/Kernel/Setup/ChronicleServerSiloBuilderExtensions.cs @@ -1,9 +1,11 @@ // Copyright (c) Cratis. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. +using Cratis.Chronicle.Concepts.Jobs; using Cratis.Chronicle.Connections; using Cratis.Chronicle.Diagnostics.OpenTelemetry; using Cratis.Chronicle.Grains; +using Cratis.Chronicle.Grains.Jobs; using Cratis.Chronicle.Grains.Observation.Placement; using Cratis.Chronicle.Grains.Observation.Reactors.Clients; using Cratis.Chronicle.Grains.Observation.Reducers.Clients; @@ -17,6 +19,7 @@ using Cratis.Chronicle.Storage; using Cratis.Json; using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection.Extensions; using Microsoft.Extensions.Logging; namespace Orleans.Hosting; @@ -34,6 +37,7 @@ public static class ChronicleServerSiloBuilderExtensions /// for continuation. public static ISiloBuilder AddChronicleToSilo(this ISiloBuilder builder, Action? configure = default) { + builder.Services.TryAddSingleton(); builder .AddChronicleServicesAsInMemory() .AddPlacementDirector() diff --git a/Source/Kernel/Storage.MongoDB/CustomSerializersRegistrationService.cs b/Source/Kernel/Storage.MongoDB/CustomSerializersRegistrationService.cs index 87978dab6..cd02deee6 100644 --- a/Source/Kernel/Storage.MongoDB/CustomSerializersRegistrationService.cs +++ b/Source/Kernel/Storage.MongoDB/CustomSerializersRegistrationService.cs @@ -1,6 +1,7 @@ // Copyright (c) Cratis. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. +using Cratis.Compliance.MongoDB; using Cratis.Types; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; @@ -12,20 +13,25 @@ namespace Cratis.Chronicle.Storage.MongoDB; /// /// The . /// The . -public class CustomSerializersRegistrationService(IServiceProvider serviceProvider, ITypes types) : IHostedService +public class CustomSerializersRegistrationService(IServiceProvider serviceProvider, ITypes types) : IStartupTask { /// - public Task StartAsync(CancellationToken cancellationToken) + public Task Execute(CancellationToken cancellationToken) { + foreach (var type in types.FindMultiple() + .Where(type => type.Assembly.FullName!.Contains("Chronicle") && !type.IsGenericType) + .Except([typeof(EncryptionKeySerializer)])) + { + var provider = (IBsonSerializationProvider)ActivatorUtilities.CreateInstance(serviceProvider, type); + BsonSerializer.RegisterSerializationProvider(provider); + } foreach (var type in types.FindMultiple() - .Where(type => type.Assembly.FullName!.Contains("Chronicle") && !type.IsGenericType)) + .Where(type => type.Assembly.FullName!.Contains("Chronicle") && !type.IsGenericType) + .Except([typeof(EncryptionKeySerializer)])) { var serializer = (IBsonSerializer)ActivatorUtilities.CreateInstance(serviceProvider, type); BsonSerializer.TryRegisterSerializer(serializer.ValueType, serializer); } return Task.CompletedTask; } - - /// - public Task StopAsync(CancellationToken cancellationToken) => Task.CompletedTask; } diff --git a/Source/Kernel/Storage.MongoDB/MongoDBChronicleBuilderExtensions.cs b/Source/Kernel/Storage.MongoDB/MongoDBChronicleBuilderExtensions.cs index c457fb24d..949c7ff2e 100644 --- a/Source/Kernel/Storage.MongoDB/MongoDBChronicleBuilderExtensions.cs +++ b/Source/Kernel/Storage.MongoDB/MongoDBChronicleBuilderExtensions.cs @@ -33,6 +33,7 @@ public static IChronicleBuilder WithMongoDB(this IChronicleBuilder builder, Chro /// for continuation. public static IChronicleBuilder WithMongoDB(this IChronicleBuilder builder, string server, string database = WellKnownDatabaseNames.Chronicle) { + builder.SiloBuilder.AddStartupTask(ServiceLifecycleStage.First); builder.SiloBuilder .UseMongoDBClient(server) @@ -51,7 +52,6 @@ public static IChronicleBuilder WithMongoDB(this IChronicleBuilder builder, stri services.AddSingleton(); }); - builder.Services.AddHostedService(); return builder; } } \ No newline at end of file From f198c5a9659896129826bf386cdf9b272c1cb5a1 Mon Sep 17 00:00:00 2001 From: woksin Date: Tue, 4 Feb 2025 15:05:26 +0100 Subject: [PATCH 22/38] WIP --- Source/Kernel/Concepts/Jobs/IJobTypes.cs | 7 +++ Source/Kernel/Grains/Jobs/JobTypes.cs | 17 +++++- .../Jobs/JobStateSerializer.cs | 54 +++++++++++++++++-- 3 files changed, 73 insertions(+), 5 deletions(-) diff --git a/Source/Kernel/Concepts/Jobs/IJobTypes.cs b/Source/Kernel/Concepts/Jobs/IJobTypes.cs index 1d2b44bf0..2b2c18240 100644 --- a/Source/Kernel/Concepts/Jobs/IJobTypes.cs +++ b/Source/Kernel/Concepts/Jobs/IJobTypes.cs @@ -55,4 +55,11 @@ enum GetForError /// The . /// or . Result GetRequestClrTypeFor(JobType type); + + /// + /// Gets the job request associated with the . + /// + /// The job request clr type. + /// or . + Result GetJobClrTypeFromRequestClrType(Type jobRequestClrType); } diff --git a/Source/Kernel/Grains/Jobs/JobTypes.cs b/Source/Kernel/Grains/Jobs/JobTypes.cs index bd62ae88f..29db45b3e 100644 --- a/Source/Kernel/Grains/Jobs/JobTypes.cs +++ b/Source/Kernel/Grains/Jobs/JobTypes.cs @@ -17,6 +17,7 @@ public class JobTypes : IJobTypes readonly Dictionary _jobTypes = []; readonly Dictionary _jobTypePerType = []; readonly Dictionary _jobRequestTypes = []; + readonly Dictionary _jobRequestTypeToJobType = []; /// /// Initializes an instance of the class. @@ -56,6 +57,14 @@ public class JobTypes : IJobTypes : IJobTypes.GetRequestClrTypeForError.CouldNotFindType; } + /// + public Result GetJobClrTypeFromRequestClrType(Type jobRequestClrType) + { + return _jobRequestTypeToJobType.TryGetValue(jobRequestClrType, out var jobClrType) + ? jobClrType + : IJobTypes.GetRequestClrTypeForError.CouldNotFindType; + } + void InitializeMap(ITypes types) { PopulateJobTypes(types); @@ -64,7 +73,9 @@ void InitializeMap(ITypes types) void PopulateJobTypes(ITypes types) { - foreach (var jobClrType in types.FindMultiple().Where(type => type is { IsClass: true, IsAbstract: false } && type != typeof(NullJob))) + foreach (var jobClrType in types.FindMultiple() + .Where(type => type is { IsClass: true, IsAbstract: false, IsInterface: false, IsGenericType: false } + && type != typeof(NullJob) && type.Assembly.FullName != typeof(IJob).Assembly.FullName)) { var jobTypeAttribute = jobClrType.GetCustomAttribute(); var jobType = jobTypeAttribute?.JobType ?? jobClrType; @@ -95,7 +106,9 @@ void PopulateJobRequestTypes() throw new JobTypeMustHaveARequestType(jobType, jobClrType); default: // First generic argument of IJob is the type of the request - _jobRequestTypes.Add(jobType, jobInterfaces[0].GetGenericArguments()[0]); + var requestType = jobInterfaces[0].GetGenericArguments()[0]; + _jobRequestTypes.Add(jobType, requestType); + _jobRequestTypeToJobType.Add(requestType, jobClrType); break; } } diff --git a/Source/Kernel/Storage.MongoDB/Jobs/JobStateSerializer.cs b/Source/Kernel/Storage.MongoDB/Jobs/JobStateSerializer.cs index 3a5199890..e6dba98c0 100644 --- a/Source/Kernel/Storage.MongoDB/Jobs/JobStateSerializer.cs +++ b/Source/Kernel/Storage.MongoDB/Jobs/JobStateSerializer.cs @@ -1,9 +1,11 @@ // Copyright (c) Cratis. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. +using Cratis.Applications.MongoDB; using Cratis.Chronicle.Concepts.Jobs; using Cratis.Chronicle.Storage.Jobs; using Cratis.Strings; +using Microsoft.Extensions.DependencyInjection; using MongoDB.Bson; using MongoDB.Bson.Serialization; using MongoDB.Bson.Serialization.Serializers; @@ -15,21 +17,67 @@ namespace Cratis.Chronicle.Storage.MongoDB.Jobs; /// The . public class JobStateSerializer(IJobTypes jobTypes) : SerializerBase { + public override void Serialize(BsonSerializationContext context, BsonSerializationArgs args, JobState value) + { + var requestType = value.Request.GetType(); + var requestSerializer = BsonSerializer.SerializerRegistry.GetSerializer(requestType); + } /// public override JobState Deserialize(BsonDeserializationContext context, BsonDeserializationArgs args) { + var jobRequestElementName = nameof(JobState.Request).ToCamelCase(); var rawBsonDocument = context.Reader.ReadRawBsonDocument(); using var rawDocument = new RawBsonDocument(rawBsonDocument); + rawDocument.Remove(jobRequestElementName); var bsonDocument = rawDocument.ToBsonDocument(); var jobTypeString = bsonDocument.GetValue(nameof(JobState.Type).ToCamelCase()).AsString; var jobRequestType = jobTypes.GetRequestClrTypeFor(new(jobTypeString)).AsT0; - var jobRequestElementName = nameof(JobState.Request).ToCamelCase(); var request = (IJobRequest)BsonSerializer.Deserialize( bsonDocument.GetElement(jobRequestElementName).ToBsonDocument(), jobRequestType); - bsonDocument.Remove(jobRequestElementName); + // bsonDocument.Remove(jobRequestElementName); var jobState = BsonSerializer.Deserialize(bsonDocument); jobState.Request = request; return jobState; } -} \ No newline at end of file +} + +public class JobRequestSerializer(IJobTypes jobTypes) : SerializerBase + where TJobRequest : IJobRequest +{ + /// + public override TJobRequest Deserialize(BsonDeserializationContext context, BsonDeserializationArgs args) + { + var rawBsonDocument = context.Reader.ReadRawBsonDocument(); + using var rawDocument = new RawBsonDocument(rawBsonDocument); + var bsonDocument = rawDocument.ToBsonDocument(); + return BsonSerializer.Deserialize(bsonDocument); + } +} + +/// +/// Represents a for concepts. +/// +public class JobRequestSerializationProvider(IServiceProvider serviceProvider) : IBsonSerializationProvider +{ + /// + /// Creates an instance of a serializer of the concept of the given type param T. + /// + /// The Concept type. + /// for the specific type. + public static JobRequestSerializer CreateSerializer(IServiceProvider serviceProvider) + where T : class, IJobRequest + => ActivatorUtilities.CreateInstance>(serviceProvider); + + /// + public IBsonSerializer GetSerializer(Type type) + { + if (type.IsAssignableTo(typeof(IJobRequest))) + { + var createSerializerMethod = GetType().GetMethod(nameof(CreateSerializer))!.MakeGenericMethod(type); + return (createSerializerMethod.Invoke(null, [serviceProvider]) as IBsonSerializer)!; + } + + return null!; + } +} From 643ecca921433150ba81c85b903bd7445c4ce3e2 Mon Sep 17 00:00:00 2001 From: woksin Date: Wed, 5 Feb 2025 14:31:03 +0100 Subject: [PATCH 23/38] Remove unneeded method from IJobTypes --- Source/Kernel/Concepts/Jobs/IJobTypes.cs | 7 ------- Source/Kernel/Grains/Jobs/JobTypes.cs | 10 ---------- 2 files changed, 17 deletions(-) diff --git a/Source/Kernel/Concepts/Jobs/IJobTypes.cs b/Source/Kernel/Concepts/Jobs/IJobTypes.cs index 2b2c18240..1d2b44bf0 100644 --- a/Source/Kernel/Concepts/Jobs/IJobTypes.cs +++ b/Source/Kernel/Concepts/Jobs/IJobTypes.cs @@ -55,11 +55,4 @@ enum GetForError /// The . /// or . Result GetRequestClrTypeFor(JobType type); - - /// - /// Gets the job request associated with the . - /// - /// The job request clr type. - /// or . - Result GetJobClrTypeFromRequestClrType(Type jobRequestClrType); } diff --git a/Source/Kernel/Grains/Jobs/JobTypes.cs b/Source/Kernel/Grains/Jobs/JobTypes.cs index 29db45b3e..c6fd3aacf 100644 --- a/Source/Kernel/Grains/Jobs/JobTypes.cs +++ b/Source/Kernel/Grains/Jobs/JobTypes.cs @@ -17,7 +17,6 @@ public class JobTypes : IJobTypes readonly Dictionary _jobTypes = []; readonly Dictionary _jobTypePerType = []; readonly Dictionary _jobRequestTypes = []; - readonly Dictionary _jobRequestTypeToJobType = []; /// /// Initializes an instance of the class. @@ -57,14 +56,6 @@ public class JobTypes : IJobTypes : IJobTypes.GetRequestClrTypeForError.CouldNotFindType; } - /// - public Result GetJobClrTypeFromRequestClrType(Type jobRequestClrType) - { - return _jobRequestTypeToJobType.TryGetValue(jobRequestClrType, out var jobClrType) - ? jobClrType - : IJobTypes.GetRequestClrTypeForError.CouldNotFindType; - } - void InitializeMap(ITypes types) { PopulateJobTypes(types); @@ -108,7 +99,6 @@ void PopulateJobRequestTypes() // First generic argument of IJob is the type of the request var requestType = jobInterfaces[0].GetGenericArguments()[0]; _jobRequestTypes.Add(jobType, requestType); - _jobRequestTypeToJobType.Add(requestType, jobClrType); break; } } From fe4b39e26f8c8fbc6d534d1cd966c1a65cc8082a Mon Sep 17 00:00:00 2001 From: woksin Date: Wed, 5 Feb 2025 14:36:14 +0100 Subject: [PATCH 24/38] Implement JobStateSerializer --- .../Jobs/JobStateSerializer.cs | 92 ++++++++----------- 1 file changed, 37 insertions(+), 55 deletions(-) diff --git a/Source/Kernel/Storage.MongoDB/Jobs/JobStateSerializer.cs b/Source/Kernel/Storage.MongoDB/Jobs/JobStateSerializer.cs index e6dba98c0..3fb68972c 100644 --- a/Source/Kernel/Storage.MongoDB/Jobs/JobStateSerializer.cs +++ b/Source/Kernel/Storage.MongoDB/Jobs/JobStateSerializer.cs @@ -1,12 +1,11 @@ // Copyright (c) Cratis. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. -using Cratis.Applications.MongoDB; using Cratis.Chronicle.Concepts.Jobs; using Cratis.Chronicle.Storage.Jobs; using Cratis.Strings; -using Microsoft.Extensions.DependencyInjection; using MongoDB.Bson; +using MongoDB.Bson.IO; using MongoDB.Bson.Serialization; using MongoDB.Bson.Serialization.Serializers; namespace Cratis.Chronicle.Storage.MongoDB.Jobs; @@ -17,67 +16,50 @@ namespace Cratis.Chronicle.Storage.MongoDB.Jobs; /// The . public class JobStateSerializer(IJobTypes jobTypes) : SerializerBase { + /// public override void Serialize(BsonSerializationContext context, BsonSerializationArgs args, JobState value) - { - var requestType = value.Request.GetType(); - var requestSerializer = BsonSerializer.SerializerRegistry.GetSerializer(requestType); - } + => BsonSerializer.SerializerRegistry.GetSerializer().Serialize(context, args, value); + /// public override JobState Deserialize(BsonDeserializationContext context, BsonDeserializationArgs args) { - var jobRequestElementName = nameof(JobState.Request).ToCamelCase(); - var rawBsonDocument = context.Reader.ReadRawBsonDocument(); - using var rawDocument = new RawBsonDocument(rawBsonDocument); - rawDocument.Remove(jobRequestElementName); - var bsonDocument = rawDocument.ToBsonDocument(); - var jobTypeString = bsonDocument.GetValue(nameof(JobState.Type).ToCamelCase()).AsString; - var jobRequestType = jobTypes.GetRequestClrTypeFor(new(jobTypeString)).AsT0; - var request = (IJobRequest)BsonSerializer.Deserialize( - bsonDocument.GetElement(jobRequestElementName).ToBsonDocument(), - jobRequestType); - // bsonDocument.Remove(jobRequestElementName); - var jobState = BsonSerializer.Deserialize(bsonDocument); - jobState.Request = request; - return jobState; - } -} + var (jobState, requestBookmark, endBookmark) = DeserializeJobStateExceptRequest(context); + context.Reader.ReturnToBookmark(requestBookmark); + var jobRequestClrType = jobTypes.GetRequestClrTypeFor(jobState.Type).Match(type => type, _ => throw new UnknownClrTypeForJobType(jobState.Type)); + ValueType.GetProperty(nameof(JobState.Request))! + .SetValue(jobState, BsonSerializer.Deserialize(context.Reader, jobRequestClrType)); -public class JobRequestSerializer(IJobTypes jobTypes) : SerializerBase - where TJobRequest : IJobRequest -{ - /// - public override TJobRequest Deserialize(BsonDeserializationContext context, BsonDeserializationArgs args) - { - var rawBsonDocument = context.Reader.ReadRawBsonDocument(); - using var rawDocument = new RawBsonDocument(rawBsonDocument); - var bsonDocument = rawDocument.ToBsonDocument(); - return BsonSerializer.Deserialize(bsonDocument); + context.Reader.ReturnToBookmark(endBookmark); + context.Reader.ReadEndDocument(); + return jobState; } -} - -/// -/// Represents a for concepts. -/// -public class JobRequestSerializationProvider(IServiceProvider serviceProvider) : IBsonSerializationProvider -{ - /// - /// Creates an instance of a serializer of the concept of the given type param T. - /// - /// The Concept type. - /// for the specific type. - public static JobRequestSerializer CreateSerializer(IServiceProvider serviceProvider) - where T : class, IJobRequest - => ActivatorUtilities.CreateInstance>(serviceProvider); - /// - public IBsonSerializer GetSerializer(Type type) + static (JobState JobState, BsonReaderBookmark JobRequestBookmark, BsonReaderBookmark EndBookmark) DeserializeJobStateExceptRequest(BsonDeserializationContext context) { - if (type.IsAssignableTo(typeof(IJobRequest))) + var jobState = new JobState(); + BsonReaderBookmark requestBookmark = default!; + context.Reader.ReadStartDocument(); + var bsonCLassMap = BsonClassMap.LookupClassMap(typeof(JobState)).AllMemberMaps; + while (context.Reader.ReadBsonType() != BsonType.EndOfDocument) { - var createSerializerMethod = GetType().GetMethod(nameof(CreateSerializer))!.MakeGenericMethod(type); - return (createSerializerMethod.Invoke(null, [serviceProvider]) as IBsonSerializer)!; + var fieldName = context.Reader.ReadName(); + var propertyName = fieldName.ToPascalCase(); + if (propertyName == nameof(JobState.Request)) + { + requestBookmark = context.Reader.GetBookmark(); + context.Reader.SkipValue(); + continue; + } + var memberMap = bsonCLassMap.FirstOrDefault(map => map.ElementName.Equals(fieldName)); + if (memberMap == null) + { + context.Reader.SkipValue(); + continue; + } + var jobStateProperty = typeof(JobState).GetProperty(memberMap.MemberName)!; + jobStateProperty.SetValue(jobState, BsonSerializer.Deserialize(context.Reader, memberMap.MemberType)); } - - return null!; + var endBookmark = context.Reader.GetBookmark(); + return (jobState, requestBookmark, endBookmark); } -} +} \ No newline at end of file From 39cfd0ff38851cd2b0df15a74256e59504ede452 Mon Sep 17 00:00:00 2001 From: woksin Date: Thu, 6 Feb 2025 10:23:47 +0100 Subject: [PATCH 25/38] Remove Name from Job proto contract --- Source/Kernel/Contracts/Jobs/Job.cs | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/Source/Kernel/Contracts/Jobs/Job.cs b/Source/Kernel/Contracts/Jobs/Job.cs index 17cdf7532..28e22ec2c 100644 --- a/Source/Kernel/Contracts/Jobs/Job.cs +++ b/Source/Kernel/Contracts/Jobs/Job.cs @@ -18,45 +18,39 @@ public class Job [ProtoMember(1)] public Guid Id { get; set; } - /// - /// Gets or sets the name of the job. - /// - [ProtoMember(2)] - public string Name { get; set; } - /// /// Gets or sets the details for a job. /// - [ProtoMember(3)] + [ProtoMember(2)] public string Details { get; set; } /// /// Gets or sets the type of the job. /// - [ProtoMember(4)] + [ProtoMember(3)] public string Type { get; set; } /// /// Gets or sets the status of the job. /// - [ProtoMember(5)] + [ProtoMember(4)] public JobStatus Status { get; set; } /// /// Gets or sets when job was created. /// - [ProtoMember(6)] + [ProtoMember(5)] public SerializableDateTimeOffset Created { get; set; } /// /// Gets or sets collection of status changes that happened to the job. /// - [ProtoMember(7, IsRequired = true)] + [ProtoMember(6, IsRequired = true)] public IList StatusChanges { get; set; } = []; /// /// Gets or sets the . /// - [ProtoMember(8, IsRequired = true)] + [ProtoMember(7, IsRequired = true)] public JobProgress Progress { get; set; } = new(); } From d5ad20d4e3f15a5c53020b975f82d19cde33e5a9 Mon Sep 17 00:00:00 2001 From: woksin Date: Thu, 6 Feb 2025 10:24:50 +0100 Subject: [PATCH 26/38] Add static singleton instance to JobTypes --- Source/Kernel/Grains/Jobs/JobTypes.cs | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/Source/Kernel/Grains/Jobs/JobTypes.cs b/Source/Kernel/Grains/Jobs/JobTypes.cs index c6fd3aacf..6b2c1298e 100644 --- a/Source/Kernel/Grains/Jobs/JobTypes.cs +++ b/Source/Kernel/Grains/Jobs/JobTypes.cs @@ -14,6 +14,11 @@ namespace Cratis.Chronicle.Grains.Jobs; [Singleton] public class JobTypes : IJobTypes { + /// + /// Gets the singleton instance of . + /// + public static IJobTypes Instance { get; private set; } + readonly Dictionary _jobTypes = []; readonly Dictionary _jobTypePerType = []; readonly Dictionary _jobRequestTypes = []; @@ -22,7 +27,11 @@ public class JobTypes : IJobTypes /// Initializes an instance of the class. /// /// The . - public JobTypes(ITypes types) => InitializeMap(types); + public JobTypes(ITypes types) + { + InitializeMap(types); + Instance = this; + } /// public Result GetFor(Type type) => From f8183a2454aa7f8fa40f60250f1aec9fa86a9e73 Mon Sep 17 00:00:00 2001 From: woksin Date: Thu, 6 Feb 2025 10:25:06 +0100 Subject: [PATCH 27/38] format code --- Source/Kernel/Storage.MongoDB/Jobs/JobStateSerializer.cs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/Source/Kernel/Storage.MongoDB/Jobs/JobStateSerializer.cs b/Source/Kernel/Storage.MongoDB/Jobs/JobStateSerializer.cs index 3fb68972c..16a23d818 100644 --- a/Source/Kernel/Storage.MongoDB/Jobs/JobStateSerializer.cs +++ b/Source/Kernel/Storage.MongoDB/Jobs/JobStateSerializer.cs @@ -43,8 +43,7 @@ public override JobState Deserialize(BsonDeserializationContext context, BsonDes while (context.Reader.ReadBsonType() != BsonType.EndOfDocument) { var fieldName = context.Reader.ReadName(); - var propertyName = fieldName.ToPascalCase(); - if (propertyName == nameof(JobState.Request)) + if (fieldName.ToPascalCase() == nameof(JobState.Request)) { requestBookmark = context.Reader.GetBookmark(); context.Reader.SkipValue(); From 8cf064671fb4899cf9f5b2c07f74880c751d298d Mon Sep 17 00:00:00 2001 From: woksin Date: Thu, 6 Feb 2025 10:26:48 +0100 Subject: [PATCH 28/38] Remove unneeded json converters from Setup and create a JobState json converter --- .../Setup/Serialization/JobStateConverter.cs | 78 +++++++++++++++++++ .../Serialization/JobStateJsonConverter.cs | 17 ---- .../JobStepStateJsonConverter.cs | 17 ---- .../SerializationConfigurationExtensions.cs | 4 +- 4 files changed, 79 insertions(+), 37 deletions(-) create mode 100644 Source/Kernel/Setup/Serialization/JobStateConverter.cs delete mode 100644 Source/Kernel/Setup/Serialization/JobStateJsonConverter.cs delete mode 100644 Source/Kernel/Setup/Serialization/JobStepStateJsonConverter.cs diff --git a/Source/Kernel/Setup/Serialization/JobStateConverter.cs b/Source/Kernel/Setup/Serialization/JobStateConverter.cs new file mode 100644 index 000000000..011a6e73f --- /dev/null +++ b/Source/Kernel/Setup/Serialization/JobStateConverter.cs @@ -0,0 +1,78 @@ +// Copyright (c) Cratis. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using System.Text.Json; +using System.Text.Json.Nodes; +using System.Text.Json.Serialization; +using Cratis.Chronicle.Concepts.Jobs; +using Cratis.Chronicle.Grains.Jobs; +using Cratis.Chronicle.Storage.Jobs; +using Cratis.Json; +using Cratis.Strings; + +namespace Cratis.Chronicle.Setup.Serialization; + +/// +/// Represents a that can convert . +/// +public class JobStateConverter : JsonConverter +{ + readonly Lazy _jobTypes; + + /// + /// Initializes a new instance of the class. + /// + /// . + public JobStateConverter(IJobTypes jobTypes) + { + _jobTypes = new(jobTypes); + } + + /// + /// Initializes a new instance of the class. + /// + public JobStateConverter() + { + _jobTypes = new(() => JobTypes.Instance); + } + + /// + public override JobState? Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) + { + var node = JsonElement.ParseValue(ref reader); + if (node.ValueKind != JsonValueKind.Object) + { + return default!; + } + var jobStateResult = new JobState(); + var nodeAsObject = JsonObject.Create(node); + jobStateResult.Type = new(nodeAsObject![nameof(JobState.Type).ToCamelCase()]!.ToString()); + var jobRequestType = _jobTypes.Value.GetRequestClrTypeFor(jobStateResult.Type).Match(type => type, error => throw new UnknownClrTypeForJobType(jobStateResult.Type)); + + foreach (var (field, value) in nodeAsObject) + { + if (field == nameof(JobState.Request).ToCamelCase()) + { + continue; + } + + var propertyName = field.ToPascalCase(); + var jobStatePropertyInfo = typeToConvert.GetProperty(propertyName); + if (jobStatePropertyInfo?.SetMethod is null) + { + continue; + } + + var deserializedValue = value.Deserialize(jobStatePropertyInfo.PropertyType, options); + jobStatePropertyInfo.SetValue(jobStateResult, deserializedValue); + } + var jobStateRequestProperty = typeToConvert.GetProperty(nameof(JobState.Request))!; + var jobRequest = nodeAsObject[nameof(JobState.Request).ToCamelCase()]!.Deserialize(jobRequestType, options); + jobStateRequestProperty.SetValue(jobStateResult, jobRequest); + + return jobStateResult; + } + + /// + public override void Write(Utf8JsonWriter writer, JobState value, JsonSerializerOptions options) => JsonSerializer.Serialize(writer, value, Globals.JsonSerializerOptions); +} diff --git a/Source/Kernel/Setup/Serialization/JobStateJsonConverter.cs b/Source/Kernel/Setup/Serialization/JobStateJsonConverter.cs deleted file mode 100644 index 12bdb74b5..000000000 --- a/Source/Kernel/Setup/Serialization/JobStateJsonConverter.cs +++ /dev/null @@ -1,17 +0,0 @@ -// Copyright (c) Cratis. All rights reserved. -// Licensed under the MIT license. See LICENSE file in the project root for full license information. - -using System.Text.Json.Serialization; -using Cratis.Chronicle.Storage.Jobs; -using Cratis.Json; - -namespace Cratis.Chronicle.Setup.Serialization; - -/// -/// Represents the that can convert . -/// -public class JobStateJsonConverter : TypeWithObjectPropertiesJsonConverter -{ - /// - protected override IEnumerable ObjectProperties => [nameof(JobState.Request)]; -} diff --git a/Source/Kernel/Setup/Serialization/JobStepStateJsonConverter.cs b/Source/Kernel/Setup/Serialization/JobStepStateJsonConverter.cs deleted file mode 100644 index 1b8af29a0..000000000 --- a/Source/Kernel/Setup/Serialization/JobStepStateJsonConverter.cs +++ /dev/null @@ -1,17 +0,0 @@ -// Copyright (c) Cratis. All rights reserved. -// Licensed under the MIT license. See LICENSE file in the project root for full license information. - -using System.Text.Json.Serialization; -using Cratis.Chronicle.Storage.Jobs; -using Cratis.Json; - -namespace Cratis.Chronicle.Setup.Serialization; - -/// -/// Represents the that can convert . -/// -public class JobStepStateJsonConverter : TypeWithObjectPropertiesJsonConverter -{ - /// - protected override IEnumerable ObjectProperties => [nameof(JobStepState.Request)]; -} diff --git a/Source/Kernel/Setup/Serialization/SerializationConfigurationExtensions.cs b/Source/Kernel/Setup/Serialization/SerializationConfigurationExtensions.cs index a270a19eb..4d198c43e 100644 --- a/Source/Kernel/Setup/Serialization/SerializationConfigurationExtensions.cs +++ b/Source/Kernel/Setup/Serialization/SerializationConfigurationExtensions.cs @@ -78,11 +78,9 @@ static void Configure(this IServiceCollection services) options.Converters.Add(new JoinDefinitionsConverter()); options.Converters.Add(new RemovedWithDefinitionsConverter()); options.Converters.Add(new RemovedWithJoinDefinitionsConverter()); + options.Converters.Add(new JobStateConverter()); options.Converters.Add(new TypeWithObjectPropertiesJsonConverterFactory()); options.Converters.Add(new TypeWithObjectPropertiesJsonConverterFactory()); - options.Converters.Add(new TypeWithObjectPropertiesJsonConverterFactory()); - options.Converters.Add(new TypeWithObjectPropertiesJsonConverterFactory()); - services.AddConceptSerializer(); services.AddAppendedEventSerializer(); services.AddSerializer( From 9d23cf91fa11235ad89090783728253fe85a8f09 Mon Sep 17 00:00:00 2001 From: woksin Date: Thu, 6 Feb 2025 10:26:55 +0100 Subject: [PATCH 29/38] Fix integration tests --- Integration/Base/MongoDBDatabase.cs | 25 ++++++++++++------- .../and_it_fails/and_needs_to_catch_up.cs | 2 +- .../and_it_fails/but_not_second_time.cs | 4 +-- .../and_it_fails/but_not_third_time.cs | 2 +- .../and_it_fails/and_needs_to_catch_up.cs | 2 +- .../and_it_fails/but_not_second_time.cs | 2 +- .../and_it_fails/but_not_third_time.cs | 2 +- 7 files changed, 23 insertions(+), 16 deletions(-) diff --git a/Integration/Base/MongoDBDatabase.cs b/Integration/Base/MongoDBDatabase.cs index 243fcc6b6..d28b54059 100644 --- a/Integration/Base/MongoDBDatabase.cs +++ b/Integration/Base/MongoDBDatabase.cs @@ -53,16 +53,23 @@ public MongoDBDatabase(IContainer mongoDBContainer, string database) foreach (var document in cursor.Current) { - _changes.OnNext(document); - var changesCollection = changeDatabase.GetCollection(document.CollectionNamespace.CollectionName); - var changeDocument = new BsonDocument + try { - { "_id", ObjectId.GenerateNewId() }, - { "documentKey", document.DocumentKey }, - { "operationType", document.OperationType.ToString() }, - { "fullDocument", document.FullDocument } - }; - await changesCollection.InsertOneAsync(changeDocument); + _changes.OnNext(document); + var changesCollection = changeDatabase.GetCollection(document.CollectionNamespace.CollectionName); + var changeDocument = new BsonDocument + { + { "_id", ObjectId.GenerateNewId() }, + { "documentKey", (BsonValue)document.DocumentKey ?? "N/A" }, + { "operationType", document.OperationType.ToString() }, + { "fullDocument", document.FullDocument ?? new BsonDocument() } + }; + await changesCollection.InsertOneAsync(changeDocument); + } + catch + { + // ignored + } } } }); diff --git a/Integration/Orleans.InProcess/for_Reactors/when_handling_event/and_it_fails/and_needs_to_catch_up.cs b/Integration/Orleans.InProcess/for_Reactors/when_handling_event/and_it_fails/and_needs_to_catch_up.cs index 25fcac7b0..a28b2a660 100644 --- a/Integration/Orleans.InProcess/for_Reactors/when_handling_event/and_it_fails/and_needs_to_catch_up.cs +++ b/Integration/Orleans.InProcess/for_Reactors/when_handling_event/and_it_fails/and_needs_to_catch_up.cs @@ -70,7 +70,7 @@ async Task Because() [Fact] void should_fail_one_partition() => Context.FailedPartitionsBeforeRetry.Count().ShouldEqual(1); [Fact] void should_start_replaying_job() => Context.Jobs.First().Type.ShouldContain(nameof(RetryFailedPartition)); [Fact] void should_recover_failed_partition() => Context.FailedPartitionsAfterRetry.ShouldBeEmpty(); - [Fact] void should_start_catchup_for_partition_job() => Context.JobsWithCatchUp.SingleOrDefault(_ => _.Name == nameof(CatchUpObserverPartition)).ShouldNotBeNull(); + [Fact] void should_start_catchup_for_partition_job() => Context.JobsWithCatchUp.SingleOrDefault(_ => _.Type == nameof(CatchUpObserverPartition)).ShouldNotBeNull(); [Fact] void should_have_completed_all_jobs_at_the_end() => Context.JobsAfterCompleted.ShouldBeEmpty(); [Fact] void should_have_the_active_observer_running_state() => Context.ObserverState.RunningState.ShouldEqual(ObserverRunningState.Active); [Fact] void should_have_correct_last_handled_event_sequence_number() => Context.ObserverState.LastHandledEventSequenceNumber.Value.ShouldEqual(1ul); diff --git a/Integration/Orleans.InProcess/for_Reactors/when_handling_event/and_it_fails/but_not_second_time.cs b/Integration/Orleans.InProcess/for_Reactors/when_handling_event/and_it_fails/but_not_second_time.cs index 59a63c03b..b39da150b 100644 --- a/Integration/Orleans.InProcess/for_Reactors/when_handling_event/and_it_fails/but_not_second_time.cs +++ b/Integration/Orleans.InProcess/for_Reactors/when_handling_event/and_it_fails/but_not_second_time.cs @@ -42,7 +42,7 @@ async Task Because() // Wait for the first event to have been handled await Tcs[0].Task.WaitAsync(waitTime); - FailedPartitionsBeforeRetry = await EventStore.WaitForThereToBeFailedPartitions(ObserverId); + FailedPartitionsBeforeRetry = await EventStore.WaitForThereToBeFailedPartitions(ObserverId, waitTime); Jobs = await EventStore.WaitForThereToBeJobs(waitTime); // Wait for the second event to have been handled @@ -56,7 +56,7 @@ async Task Because() } [Fact] void should_fail_one_partition() => Context.FailedPartitionsBeforeRetry.Count().ShouldEqual(1); - [Fact] void should_start_replaying_job() => Context.Jobs.First().Type.ShouldContain("RetryFailedPartitionJob"); + [Fact] void should_start_replaying_job() => Context.Jobs.First().Type.ShouldContain("RetryFailedPartition"); [Fact] void should_recover_failed_partition() => Context.FailedPartitionsAfterRetry.ShouldBeEmpty(); [Fact] void should_have_the_active_observer_running_state() => Context.ObserverState.RunningState.ShouldEqual(ObserverRunningState.Active); [Fact] void should_have_correct_last_handled_event_sequence_number() => Context.ObserverState.LastHandledEventSequenceNumber.Value.ShouldEqual(0ul); diff --git a/Integration/Orleans.InProcess/for_Reactors/when_handling_event/and_it_fails/but_not_third_time.cs b/Integration/Orleans.InProcess/for_Reactors/when_handling_event/and_it_fails/but_not_third_time.cs index a074c0508..c42a32d10 100644 --- a/Integration/Orleans.InProcess/for_Reactors/when_handling_event/and_it_fails/but_not_third_time.cs +++ b/Integration/Orleans.InProcess/for_Reactors/when_handling_event/and_it_fails/but_not_third_time.cs @@ -64,7 +64,7 @@ async Task Because() } [Fact] void should_fail_one_partition() => Context.FailedPartitionsBeforeRetry.Count().ShouldEqual(1); - [Fact] void should_start_replaying_job() => Context.Jobs.First().Type.ShouldContain("RetryFailedPartitionJob"); + [Fact] void should_start_replaying_job() => Context.Jobs.First().Type.ShouldContain("RetryFailedPartition"); [Fact] void should_recover_failed_partition() => Context.FailedPartitionsAfterRetry.ShouldBeEmpty(); [Fact] void should_have_the_active_observer_running_state() => Context.ObserverState.RunningState.ShouldEqual(ObserverRunningState.Active); [Fact] void should_have_correct_last_handled_event_sequence_number() => Context.ObserverState.LastHandledEventSequenceNumber.Value.ShouldEqual(0ul); diff --git a/Integration/Orleans.InProcess/for_Reducers/when_handling_event/and_it_fails/and_needs_to_catch_up.cs b/Integration/Orleans.InProcess/for_Reducers/when_handling_event/and_it_fails/and_needs_to_catch_up.cs index 1c6526745..2ebd94cbe 100644 --- a/Integration/Orleans.InProcess/for_Reducers/when_handling_event/and_it_fails/and_needs_to_catch_up.cs +++ b/Integration/Orleans.InProcess/for_Reducers/when_handling_event/and_it_fails/and_needs_to_catch_up.cs @@ -70,7 +70,7 @@ async Task Because() [Fact] void should_fail_one_partition() => Context.FailedPartitionsBeforeRetry.Count().ShouldEqual(1); [Fact] void should_start_replaying_job() => Context.Jobs.First().Type.ShouldContain(nameof(RetryFailedPartition)); [Fact] void should_recover_failed_partition() => Context.FailedPartitionsAfterRetry.ShouldBeEmpty(); - [Fact] void should_start_catchup_for_partition_job() => Context.JobsWithCatchUp.SingleOrDefault(_ => _.Name == nameof(CatchUpObserverPartition)).ShouldNotBeNull(); + [Fact] void should_start_catchup_for_partition_job() => Context.JobsWithCatchUp.SingleOrDefault(_ => _.Type == nameof(CatchUpObserverPartition)).ShouldNotBeNull(); [Fact] void should_have_completed_all_jobs_at_the_end() => Context.JobsAfterCompleted.ShouldBeEmpty(); [Fact] void should_have_the_active_observer_running_state() => Context.ObserverState.RunningState.ShouldEqual(ObserverRunningState.Active); [Fact] void should_have_correct_last_handled_event_sequence_number() => Context.ObserverState.LastHandledEventSequenceNumber.Value.ShouldEqual(1ul); diff --git a/Integration/Orleans.InProcess/for_Reducers/when_handling_event/and_it_fails/but_not_second_time.cs b/Integration/Orleans.InProcess/for_Reducers/when_handling_event/and_it_fails/but_not_second_time.cs index 936d7b948..de55e2751 100644 --- a/Integration/Orleans.InProcess/for_Reducers/when_handling_event/and_it_fails/but_not_second_time.cs +++ b/Integration/Orleans.InProcess/for_Reducers/when_handling_event/and_it_fails/but_not_second_time.cs @@ -55,7 +55,7 @@ async Task Because() } [Fact] void should_fail_one_partition() => Context.FailedPartitionsBeforeRetry.Count().ShouldEqual(1); - [Fact] void should_start_replaying_job() => Context.Jobs.First().Type.ShouldContain("RetryFailedPartitionJob"); + [Fact] void should_start_replaying_job() => Context.Jobs.First().Type.ShouldContain("RetryFailedPartition"); [Fact] void should_recover_failed_partition() => Context.FailedPartitionsAfterRetry.ShouldBeEmpty(); [Fact] void should_have_the_active_observer_running_state() => Context.ObserverState.RunningState.ShouldEqual(ObserverRunningState.Active); [Fact] void should_have_correct_last_handled_event_sequence_number() => Context.ObserverState.LastHandledEventSequenceNumber.Value.ShouldEqual(0ul); diff --git a/Integration/Orleans.InProcess/for_Reducers/when_handling_event/and_it_fails/but_not_third_time.cs b/Integration/Orleans.InProcess/for_Reducers/when_handling_event/and_it_fails/but_not_third_time.cs index 3e5023b78..b1010aecd 100644 --- a/Integration/Orleans.InProcess/for_Reducers/when_handling_event/and_it_fails/but_not_third_time.cs +++ b/Integration/Orleans.InProcess/for_Reducers/when_handling_event/and_it_fails/but_not_third_time.cs @@ -64,7 +64,7 @@ async Task Because() } [Fact] void should_fail_one_partition() => Context.FailedPartitionsBeforeRetry.Count().ShouldEqual(1); - [Fact] void should_start_replaying_job() => Context.Jobs.First().Type.ShouldContain("RetryFailedPartitionJob"); + [Fact] void should_start_replaying_job() => Context.Jobs.First().Type.ShouldContain("RetryFailedPartition"); [Fact] void should_recover_failed_partition() => Context.FailedPartitionsAfterRetry.ShouldBeEmpty(); [Fact] void should_have_the_active_observer_running_state() => Context.ObserverState.RunningState.ShouldEqual(ObserverRunningState.Active); [Fact] void should_have_correct_last_handled_event_sequence_number() => Context.ObserverState.LastHandledEventSequenceNumber.Value.ShouldEqual(0ul); From 2840328f715a8ea6309cdb0df34375bd8beb3dba Mon Sep 17 00:00:00 2001 From: woksin Date: Thu, 6 Feb 2025 10:44:09 +0100 Subject: [PATCH 30/38] Remove Request from JobStepState --- Source/Kernel/Storage/Jobs/JobStepState.cs | 7 ------- 1 file changed, 7 deletions(-) diff --git a/Source/Kernel/Storage/Jobs/JobStepState.cs b/Source/Kernel/Storage/Jobs/JobStepState.cs index 49c15acab..24b5a2052 100644 --- a/Source/Kernel/Storage/Jobs/JobStepState.cs +++ b/Source/Kernel/Storage/Jobs/JobStepState.cs @@ -18,7 +18,6 @@ public class JobStepState /// /// Gets or sets the . /// - // TODO: We probably have to do the same thing here as for JobType public JobStepType Type { get; set; } = JobStepType.NotSet; /// @@ -40,10 +39,4 @@ public class JobStepState /// Gets or sets the . /// public JobStepProgress Progress { get; set; } = new(); - - /// - /// Gets or sets the request associated with the job step. - /// - // TODO: We probably have to remove this. It's at least not necessary - public object Request { get; set; } = null!; } From 365dba05a4c20ca43a8991e77c1ca8dece53b53e Mon Sep 17 00:00:00 2001 From: woksin Date: Mon, 10 Feb 2025 13:16:09 +0100 Subject: [PATCH 31/38] JobsManager Rehydrate now resumes jobs a little more efficiently --- Source/Kernel/Grains/Jobs/JobsManager.cs | 103 +++++++++++------------ 1 file changed, 47 insertions(+), 56 deletions(-) diff --git a/Source/Kernel/Grains/Jobs/JobsManager.cs b/Source/Kernel/Grains/Jobs/JobsManager.cs index d7f339cc2..401a7a33a 100644 --- a/Source/Kernel/Grains/Jobs/JobsManager.cs +++ b/Source/Kernel/Grains/Jobs/JobsManager.cs @@ -56,16 +56,10 @@ public async Task Rehydrate() Task RehydrateJobs(IEnumerable runningJobs) { - var tasks = runningJobs.Select(_ => _.Id).Select(async jobId => + var tasks = runningJobs.Select(async jobState => { - try - { - await Resume(jobId); - } - catch (Exception ex) - { - logger.ErrorResumingJob(ex, jobId); - } + var resumeJobResult = await ResumeJob(jobState); + await HandleResumeJobResult(jobState.Id, resumeJobResult); }); return Task.WhenAll(tasks); } @@ -101,48 +95,11 @@ await jobStateResult.Match( async jobState => { var resumeJobResult = await ResumeJob(jobState); - await HandleResumeJobResult(resumeJobResult); + await HandleResumeJobResult(jobId, resumeJobResult); }, error => HandleError(jobId, error), error => HandleUnknownFailure(jobId, error)); return; - - async Task HandleResumeJobResult(OneOf, Storage.Jobs.JobError> resumeJobResult) - { - await resumeJobResult.Match( - resumeResult => resumeResult.Match( - success => - { - switch (success) - { - case ResumeJobSuccess.JobAlreadyRunning: - logger.CannotResumeJobBecauseAlreadyRunning(jobId); - break; - case ResumeJobSuccess.JobCannotBeResumed: - logger.CannotResumeJob(jobId); - break; - } - return Task.CompletedTask; - }, - resumeError => resumeError.Match( - jobError => HandleError(jobId, jobError), - failedResumingSteps => - { - logger.FailedToResumeJobSteps(jobId, failedResumingSteps.FailedJobSteps); - return Task.CompletedTask; - })), - jobError => HandleError(jobId, jobError)); - } - Task, Storage.Jobs.JobError>> ResumeJob(JobState jobState) => GetJobGrain(jobState) - .Match, Storage.Jobs.JobError>>>( - async job => - { - var result = await job.Resume(); - return result.Match, Storage.Jobs.JobError>>( - success => Result.Success(success), - error => Result.Failed(error)); - }, - _ => Task.FromResult, Storage.Jobs.JobError>>(Storage.Jobs.JobError.TypeIsNotAssociatedWithAJobType)); } /// @@ -150,7 +107,7 @@ public async Task Stop(JobId jobId) { using var scope = logger.BeginJobsManagerScope(_key); - var stopJobResult = await TryStopJob(jobId); + var stopJobResult = await StopJob(jobId); await stopJobResult.Match( _ => Task.CompletedTask, jobError => HandleError(jobId, jobError), @@ -165,7 +122,7 @@ public async Task Delete(JobId jobId) logger.DeletingJob(jobId); - var stopJobResult = await TryStopJob(jobId); + var stopJobResult = await StopJob(jobId); var stoppedJob = await stopJobResult.Match( _ => Task.FromResult(true), async jobError => @@ -230,7 +187,45 @@ public async Task> GetAllJobs() jobType => Result.Success((IJob)GrainFactory.GetGrain(jobType, jobState.Id, new JobKey(_key.EventStore, _key.Namespace))), error => error)!; - async Task> TryStopJob(JobId jobId) + async Task HandleResumeJobResult(JobId jobId, OneOf, Storage.Jobs.JobError> resumeJobResult) + { + await resumeJobResult.Match( + resumeResult => resumeResult.Match( + success => + { + switch (success) + { + case ResumeJobSuccess.JobAlreadyRunning: + logger.CannotResumeJobBecauseAlreadyRunning(jobId); + break; + case ResumeJobSuccess.JobCannotBeResumed: + logger.CannotResumeJob(jobId); + break; + } + return Task.CompletedTask; + }, + resumeError => resumeError.Match( + jobError => HandleError(jobId, jobError), + failedResumingSteps => + { + logger.FailedToResumeJobSteps(jobId, failedResumingSteps.FailedJobSteps); + return Task.CompletedTask; + })), + jobError => HandleError(jobId, jobError)); + } + + Task, Storage.Jobs.JobError>> ResumeJob(JobState jobState) => GetJobGrain(jobState) + .Match, Storage.Jobs.JobError>>>( + async job => + { + var result = await job.Resume(); + return result.Match, Storage.Jobs.JobError>>( + success => Result.Success(success), + error => Result.Failed(error)); + }, + _ => Task.FromResult, Storage.Jobs.JobError>>(Storage.Jobs.JobError.TypeIsNotAssociatedWithAJobType)); + + async Task> StopJob(JobId jobId) { logger.StoppingJob(jobId); @@ -238,7 +233,7 @@ public async Task> GetAllJobs() return await jobStateResult.Match( async jobState => { - var stopJobResult = await StopJob(jobState); + var stopJobResult = await GetJobGrainAndStop(jobState); return stopJobResult.Match>( none => none, jobError => jobError, @@ -247,7 +242,7 @@ public async Task> GetAllJobs() error => Task.FromResult>(error), ex => Task.FromResult>(ex)); - Task> StopJob(JobState jobState) => GetJobGrain(jobState) + Task> GetJobGrainAndStop(JobState jobState) => GetJobGrain(jobState) .Match>>( async job => { @@ -295,16 +290,12 @@ Task HandleError(JobId jobId, JobError jobError) Task HandleUnknownFailure(JobId jobId, Exception ex) { logger.UnknownError(ex, jobId); - - // TODO: I'm not sure yet whether to throw or not. return Task.FromException(ex); } Task HandleUnknownFailure(Exception ex) { logger.UnknownError(ex); - - // TODO: I'm not sure yet whether to throw or not. return Task.FromException(ex); } } From 47fa364876df14ae9c0dfff27bb81bd68422e52f Mon Sep 17 00:00:00 2001 From: woksin Date: Mon, 10 Feb 2025 13:16:29 +0100 Subject: [PATCH 32/38] Remove setting of JobStep Request state --- Source/Kernel/Grains/Jobs/JobStep.cs | 1 - 1 file changed, 1 deletion(-) diff --git a/Source/Kernel/Grains/Jobs/JobStep.cs b/Source/Kernel/Grains/Jobs/JobStep.cs index e84e68560..1a358e3ee 100644 --- a/Source/Kernel/Grains/Jobs/JobStep.cs +++ b/Source/Kernel/Grains/Jobs/JobStep.cs @@ -116,7 +116,6 @@ public async Task> Start(GrainId jobId, TReques ThisJobStep = GetReferenceToSelf>(); await Start(request, _cancellationTokenSource.Token); - state.State.Request = request!; var writeStateResult = await WriteStatusChange(JobStepStatus.Running); return writeStateResult.Match( _ => Result.Success(), From 4cad852d158bf1c636257d32f361b552ed10b18e Mon Sep 17 00:00:00 2001 From: woksin Date: Mon, 10 Feb 2025 13:16:35 +0100 Subject: [PATCH 33/38] Fix broken specs --- Source/Kernel/Grains.Specs/Jobs/SomeJobRequest.cs | 3 ++- Source/Kernel/Grains.Specs/Jobs/for_Job/given/SomeRequest.cs | 3 ++- .../Grains.Specs/Jobs/for_JobsManager/given/the_manager.cs | 5 +++++ .../Grains.Specs/Jobs/for_JobsManager/when_deleting_job.cs | 2 ++ .../Grains.Specs/Jobs/for_JobsManager/when_rehydrating.cs | 3 +++ .../Grains.Specs/Jobs/for_JobsManager/when_resuming.cs | 2 ++ .../Grains.Specs/Jobs/for_JobsManager/when_stopping_job.cs | 2 ++ 7 files changed, 18 insertions(+), 2 deletions(-) diff --git a/Source/Kernel/Grains.Specs/Jobs/SomeJobRequest.cs b/Source/Kernel/Grains.Specs/Jobs/SomeJobRequest.cs index 90dd2e05b..3ddaad71b 100644 --- a/Source/Kernel/Grains.Specs/Jobs/SomeJobRequest.cs +++ b/Source/Kernel/Grains.Specs/Jobs/SomeJobRequest.cs @@ -1,6 +1,7 @@ // Copyright (c) Cratis. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. +using Cratis.Chronicle.Concepts.Jobs; namespace Cratis.Chronicle.Grains.Jobs; -public record SomeJobRequest(int SomeNumber); \ No newline at end of file +public record SomeJobRequest(int SomeNumber) : IJobRequest; \ No newline at end of file diff --git a/Source/Kernel/Grains.Specs/Jobs/for_Job/given/SomeRequest.cs b/Source/Kernel/Grains.Specs/Jobs/for_Job/given/SomeRequest.cs index 1b98c515b..fbea8318b 100644 --- a/Source/Kernel/Grains.Specs/Jobs/for_Job/given/SomeRequest.cs +++ b/Source/Kernel/Grains.Specs/Jobs/for_Job/given/SomeRequest.cs @@ -1,6 +1,7 @@ // Copyright (c) Cratis. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. +using Cratis.Chronicle.Concepts.Jobs; namespace Cratis.Chronicle.Grains.Jobs.for_Job.given; -public class SomeRequest; \ No newline at end of file +public class SomeRequest : IJobRequest; \ No newline at end of file diff --git a/Source/Kernel/Grains.Specs/Jobs/for_JobsManager/given/the_manager.cs b/Source/Kernel/Grains.Specs/Jobs/for_JobsManager/given/the_manager.cs index 3f42086a7..c6de8c1ca 100644 --- a/Source/Kernel/Grains.Specs/Jobs/for_JobsManager/given/the_manager.cs +++ b/Source/Kernel/Grains.Specs/Jobs/for_JobsManager/given/the_manager.cs @@ -23,6 +23,7 @@ public class the_manager : Specification protected IEventStoreNamespaceStorage _namespaceStorage; protected IJobStorage _jobStorage; protected IJobStepStorage _jobStepStorage; + protected IJobTypes _jobTypes; protected List _storedJobs; @@ -34,11 +35,13 @@ async Task Establish() _namespaceStorage = Substitute.For(); _jobStorage = Substitute.For(); _jobStepStorage = Substitute.For(); + _jobTypes = Substitute.For(); _storage.GetEventStore(Arg.Any()).Returns(_eventStoreStorage); _eventStoreStorage.GetNamespace(Arg.Any()).Returns(_namespaceStorage); _namespaceStorage.Jobs.Returns(_jobStorage); _namespaceStorage.JobSteps.Returns(_jobStepStorage); + _jobTypes.GetClrTypeFor(Arg.Any()).Returns(Result.Failed(IJobTypes.GetClrTypeForError.CouldNotFindType)); _jobStorage.GetJobs(Arg.Any()).Returns(_ => Task.FromResult(Catch.Success>([.. _storedJobs]))); _jobStorage.GetJob(Arg.Any()).Returns(callInfo => Task.FromResult( @@ -48,6 +51,7 @@ async Task Establish() _jobStepStorage.RemoveAllForJob(Arg.Any()).Returns(Task.FromResult(Catch.Success())); _silo.AddService(_storage); _silo.AddService(NullLogger.Instance); + _silo.AddService(_jobTypes); var loggerFactory = Substitute.For(); _silo.AddService(loggerFactory); _managerKey = new ("event-store", "namespace"); @@ -63,6 +67,7 @@ protected Mock AddJob(JobId id) Id = id }; _storedJobs.Add(state); + _jobTypes.GetClrTypeFor(state.Type).Returns(Result.Success(typeof(TJob))); return _silo.AddProbe(state.Id, keyExtension: new JobKey(_managerKey.EventStore, _managerKey.Namespace)); } } diff --git a/Source/Kernel/Grains.Specs/Jobs/for_JobsManager/when_deleting_job.cs b/Source/Kernel/Grains.Specs/Jobs/for_JobsManager/when_deleting_job.cs index 1e9614ca1..bcefa3512 100644 --- a/Source/Kernel/Grains.Specs/Jobs/for_JobsManager/when_deleting_job.cs +++ b/Source/Kernel/Grains.Specs/Jobs/for_JobsManager/when_deleting_job.cs @@ -1,6 +1,7 @@ // Copyright (c) Cratis. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. +using Cratis.Chronicle.Concepts; using Cratis.Chronicle.Concepts.Jobs; using Moq; namespace Cratis.Chronicle.Grains.Jobs.for_JobsManager; @@ -14,6 +15,7 @@ void Establish() { _jobId = Guid.Parse("24ff9a76-a590-49b7-847d-28fcc9bf1024"); _job = AddJob(_jobId); + _job.Setup(_ => _.Stop()).ReturnsAsync(Result.Success()); } Task Because() => _manager.Delete(_jobId); diff --git a/Source/Kernel/Grains.Specs/Jobs/for_JobsManager/when_rehydrating.cs b/Source/Kernel/Grains.Specs/Jobs/for_JobsManager/when_rehydrating.cs index 887be89bd..456dcbe50 100644 --- a/Source/Kernel/Grains.Specs/Jobs/for_JobsManager/when_rehydrating.cs +++ b/Source/Kernel/Grains.Specs/Jobs/for_JobsManager/when_rehydrating.cs @@ -1,6 +1,7 @@ // Copyright (c) Cratis. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. +using Cratis.Chronicle.Concepts; using Cratis.Chronicle.Concepts.Jobs; using Moq; namespace Cratis.Chronicle.Grains.Jobs.for_JobsManager; @@ -18,6 +19,8 @@ void Establish() _secondJobId = Guid.Parse("d090f5e6-5a6a-43b1-8580-4973e3c69521"); _firstJob = AddJob(_firstJobId); _secondJob = AddJob(_secondJobId); + _firstJob.Setup(_ => _.Resume()).ReturnsAsync(Result.Success(ResumeJobSuccess.Success)); + _secondJob.Setup(_ => _.Resume()).ReturnsAsync(Result.Success(ResumeJobSuccess.Success)); } Task Because() => _manager.Rehydrate(); diff --git a/Source/Kernel/Grains.Specs/Jobs/for_JobsManager/when_resuming.cs b/Source/Kernel/Grains.Specs/Jobs/for_JobsManager/when_resuming.cs index 8171d7360..82d9fc4c4 100644 --- a/Source/Kernel/Grains.Specs/Jobs/for_JobsManager/when_resuming.cs +++ b/Source/Kernel/Grains.Specs/Jobs/for_JobsManager/when_resuming.cs @@ -1,6 +1,7 @@ // Copyright (c) Cratis. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. +using Cratis.Chronicle.Concepts; using Cratis.Chronicle.Concepts.Jobs; using Moq; namespace Cratis.Chronicle.Grains.Jobs.for_JobsManager; @@ -14,6 +15,7 @@ void Establish() { _jobId = Guid.Parse("24ff9a76-a590-49b7-847d-28fcc9bf1024"); _job = AddJob(_jobId); + _job.Setup(_ => _.Resume()).ReturnsAsync(Result.Success(ResumeJobSuccess.Success)); } Task Because() => _manager.Resume(_jobId); diff --git a/Source/Kernel/Grains.Specs/Jobs/for_JobsManager/when_stopping_job.cs b/Source/Kernel/Grains.Specs/Jobs/for_JobsManager/when_stopping_job.cs index d25c6385c..f28d24745 100644 --- a/Source/Kernel/Grains.Specs/Jobs/for_JobsManager/when_stopping_job.cs +++ b/Source/Kernel/Grains.Specs/Jobs/for_JobsManager/when_stopping_job.cs @@ -1,6 +1,7 @@ // Copyright (c) Cratis. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. +using Cratis.Chronicle.Concepts; using Cratis.Chronicle.Concepts.Jobs; using Moq; namespace Cratis.Chronicle.Grains.Jobs.for_JobsManager; @@ -14,6 +15,7 @@ void Establish() { _jobId = Guid.Parse("24ff9a76-a590-49b7-847d-28fcc9bf1024"); _job = AddJob(_jobId); + _job.Setup(_ => _.Stop()).ReturnsAsync(Result.Success()); } Task Because() => _manager.Stop(_jobId); From 6d968a96e9dc3952460901b54d1e3017c360c957 Mon Sep 17 00:00:00 2001 From: woksin Date: Mon, 10 Feb 2025 13:16:43 +0100 Subject: [PATCH 34/38] Build --- Source/Workbench/Web/Api/Jobs/AllJobs.ts | 18 +++++++++--------- Source/Workbench/Web/Api/Jobs/Job.ts | 7 ++++--- 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/Source/Workbench/Web/Api/Jobs/AllJobs.ts b/Source/Workbench/Web/Api/Jobs/AllJobs.ts index dec77ba0c..7de30906c 100644 --- a/Source/Workbench/Web/Api/Jobs/AllJobs.ts +++ b/Source/Workbench/Web/Api/Jobs/AllJobs.ts @@ -13,19 +13,19 @@ const routeTemplate = Handlebars.compile('/api/event-store/{{eventStore}}/{{name class AllJobsSortBy { private _id: SortingActionsForObservableQuery; - private _name: SortingActionsForObservableQuery; private _details: SortingActionsForObservableQuery; private _type: SortingActionsForObservableQuery; private _status: SortingActionsForObservableQuery; + private _created: SortingActionsForObservableQuery; private _statusChanges: SortingActionsForObservableQuery; private _progress: SortingActionsForObservableQuery; constructor(readonly query: AllJobs) { this._id = new SortingActionsForObservableQuery('id', query); - this._name = new SortingActionsForObservableQuery('name', query); this._details = new SortingActionsForObservableQuery('details', query); this._type = new SortingActionsForObservableQuery('type', query); this._status = new SortingActionsForObservableQuery('status', query); + this._created = new SortingActionsForObservableQuery('created', query); this._statusChanges = new SortingActionsForObservableQuery('statusChanges', query); this._progress = new SortingActionsForObservableQuery('progress', query); } @@ -33,9 +33,6 @@ class AllJobsSortBy { get id(): SortingActionsForObservableQuery { return this._id; } - get name(): SortingActionsForObservableQuery { - return this._name; - } get details(): SortingActionsForObservableQuery { return this._details; } @@ -45,6 +42,9 @@ class AllJobsSortBy { get status(): SortingActionsForObservableQuery { return this._status; } + get created(): SortingActionsForObservableQuery { + return this._created; + } get statusChanges(): SortingActionsForObservableQuery { return this._statusChanges; } @@ -55,19 +55,16 @@ class AllJobsSortBy { class AllJobsSortByWithoutQuery { private _id: SortingActions = new SortingActions('id'); - private _name: SortingActions = new SortingActions('name'); private _details: SortingActions = new SortingActions('details'); private _type: SortingActions = new SortingActions('type'); private _status: SortingActions = new SortingActions('status'); + private _created: SortingActions = new SortingActions('created'); private _statusChanges: SortingActions = new SortingActions('statusChanges'); private _progress: SortingActions = new SortingActions('progress'); get id(): SortingActions { return this._id; } - get name(): SortingActions { - return this._name; - } get details(): SortingActions { return this._details; } @@ -77,6 +74,9 @@ class AllJobsSortByWithoutQuery { get status(): SortingActions { return this._status; } + get created(): SortingActions { + return this._created; + } get statusChanges(): SortingActions { return this._statusChanges; } diff --git a/Source/Workbench/Web/Api/Jobs/Job.ts b/Source/Workbench/Web/Api/Jobs/Job.ts index 7033b94e1..4ca35686d 100644 --- a/Source/Workbench/Web/Api/Jobs/Job.ts +++ b/Source/Workbench/Web/Api/Jobs/Job.ts @@ -9,15 +9,13 @@ import { Guid } from '@cratis/fundamentals'; import { JobProgress } from './JobProgress'; import { JobStatus } from './JobStatus'; import { JobStatusChanged } from './JobStatusChanged'; +import { SerializableDateTimeOffset } from '../Primitives/SerializableDateTimeOffset'; export class Job { @field(Guid) id!: Guid; - @field(String) - name!: string; - @field(String) details!: string; @@ -27,6 +25,9 @@ export class Job { @field(Number) status!: JobStatus; + @field(SerializableDateTimeOffset) + created!: SerializableDateTimeOffset; + @field(JobStatusChanged, true) statusChanges!: JobStatusChanged[]; From 57f4b4e18ccd2b80dcfd7428d72859d1d4755439 Mon Sep 17 00:00:00 2001 From: woksin Date: Mon, 10 Feb 2025 13:19:37 +0100 Subject: [PATCH 35/38] Put in a little timeout in specs --- ..._multiple_subscribers_that_subscribes_to_all_event_types.cs | 3 +++ ...tiple_subscribers_that_subscribes_to_one_event_type_each.cs | 3 +++ ..._events_with_different_partitions_with_single_subscriber.cs | 3 +++ .../when_enqueuing/single_event_with_single_subscriber.cs | 3 +++ .../when_enqueuing/with_subscriber_that_has_unsubscribed.cs | 3 +++ .../with_subscriber_that_throws_exception_on_first_handle.cs | 3 +++ 6 files changed, 18 insertions(+) diff --git a/Source/Kernel/Grains.Specs/EventSequences/for_AppendedEventsQueue/when_enqueuing/multiple_events_with_different_partitions_with_multiple_subscribers_that_subscribes_to_all_event_types.cs b/Source/Kernel/Grains.Specs/EventSequences/for_AppendedEventsQueue/when_enqueuing/multiple_events_with_different_partitions_with_multiple_subscribers_that_subscribes_to_all_event_types.cs index eada95806..b89a92fd8 100644 --- a/Source/Kernel/Grains.Specs/EventSequences/for_AppendedEventsQueue/when_enqueuing/multiple_events_with_different_partitions_with_multiple_subscribers_that_subscribes_to_all_event_types.cs +++ b/Source/Kernel/Grains.Specs/EventSequences/for_AppendedEventsQueue/when_enqueuing/multiple_events_with_different_partitions_with_multiple_subscribers_that_subscribes_to_all_event_types.cs @@ -31,6 +31,9 @@ async Task Because() { await _queue.Enqueue([_firstAppendedEvent, _secondAppendedEvent]); await _queue.AwaitQueueDepletion(); + + // waiting for queue depletion does not guarantee that the event was actually handled + await Task.Delay(100); } [Fact] void should_call_handle_on_first_observer_twice() => _firstObserverHandledEvents.Count.ShouldEqual(2); diff --git a/Source/Kernel/Grains.Specs/EventSequences/for_AppendedEventsQueue/when_enqueuing/multiple_events_with_different_partitions_with_multiple_subscribers_that_subscribes_to_one_event_type_each.cs b/Source/Kernel/Grains.Specs/EventSequences/for_AppendedEventsQueue/when_enqueuing/multiple_events_with_different_partitions_with_multiple_subscribers_that_subscribes_to_one_event_type_each.cs index bc16c0dd2..98956af2d 100644 --- a/Source/Kernel/Grains.Specs/EventSequences/for_AppendedEventsQueue/when_enqueuing/multiple_events_with_different_partitions_with_multiple_subscribers_that_subscribes_to_one_event_type_each.cs +++ b/Source/Kernel/Grains.Specs/EventSequences/for_AppendedEventsQueue/when_enqueuing/multiple_events_with_different_partitions_with_multiple_subscribers_that_subscribes_to_one_event_type_each.cs @@ -32,6 +32,9 @@ async Task Because() { await _queue.Enqueue([_firstAppendedEvent, _secondAppendedEvent]); await _queue.AwaitQueueDepletion(); + + // waiting for queue depletion does not guarantee that the event was actually handled + await Task.Delay(100); } [Fact] void should_call_handle_on_first_observer_once() => _firstObserverHandledEvents.Count.ShouldEqual(1); diff --git a/Source/Kernel/Grains.Specs/EventSequences/for_AppendedEventsQueue/when_enqueuing/multiple_events_with_different_partitions_with_single_subscriber.cs b/Source/Kernel/Grains.Specs/EventSequences/for_AppendedEventsQueue/when_enqueuing/multiple_events_with_different_partitions_with_single_subscriber.cs index 3dae8b14a..d383f4f31 100644 --- a/Source/Kernel/Grains.Specs/EventSequences/for_AppendedEventsQueue/when_enqueuing/multiple_events_with_different_partitions_with_single_subscriber.cs +++ b/Source/Kernel/Grains.Specs/EventSequences/for_AppendedEventsQueue/when_enqueuing/multiple_events_with_different_partitions_with_single_subscriber.cs @@ -32,6 +32,9 @@ async Task Because() { await _queue.Enqueue([_firstAppendedEvent, _secondAppendedEvent]); await _queue.AwaitQueueDepletion(); + + // waiting for queue depletion does not guarantee that the event was actually handled + await Task.Delay(100); } [Fact] void should_call_handle_on_observer_twice() => _handledEvents.Count.ShouldEqual(2); diff --git a/Source/Kernel/Grains.Specs/EventSequences/for_AppendedEventsQueue/when_enqueuing/single_event_with_single_subscriber.cs b/Source/Kernel/Grains.Specs/EventSequences/for_AppendedEventsQueue/when_enqueuing/single_event_with_single_subscriber.cs index 5d72ab0f2..06e19f794 100644 --- a/Source/Kernel/Grains.Specs/EventSequences/for_AppendedEventsQueue/when_enqueuing/single_event_with_single_subscriber.cs +++ b/Source/Kernel/Grains.Specs/EventSequences/for_AppendedEventsQueue/when_enqueuing/single_event_with_single_subscriber.cs @@ -22,6 +22,9 @@ async Task Because() { await _queue.Enqueue([_appendedEvent]); await _queue.AwaitQueueDepletion(); + + // waiting for queue depletion does not guarantee that the event was actually handled + await Task.Delay(100); } [Fact] void should_call_handle_on_observer_once() => _handledEvents.Count.ShouldEqual(1); diff --git a/Source/Kernel/Grains.Specs/EventSequences/for_AppendedEventsQueue/when_enqueuing/with_subscriber_that_has_unsubscribed.cs b/Source/Kernel/Grains.Specs/EventSequences/for_AppendedEventsQueue/when_enqueuing/with_subscriber_that_has_unsubscribed.cs index 74b685b30..ed159a2e6 100644 --- a/Source/Kernel/Grains.Specs/EventSequences/for_AppendedEventsQueue/when_enqueuing/with_subscriber_that_has_unsubscribed.cs +++ b/Source/Kernel/Grains.Specs/EventSequences/for_AppendedEventsQueue/when_enqueuing/with_subscriber_that_has_unsubscribed.cs @@ -23,6 +23,9 @@ async Task Because() { await _queue.Enqueue([_appendedEvent]); await _queue.AwaitQueueDepletion(); + + // waiting for queue depletion does not guarantee that the event was actually handled + await Task.Delay(100); } [Fact] void should_not_call_handle_on_observer_twice() => _handledEvents.Count.ShouldEqual(0); diff --git a/Source/Kernel/Grains.Specs/EventSequences/for_AppendedEventsQueue/when_enqueuing/with_subscriber_that_throws_exception_on_first_handle.cs b/Source/Kernel/Grains.Specs/EventSequences/for_AppendedEventsQueue/when_enqueuing/with_subscriber_that_throws_exception_on_first_handle.cs index cf66a7d97..8e2094f71 100644 --- a/Source/Kernel/Grains.Specs/EventSequences/for_AppendedEventsQueue/when_enqueuing/with_subscriber_that_throws_exception_on_first_handle.cs +++ b/Source/Kernel/Grains.Specs/EventSequences/for_AppendedEventsQueue/when_enqueuing/with_subscriber_that_throws_exception_on_first_handle.cs @@ -36,6 +36,9 @@ async Task Because() { await _queue.Enqueue([_appendedEvent]); await _queue.AwaitQueueDepletion(); + + // waiting for queue depletion does not guarantee that the event was actually handled + await Task.Delay(100); } [Fact] void should_call_handle_on_observer_twice() => _handledEvents.Count.ShouldEqual(2); From ec5126719031fc9e1415faceda6db341cf9ec7c9 Mon Sep 17 00:00:00 2001 From: woksin Date: Mon, 10 Feb 2025 13:48:06 +0100 Subject: [PATCH 36/38] Make auto registration of bson serializers more explicit --- ...SerializerDisableAutoRegistrationAttribute.cs | 10 ++++++++++ .../CustomSerializersRegistrationService.cs | 16 ++++++++-------- .../Encryption/EncryptionKeySerializer.cs | 2 ++ 3 files changed, 20 insertions(+), 8 deletions(-) create mode 100644 Source/Kernel/Storage.MongoDB/BsonSerializerDisableAutoRegistrationAttribute.cs diff --git a/Source/Kernel/Storage.MongoDB/BsonSerializerDisableAutoRegistrationAttribute.cs b/Source/Kernel/Storage.MongoDB/BsonSerializerDisableAutoRegistrationAttribute.cs new file mode 100644 index 000000000..cc94931a6 --- /dev/null +++ b/Source/Kernel/Storage.MongoDB/BsonSerializerDisableAutoRegistrationAttribute.cs @@ -0,0 +1,10 @@ +// Copyright (c) Cratis. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +namespace Cratis.Chronicle.Storage.MongoDB; + +/// +/// Disables auto registration of bson serializer. +/// +[AttributeUsage(AttributeTargets.Class)] +public sealed class BsonSerializerDisableAutoRegistrationAttribute : Attribute; diff --git a/Source/Kernel/Storage.MongoDB/CustomSerializersRegistrationService.cs b/Source/Kernel/Storage.MongoDB/CustomSerializersRegistrationService.cs index cd02deee6..f4eb9332a 100644 --- a/Source/Kernel/Storage.MongoDB/CustomSerializersRegistrationService.cs +++ b/Source/Kernel/Storage.MongoDB/CustomSerializersRegistrationService.cs @@ -1,7 +1,7 @@ // Copyright (c) Cratis. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. -using Cratis.Compliance.MongoDB; +using Cratis.Reflection; using Cratis.Types; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; @@ -18,20 +18,20 @@ public class CustomSerializersRegistrationService(IServiceProvider serviceProvid /// public Task Execute(CancellationToken cancellationToken) { - foreach (var type in types.FindMultiple() - .Where(type => type.Assembly.FullName!.Contains("Chronicle") && !type.IsGenericType) - .Except([typeof(EncryptionKeySerializer)])) + foreach (var type in types.FindMultiple().Where(IsEligibleForAutoRegistration)) { var provider = (IBsonSerializationProvider)ActivatorUtilities.CreateInstance(serviceProvider, type); BsonSerializer.RegisterSerializationProvider(provider); } - foreach (var type in types.FindMultiple() - .Where(type => type.Assembly.FullName!.Contains("Chronicle") && !type.IsGenericType) - .Except([typeof(EncryptionKeySerializer)])) + foreach (var type in types.FindMultiple().Where(IsEligibleForAutoRegistration)) { var serializer = (IBsonSerializer)ActivatorUtilities.CreateInstance(serviceProvider, type); BsonSerializer.TryRegisterSerializer(serializer.ValueType, serializer); } return Task.CompletedTask; } -} + + static bool IsEligibleForAutoRegistration(Type type) => type.Assembly.FullName!.Contains("Cratis.Chronicle") && + !type.IsGenericType && + !type.HasAttribute(); +} \ No newline at end of file diff --git a/Source/Kernel/Storage.MongoDB/Encryption/EncryptionKeySerializer.cs b/Source/Kernel/Storage.MongoDB/Encryption/EncryptionKeySerializer.cs index d109ae8f1..582ea5af8 100644 --- a/Source/Kernel/Storage.MongoDB/Encryption/EncryptionKeySerializer.cs +++ b/Source/Kernel/Storage.MongoDB/Encryption/EncryptionKeySerializer.cs @@ -2,6 +2,7 @@ // Licensed under the MIT license. See LICENSE file in the project root for full license information. using Cratis.Chronicle.Storage.Compliance; +using Cratis.Chronicle.Storage.MongoDB; using MongoDB.Bson.Serialization; namespace Cratis.Compliance.MongoDB; @@ -9,6 +10,7 @@ namespace Cratis.Compliance.MongoDB; /// /// Represents a for handling serialization of . /// +[BsonSerializerDisableAutoRegistration] public class EncryptionKeySerializer : IBsonSerializer { /// From cb57a18d61a82ea3db9fc761b9ea0790fc605b8f Mon Sep 17 00:00:00 2001 From: woksin Date: Mon, 10 Feb 2025 14:24:04 +0100 Subject: [PATCH 37/38] Fix build issues --- .../Jobs/for_JobsManager/given/the_manager.cs | 3 ++- .../Grains/Jobs/JobTypeCanOnlyHaveOneRequestType.cs | 3 +++ .../Kernel/Grains/Jobs/JobTypeMustHaveARequestType.cs | 3 +++ Source/Kernel/Grains/Jobs/JobTypes.cs | 10 +++++----- .../SerializationConfigurationExtensions.cs | 1 - 5 files changed, 13 insertions(+), 7 deletions(-) diff --git a/Source/Kernel/Grains.Specs/Jobs/for_JobsManager/given/the_manager.cs b/Source/Kernel/Grains.Specs/Jobs/for_JobsManager/given/the_manager.cs index c6de8c1ca..fde845eed 100644 --- a/Source/Kernel/Grains.Specs/Jobs/for_JobsManager/given/the_manager.cs +++ b/Source/Kernel/Grains.Specs/Jobs/for_JobsManager/given/the_manager.cs @@ -64,7 +64,8 @@ protected Mock AddJob(JobId id) var state = new JobState { Type = typeof(TJob), - Id = id + Id = id, + Created = DateTimeOffset.UtcNow }; _storedJobs.Add(state); _jobTypes.GetClrTypeFor(state.Type).Returns(Result.Success(typeof(TJob))); diff --git a/Source/Kernel/Grains/Jobs/JobTypeCanOnlyHaveOneRequestType.cs b/Source/Kernel/Grains/Jobs/JobTypeCanOnlyHaveOneRequestType.cs index e042aa5e9..dcd549212 100644 --- a/Source/Kernel/Grains/Jobs/JobTypeCanOnlyHaveOneRequestType.cs +++ b/Source/Kernel/Grains/Jobs/JobTypeCanOnlyHaveOneRequestType.cs @@ -1,3 +1,6 @@ +// Copyright (c) Cratis. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + using Cratis.Chronicle.Concepts.Jobs; namespace Cratis.Chronicle.Grains.Jobs; diff --git a/Source/Kernel/Grains/Jobs/JobTypeMustHaveARequestType.cs b/Source/Kernel/Grains/Jobs/JobTypeMustHaveARequestType.cs index 3453bb7f3..513c1eca2 100644 --- a/Source/Kernel/Grains/Jobs/JobTypeMustHaveARequestType.cs +++ b/Source/Kernel/Grains/Jobs/JobTypeMustHaveARequestType.cs @@ -1,3 +1,6 @@ +// Copyright (c) Cratis. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + using Cratis.Chronicle.Concepts.Jobs; namespace Cratis.Chronicle.Grains.Jobs; diff --git a/Source/Kernel/Grains/Jobs/JobTypes.cs b/Source/Kernel/Grains/Jobs/JobTypes.cs index 6b2c1298e..fb35b0636 100644 --- a/Source/Kernel/Grains/Jobs/JobTypes.cs +++ b/Source/Kernel/Grains/Jobs/JobTypes.cs @@ -14,11 +14,6 @@ namespace Cratis.Chronicle.Grains.Jobs; [Singleton] public class JobTypes : IJobTypes { - /// - /// Gets the singleton instance of . - /// - public static IJobTypes Instance { get; private set; } - readonly Dictionary _jobTypes = []; readonly Dictionary _jobTypePerType = []; readonly Dictionary _jobRequestTypes = []; @@ -33,6 +28,11 @@ public JobTypes(ITypes types) Instance = this; } + /// + /// Gets the singleton instance of . + /// + public static IJobTypes Instance { get; private set; } = null!; + /// public Result GetFor(Type type) => _jobTypePerType.TryGetValue(type, out var jobType) diff --git a/Source/Kernel/Setup/Serialization/SerializationConfigurationExtensions.cs b/Source/Kernel/Setup/Serialization/SerializationConfigurationExtensions.cs index 4d198c43e..5b224c211 100644 --- a/Source/Kernel/Setup/Serialization/SerializationConfigurationExtensions.cs +++ b/Source/Kernel/Setup/Serialization/SerializationConfigurationExtensions.cs @@ -8,7 +8,6 @@ using Cratis.Chronicle.Concepts.Projections.Json; using Cratis.Chronicle.Grains.Observation; using Cratis.Chronicle.Properties; -using Cratis.Chronicle.Storage.Jobs; using Cratis.Json; using Microsoft.Extensions.DependencyInjection; using Orleans.Serialization; From c671c4cd331b297e17e270aac7e16a4d42ce14ab Mon Sep 17 00:00:00 2001 From: woksin Date: Mon, 10 Feb 2025 14:31:24 +0100 Subject: [PATCH 38/38] When starting Job set the Created timestamp --- Source/Kernel/Grains.Interfaces/Jobs/IJobsManager.cs | 2 +- Source/Kernel/Grains/Jobs/Job.cs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Source/Kernel/Grains.Interfaces/Jobs/IJobsManager.cs b/Source/Kernel/Grains.Interfaces/Jobs/IJobsManager.cs index 4df545f2b..da89e1f35 100644 --- a/Source/Kernel/Grains.Interfaces/Jobs/IJobsManager.cs +++ b/Source/Kernel/Grains.Interfaces/Jobs/IJobsManager.cs @@ -19,7 +19,7 @@ public interface IJobsManager : IGrainWithIntegerCompoundKey Task Rehydrate(); /// - /// Start a job. + /// Start a new job. /// /// The uniquely identifying the job. /// The request parameter being passed to the job. diff --git a/Source/Kernel/Grains/Jobs/Job.cs b/Source/Kernel/Grains/Jobs/Job.cs index e77d475f6..c7b520e73 100644 --- a/Source/Kernel/Grains/Jobs/Job.cs +++ b/Source/Kernel/Grains/Jobs/Job.cs @@ -102,7 +102,6 @@ public override Task OnActivateAsync(CancellationToken cancellationToken) Storage = ServiceProvider.GetRequiredService() .GetEventStore(JobKey.EventStore) .GetNamespace(JobKey.Namespace); - return Task.CompletedTask; } @@ -114,6 +113,7 @@ public async Task> Start(TRequest request) { _logger.Starting(); _isRunning = true; + State.Created = DateTimeOffset.UtcNow; State.Request = request!; State.Details = GetJobDetails(); await WriteStatusChanged(JobStatus.PreparingSteps);