From 6f2c4f8d78e7193480fb548136f9f31da8bd8733 Mon Sep 17 00:00:00 2001 From: krivchenko_kv Date: Sat, 28 Sep 2024 00:54:44 +0300 Subject: [PATCH] #311 IOutboxRepository with key type Signed-off-by: krivchenko_kv --- .../GlobalSuppressions.cs | 8 ++++ .../MessageBusBuilderExtensions.cs | 4 +- .../ISqlOutboxRepository.cs | 2 +- .../SqlOutboxRepository.cs | 17 +++++--- .../MessageBusBuilderExtensions.cs | 13 +++--- .../GlobalSuppressions.cs | 8 ++++ .../IEnumerableExtensions.cs | 26 +++++++----- .../OutboxForwardingPublishInterceptor.cs | 13 ++++-- .../Repositories/IOutboxRepository.cs | 13 +++--- .../Repositories/OutboxMessage.cs | 5 ++- .../Services/OutboxLockRenewalTimer.cs | 8 ++-- .../Services/OutboxLockRenewalTimerFactory.cs | 12 ++---- .../Services/OutboxSendingTask.cs | 18 ++++---- .../OutboxBenchmarkTests.cs | 6 +-- .../BaseSqlOutboxRepositoryTest.cs | 8 ++-- .../SqlOutboxRepositoryTests.cs | 4 +- .../GlobalSuppressions.cs | 8 ++++ ...OutboxForwardingPublishInterceptorTests.cs | 42 +++++++++---------- .../OutboxLockRenewalTimerTests.cs | 23 ++++++---- .../Services/OutboxSendingTaskTests.cs | 27 ++++++------ 20 files changed, 154 insertions(+), 111 deletions(-) create mode 100644 src/SlimMessageBus.Host.Outbox.DbContext/GlobalSuppressions.cs create mode 100644 src/SlimMessageBus.Host.Outbox/GlobalSuppressions.cs create mode 100644 src/Tests/SlimMessageBus.Host.Outbox.Test/GlobalSuppressions.cs diff --git a/src/SlimMessageBus.Host.Outbox.DbContext/GlobalSuppressions.cs b/src/SlimMessageBus.Host.Outbox.DbContext/GlobalSuppressions.cs new file mode 100644 index 00000000..ca3e584e --- /dev/null +++ b/src/SlimMessageBus.Host.Outbox.DbContext/GlobalSuppressions.cs @@ -0,0 +1,8 @@ +// This file is used by Code Analysis to maintain SuppressMessage +// attributes that are applied to this project. +// Project-level suppressions either have no target or are given +// a specific target and scoped to a namespace, type, member, etc. + +using System.Diagnostics.CodeAnalysis; + +[assembly: SuppressMessage("Minor Code Smell", "S6672:Generic logger injection should match enclosing type", Justification = "", Scope = "member", Target = "~M:SlimMessageBus.Host.Outbox.DbContext.DbContextOutboxRepository`1.#ctor(Microsoft.Extensions.Logging.ILogger{SlimMessageBus.Host.Outbox.Sql.SqlOutboxRepository},SlimMessageBus.Host.Outbox.Sql.SqlOutboxSettings,SlimMessageBus.Host.Outbox.Sql.SqlOutboxTemplate,`0,SlimMessageBus.Host.Sql.Common.ISqlTransactionService)")] diff --git a/src/SlimMessageBus.Host.Outbox.Sql/Configuration/MessageBusBuilderExtensions.cs b/src/SlimMessageBus.Host.Outbox.Sql/Configuration/MessageBusBuilderExtensions.cs index 19676168..b5e3818c 100644 --- a/src/SlimMessageBus.Host.Outbox.Sql/Configuration/MessageBusBuilderExtensions.cs +++ b/src/SlimMessageBus.Host.Outbox.Sql/Configuration/MessageBusBuilderExtensions.cs @@ -5,7 +5,7 @@ public static class MessageBusBuilderExtensions public static MessageBusBuilder AddOutboxUsingSql(this MessageBusBuilder mbb, Action configure) where TOutboxRepository : class, ISqlOutboxRepository { - mbb.AddOutbox(); + mbb.AddOutbox(); mbb.PostConfigurationActions.Add(services => { @@ -37,7 +37,7 @@ public static MessageBusBuilder AddOutboxUsingSql(this Messag services.TryAddScoped(); services.Replace(ServiceDescriptor.Scoped(svp => svp.GetRequiredService())); - services.Replace(ServiceDescriptor.Scoped(svp => svp.GetRequiredService())); + services.Replace(ServiceDescriptor.Scoped>(svp => svp.GetRequiredService())); services.TryAddSingleton(); services.TryAddTransient(); diff --git a/src/SlimMessageBus.Host.Outbox.Sql/ISqlOutboxRepository.cs b/src/SlimMessageBus.Host.Outbox.Sql/ISqlOutboxRepository.cs index cb124ccb..bd588289 100644 --- a/src/SlimMessageBus.Host.Outbox.Sql/ISqlOutboxRepository.cs +++ b/src/SlimMessageBus.Host.Outbox.Sql/ISqlOutboxRepository.cs @@ -1,5 +1,5 @@ namespace SlimMessageBus.Host.Outbox.Sql; -public interface ISqlOutboxRepository : IOutboxRepository +public interface ISqlOutboxRepository : IOutboxRepository { } \ No newline at end of file diff --git a/src/SlimMessageBus.Host.Outbox.Sql/SqlOutboxRepository.cs b/src/SlimMessageBus.Host.Outbox.Sql/SqlOutboxRepository.cs index 6501bb99..eea89cb9 100644 --- a/src/SlimMessageBus.Host.Outbox.Sql/SqlOutboxRepository.cs +++ b/src/SlimMessageBus.Host.Outbox.Sql/SqlOutboxRepository.cs @@ -17,7 +17,12 @@ public SqlOutboxRepository(ILogger logger, SqlOutboxSetting Settings = settings; } - public async virtual Task Save(OutboxMessage message, CancellationToken token) + public Task GenerateId(CancellationToken cancellationToken) + { + return Task.FromResult(Guid.NewGuid()); + } + + public async virtual Task Save(OutboxMessage message, CancellationToken token) { await EnsureConnection(); @@ -39,7 +44,7 @@ await ExecuteNonQuery(Settings.SqlSettings.OperationRetry, _sqlTemplate.SqlOutbo }, token); } - public async Task> LockAndSelect(string instanceId, int batchSize, bool tableLock, TimeSpan lockDuration, CancellationToken token) + public async Task>> LockAndSelect(string instanceId, int batchSize, bool tableLock, TimeSpan lockDuration, CancellationToken token) { await EnsureConnection(); @@ -154,7 +159,7 @@ public async Task RenewLock(string instanceId, TimeSpan lockDuration, Canc return await cmd.ExecuteNonQueryAsync(token) > 0; } - internal async Task> GetAllMessages(CancellationToken cancellationToken) + internal async Task>> GetAllMessages(CancellationToken cancellationToken) { await EnsureConnection(); @@ -164,7 +169,7 @@ internal async Task> GetAllMessages(Cancellat return await ReadMessages(cmd, cancellationToken).ConfigureAwait(false); } - private async Task> ReadMessages(SqlCommand cmd, CancellationToken cancellationToken) + private async Task>> ReadMessages(SqlCommand cmd, CancellationToken cancellationToken) { using var reader = await cmd.ExecuteReaderAsync(cancellationToken); @@ -182,12 +187,12 @@ private async Task> ReadMessages(SqlCommand c var deliveryCompleteOrdinal = reader.GetOrdinal("DeliveryComplete"); var deliveryAbortedOrdinal = reader.GetOrdinal("DeliveryAborted"); - var items = new List(); + var items = new List>(); while (await reader.ReadAsync(cancellationToken).ConfigureAwait(false)) { var id = reader.GetGuid(idOrdinal); var headers = reader.IsDBNull(headersOrdinal) ? null : reader.GetString(headersOrdinal); - var message = new OutboxMessage + var message = new OutboxMessage { Id = id, Timestamp = reader.GetDateTime(timestampOrdinal), diff --git a/src/SlimMessageBus.Host.Outbox/Configuration/MessageBusBuilderExtensions.cs b/src/SlimMessageBus.Host.Outbox/Configuration/MessageBusBuilderExtensions.cs index 0c9368db..ebe173fb 100644 --- a/src/SlimMessageBus.Host.Outbox/Configuration/MessageBusBuilderExtensions.cs +++ b/src/SlimMessageBus.Host.Outbox/Configuration/MessageBusBuilderExtensions.cs @@ -4,7 +4,7 @@ public static class MessageBusBuilderExtensions { - public static MessageBusBuilder AddOutbox(this MessageBusBuilder mbb, Action configure = null) + public static MessageBusBuilder AddOutbox(this MessageBusBuilder mbb, Action configure = null) { mbb.PostConfigurationActions.Add(services => { @@ -17,7 +17,7 @@ public static MessageBusBuilder AddOutbox(this MessageBusBuilder mbb, Action x.MessageType)) { var serviceType = typeof(IPublishInterceptor<>).MakeGenericType(producerMessageType); - var implementationType = typeof(OutboxForwardingPublishInterceptor<>).MakeGenericType(producerMessageType); + var implementationType = typeof(OutboxForwardingPublishInterceptor<,>).MakeGenericType(producerMessageType); services.TryAddEnumerable(ServiceDescriptor.Transient(serviceType, implementationType)); } @@ -33,12 +33,12 @@ public static MessageBusBuilder AddOutbox(this MessageBusBuilder mbb, Action(); - services.TryAddEnumerable(ServiceDescriptor.Singleton(sp => sp.GetRequiredService())); - services.TryAdd(ServiceDescriptor.Singleton(sp => sp.GetRequiredService())); + services.AddSingleton>(); + services.TryAddEnumerable(ServiceDescriptor.Singleton>(sp => sp.GetRequiredService>())); + services.TryAdd(ServiceDescriptor.Singleton>(sp => sp.GetRequiredService>())); services.TryAddSingleton(); - services.TryAddSingleton(); + services.TryAddSingleton>(); services.TryAddSingleton(svp => { @@ -47,6 +47,7 @@ public static MessageBusBuilder AddOutbox(this MessageBusBuilder mbb, Action> Batch(this IEnumerable items, int batchSize) { - if (items == null) - { - throw new ArgumentNullException(nameof(items)); - } - - if (batchSize <= 0) - { - throw new ArgumentOutOfRangeException(nameof(batchSize), "Batch size must be greater than zero."); - } + // Parameter validation in yielding methods should be wrapped + Check(items, batchSize); using var enumerator = items.GetEnumerator(); while (enumerator.MoveNext()) @@ -26,4 +19,17 @@ public static IEnumerable> Batch(this IEnumerable i yield return batch.AsReadOnly(); } } + + private static void Check(IEnumerable items, int batchSize) + { + if (items == null) + { + throw new ArgumentNullException(nameof(items)); + } + + if (batchSize <= 0) + { + throw new ArgumentOutOfRangeException(nameof(batchSize), "Batch size must be greater than zero."); + } + } } diff --git a/src/SlimMessageBus.Host.Outbox/Interceptors/OutboxForwardingPublishInterceptor.cs b/src/SlimMessageBus.Host.Outbox/Interceptors/OutboxForwardingPublishInterceptor.cs index 40921b8d..928245b3 100644 --- a/src/SlimMessageBus.Host.Outbox/Interceptors/OutboxForwardingPublishInterceptor.cs +++ b/src/SlimMessageBus.Host.Outbox/Interceptors/OutboxForwardingPublishInterceptor.cs @@ -12,9 +12,9 @@ public abstract class OutboxForwardingPublishInterceptor /// Interceptor must be registered as a transient in order for outbox notifications to be raised. /// Notifications are raised on disposal (if required), to ensure they occur outside of a transaction scope. /// -public sealed class OutboxForwardingPublishInterceptor( +public sealed class OutboxForwardingPublishInterceptor( ILogger logger, - IOutboxRepository outboxRepository, + IOutboxRepository outboxRepository, IInstanceIdProvider instanceIdProvider, IOutboxNotificationService outboxNotificationService, OutboxSettings outboxSettings) @@ -23,7 +23,7 @@ public sealed class OutboxForwardingPublishInterceptor( internal const string SkipOutboxHeader = "__SkipOutbox"; private readonly ILogger _logger = logger; - private readonly IOutboxRepository _outboxRepository = outboxRepository; + private readonly IOutboxRepository _outboxRepository = outboxRepository; private readonly IInstanceIdProvider _instanceIdProvider = instanceIdProvider; private readonly IOutboxNotificationService _outboxNotificationService = outboxNotificationService; private readonly OutboxSettings _outboxSettings = outboxSettings; @@ -67,9 +67,13 @@ public async Task OnHandle(T message, Func next, IProducerContext context) var messagePayload = busMaster.Serializer?.Serialize(messageType, message) ?? throw new PublishMessageBusException($"The {busMaster.Name} bus has no configured serializer, so it cannot be used with the outbox plugin"); + // Generates a unique identifier in the current database + var outboxId = await _outboxRepository.GenerateId(context.CancellationToken); + // Add message to the database, do not call next() - var outboxMessage = new OutboxMessage + var outboxMessage = new OutboxMessage { + Id = outboxId, BusName = busMaster.Name, Headers = context.Headers, Path = context.Path, @@ -77,6 +81,7 @@ public async Task OnHandle(T message, Func next, IProducerContext context) MessagePayload = messagePayload, InstanceId = _instanceIdProvider.GetInstanceId() }; + await _outboxRepository.Save(outboxMessage, context.CancellationToken); // a message was sent, notify outbox service to poll on dispose (post transaction) diff --git a/src/SlimMessageBus.Host.Outbox/Repositories/IOutboxRepository.cs b/src/SlimMessageBus.Host.Outbox/Repositories/IOutboxRepository.cs index a8772916..7ce480a8 100644 --- a/src/SlimMessageBus.Host.Outbox/Repositories/IOutboxRepository.cs +++ b/src/SlimMessageBus.Host.Outbox/Repositories/IOutboxRepository.cs @@ -1,12 +1,13 @@ namespace SlimMessageBus.Host.Outbox; -public interface IOutboxRepository +public interface IOutboxRepository { - Task Save(OutboxMessage message, CancellationToken token); - Task> LockAndSelect(string instanceId, int batchSize, bool tableLock, TimeSpan lockDuration, CancellationToken token); - Task AbortDelivery (IReadOnlyCollection ids, CancellationToken token); - Task UpdateToSent(IReadOnlyCollection ids, CancellationToken token); - Task IncrementDeliveryAttempt(IReadOnlyCollection ids, int maxDeliveryAttempts, CancellationToken token); + Task GenerateId(CancellationToken cancellationToken); + Task Save(OutboxMessage message, CancellationToken token); + Task>> LockAndSelect(string instanceId, int batchSize, bool tableLock, TimeSpan lockDuration, CancellationToken token); + Task AbortDelivery(IReadOnlyCollection ids, CancellationToken token); + Task UpdateToSent(IReadOnlyCollection ids, CancellationToken token); + Task IncrementDeliveryAttempt(IReadOnlyCollection ids, int maxDeliveryAttempts, CancellationToken token); Task DeleteSent(DateTime olderThan, CancellationToken token); Task RenewLock(string instanceId, TimeSpan lockDuration, CancellationToken token); } diff --git a/src/SlimMessageBus.Host.Outbox/Repositories/OutboxMessage.cs b/src/SlimMessageBus.Host.Outbox/Repositories/OutboxMessage.cs index 01aedcdb..9578d097 100644 --- a/src/SlimMessageBus.Host.Outbox/Repositories/OutboxMessage.cs +++ b/src/SlimMessageBus.Host.Outbox/Repositories/OutboxMessage.cs @@ -1,8 +1,9 @@ namespace SlimMessageBus.Host.Outbox; -public class OutboxMessage + +public class OutboxMessage { - public Guid Id { get; set; } = Guid.NewGuid(); + public TOutboxKey Id { get; set; } public DateTime Timestamp { get; set; } = DateTime.UtcNow; public string BusName { get; set; } public string MessageType { get; set; } diff --git a/src/SlimMessageBus.Host.Outbox/Services/OutboxLockRenewalTimer.cs b/src/SlimMessageBus.Host.Outbox/Services/OutboxLockRenewalTimer.cs index 16ccae19..f99dd9a1 100644 --- a/src/SlimMessageBus.Host.Outbox/Services/OutboxLockRenewalTimer.cs +++ b/src/SlimMessageBus.Host.Outbox/Services/OutboxLockRenewalTimer.cs @@ -1,18 +1,18 @@ namespace SlimMessageBus.Host.Outbox.Services; using SlimMessageBus.Host.Outbox; -public sealed class OutboxLockRenewalTimer : IOutboxLockRenewalTimer +public sealed class OutboxLockRenewalTimer : IOutboxLockRenewalTimer { private readonly object _lock; private readonly Timer _timer; - private readonly ILogger _logger; - private readonly IOutboxRepository _outboxRepository; + private readonly ILogger> _logger; + private readonly IOutboxRepository _outboxRepository; private readonly CancellationToken _cancellationToken; private readonly Action _lockLost; private bool _active; private bool _renewingLock; - public OutboxLockRenewalTimer(ILogger logger, IOutboxRepository outboxRepository, IInstanceIdProvider instanceIdProvider, TimeSpan lockDuration, TimeSpan lockRenewalInterval, Action lockLost, CancellationToken cancellationToken) + public OutboxLockRenewalTimer(ILogger> logger, IOutboxRepository outboxRepository, IInstanceIdProvider instanceIdProvider, TimeSpan lockDuration, TimeSpan lockRenewalInterval, Action lockLost, CancellationToken cancellationToken) { Debug.Assert(lockRenewalInterval < lockDuration); diff --git a/src/SlimMessageBus.Host.Outbox/Services/OutboxLockRenewalTimerFactory.cs b/src/SlimMessageBus.Host.Outbox/Services/OutboxLockRenewalTimerFactory.cs index 84df1ccb..2fafa4f9 100644 --- a/src/SlimMessageBus.Host.Outbox/Services/OutboxLockRenewalTimerFactory.cs +++ b/src/SlimMessageBus.Host.Outbox/Services/OutboxLockRenewalTimerFactory.cs @@ -1,18 +1,14 @@ namespace SlimMessageBus.Host.Outbox.Services; -public class OutboxLockRenewalTimerFactory : IOutboxLockRenewalTimerFactory, IAsyncDisposable + +public class OutboxLockRenewalTimerFactory(IServiceProvider serviceProvider) : IOutboxLockRenewalTimerFactory, IAsyncDisposable { - private readonly IServiceScope _scope; + private readonly IServiceScope _scope = serviceProvider.CreateScope(); private bool _isDisposed = false; - public OutboxLockRenewalTimerFactory(IServiceProvider serviceProvider) - { - _scope = serviceProvider.CreateScope(); - } - public IOutboxLockRenewalTimer CreateRenewalTimer(TimeSpan lockDuration, TimeSpan interval, Action lockLost, CancellationToken cancellationToken) { - return (OutboxLockRenewalTimer)ActivatorUtilities.CreateInstance(_scope.ServiceProvider, typeof(OutboxLockRenewalTimer), lockDuration, interval, lockLost, cancellationToken); + return (OutboxLockRenewalTimer)ActivatorUtilities.CreateInstance(_scope.ServiceProvider, typeof(OutboxLockRenewalTimer), lockDuration, interval, lockLost, cancellationToken); } public async ValueTask DisposeAsync() diff --git a/src/SlimMessageBus.Host.Outbox/Services/OutboxSendingTask.cs b/src/SlimMessageBus.Host.Outbox/Services/OutboxSendingTask.cs index c0b32c62..055f9b98 100644 --- a/src/SlimMessageBus.Host.Outbox/Services/OutboxSendingTask.cs +++ b/src/SlimMessageBus.Host.Outbox/Services/OutboxSendingTask.cs @@ -4,13 +4,13 @@ using SlimMessageBus.Host; using SlimMessageBus.Host.Outbox; -internal class OutboxSendingTask( +internal class OutboxSendingTask( ILoggerFactory loggerFactory, OutboxSettings outboxSettings, IServiceProvider serviceProvider) : IMessageBusLifecycleInterceptor, IOutboxNotificationService, IAsyncDisposable { - private readonly ILogger _logger = loggerFactory.CreateLogger(); + private readonly ILogger> _logger = loggerFactory.CreateLogger>(); private readonly OutboxSettings _outboxSettings = outboxSettings; private readonly IServiceProvider _serviceProvider = serviceProvider; @@ -157,7 +157,7 @@ private async Task Run() try { await EnsureMigrateSchema(scope.ServiceProvider, _loopCts.Token); - var outboxRepository = scope.ServiceProvider.GetRequiredService(); + var outboxRepository = scope.ServiceProvider.GetRequiredService>(); do { if (_loopCts.Token.IsCancellationRequested) @@ -205,7 +205,7 @@ private async Task Run() } } - async internal Task SendMessages(IServiceProvider serviceProvider, IOutboxRepository outboxRepository, CancellationToken cancellationToken) + async internal Task SendMessages(IServiceProvider serviceProvider, IOutboxRepository outboxRepository, CancellationToken cancellationToken) { var lockDuration = TimeSpan.FromSeconds(Math.Min(Math.Max(_outboxSettings.LockExpiration.TotalSeconds, 5), 30)); if (lockDuration != _outboxSettings.LockExpiration) @@ -249,14 +249,14 @@ async internal Task SendMessages(IServiceProvider serviceProvider, IOutboxR return count; } - async internal Task<(bool RunAgain, int Count)> ProcessMessages(IOutboxRepository outboxRepository, IReadOnlyCollection outboxMessages, ICompositeMessageBus compositeMessageBus, IMessageBusTarget messageBusTarget, CancellationToken cancellationToken) + async internal Task<(bool RunAgain, int Count)> ProcessMessages(IOutboxRepository outboxRepository, IReadOnlyCollection> outboxMessages, ICompositeMessageBus compositeMessageBus, IMessageBusTarget messageBusTarget, CancellationToken cancellationToken) { const int defaultBatchSize = 50; var runAgain = outboxMessages.Count == _outboxSettings.PollBatchSize; var count = 0; - var abortedIds = new List(_outboxSettings.PollBatchSize); + var abortedIds = new List(_outboxSettings.PollBatchSize); foreach (var busGroup in outboxMessages.GroupBy(x => x.BusName)) { var busName = busGroup.Key; @@ -323,7 +323,7 @@ async internal Task SendMessages(IServiceProvider serviceProvider, IOutboxR return (runAgain, count); } - async internal Task<(bool Success, int Published)> DispatchBatch(IOutboxRepository outboxRepository, IMessageBusBulkProducer producer, IMessageBusTarget messageBusTarget, IReadOnlyCollection batch, string busName, string path, CancellationToken cancellationToken) + async internal Task<(bool Success, int Published)> DispatchBatch(IOutboxRepository outboxRepository, IMessageBusBulkProducer producer, IMessageBusTarget messageBusTarget, IReadOnlyCollection batch, string busName, string path, CancellationToken cancellationToken) { _logger.LogDebug("Publishing batch of {MessageCount} messages to pathGroup {Path} on {BusName} bus", batch.Count, path, busName); @@ -369,9 +369,9 @@ private static IMasterMessageBus GetBus(ICompositeMessageBus compositeMessageBus public record OutboxBulkMessage : BulkMessageEnvelope { - public Guid Id { get; } + public TOutboxKey Id { get; } - public OutboxBulkMessage(Guid id, object message, Type messageType, IDictionary headers) + public OutboxBulkMessage(TOutboxKey id, object message, Type messageType, IDictionary headers) : base(message, messageType, headers) { Id = id; diff --git a/src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/OutboxBenchmarkTests.cs b/src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/OutboxBenchmarkTests.cs index b8e291c1..80d552eb 100644 --- a/src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/OutboxBenchmarkTests.cs +++ b/src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/OutboxBenchmarkTests.cs @@ -151,10 +151,10 @@ await PerformDbOperation(async (context, outboxMigrationService) => var events = Enumerable.Range(0, messageCount).Select(x => new CustomerCreatedEvent(Guid.NewGuid(), $"John {x:000}", surnames[x % surnames.Length])).ToList(); var store = ServiceProvider!.GetRequiredService>(); - OutboxSendingTask outboxSendingTask = null; + OutboxSendingTask outboxSendingTask = null; if (_useOutbox) { - outboxSendingTask = ServiceProvider.GetRequiredService(); + outboxSendingTask = ServiceProvider.GetRequiredService>(); // migrate data context await outboxSendingTask.OnBusLifecycle(Interceptor.MessageBusLifecycleEventType.Created, null); @@ -197,7 +197,7 @@ await PerformDbOperation(async (context, outboxMigrationService) => var outboxPublishTimerElapsed = TimeSpan.Zero; if (_useOutbox) { - var outputRepository = ServiceProvider.GetRequiredService(); + var outputRepository = ServiceProvider.GetRequiredService>(); var outboxTimer = Stopwatch.StartNew(); var publishCount = await outboxSendingTask.SendMessages(ServiceProvider, outputRepository, CancellationToken.None); diff --git a/src/Tests/SlimMessageBus.Host.Outbox.Sql.Test/BaseSqlOutboxRepositoryTest.cs b/src/Tests/SlimMessageBus.Host.Outbox.Sql.Test/BaseSqlOutboxRepositoryTest.cs index cb51dc21..82769b77 100644 --- a/src/Tests/SlimMessageBus.Host.Outbox.Sql.Test/BaseSqlOutboxRepositoryTest.cs +++ b/src/Tests/SlimMessageBus.Host.Outbox.Sql.Test/BaseSqlOutboxRepositoryTest.cs @@ -31,7 +31,7 @@ public override Task DisposeAsync() return base.DisposeAsync(); } - protected async Task> SeedOutbox(int count, Action action = null, CancellationToken cancellationToken = default) + protected async Task>> SeedOutbox(int count, Action> action = null, CancellationToken cancellationToken = default) { var messages = CreateOutboxMessages(count); for (var i = 0; i < messages.Count; i++) @@ -44,7 +44,7 @@ protected async Task> SeedOutbox(int count, Action< return messages; } - protected IReadOnlyList CreateOutboxMessages(int count) + protected IReadOnlyList> CreateOutboxMessages(int count) { return Enumerable .Range(0, count) @@ -63,7 +63,7 @@ protected IReadOnlyList CreateOutboxMessages(int count) }; // Configure fixture to use the generated values - _fixture.Customize(om => om + _fixture.Customize>(om => om .With(x => x.MessagePayload, jsonPayload) .With(x => x.Headers, headers) .With(x => x.LockExpiresOn, DateTime.MinValue) @@ -72,7 +72,7 @@ protected IReadOnlyList CreateOutboxMessages(int count) .With(x => x.DeliveryAttempt, 0) .With(x => x.DeliveryComplete, false)); - return _fixture.Create(); + return _fixture.Create>(); }) .ToList(); } diff --git a/src/Tests/SlimMessageBus.Host.Outbox.Sql.Test/SqlOutboxRepositoryTests.cs b/src/Tests/SlimMessageBus.Host.Outbox.Sql.Test/SqlOutboxRepositoryTests.cs index 381e1199..ae5ebeb1 100644 --- a/src/Tests/SlimMessageBus.Host.Outbox.Sql.Test/SqlOutboxRepositoryTests.cs +++ b/src/Tests/SlimMessageBus.Host.Outbox.Sql.Test/SqlOutboxRepositoryTests.cs @@ -45,10 +45,10 @@ public class DeleteSentTests : BaseSqlOutboxRepositoryTest public async Task ExpiredItems_AreDeleted() { // arrange - var active = new DateTime(2000, 1, 1); + var active = new DateTime(2000, 1, 1, 0, 0, 0, DateTimeKind.Utc); var expired = active.AddDays(-1); - var seedMessages = await SeedOutbox(10, (i, x) => + await SeedOutbox(10, (i, x) => { x.DeliveryAttempt = 1; x.DeliveryComplete = true; diff --git a/src/Tests/SlimMessageBus.Host.Outbox.Test/GlobalSuppressions.cs b/src/Tests/SlimMessageBus.Host.Outbox.Test/GlobalSuppressions.cs new file mode 100644 index 00000000..0a3e27ec --- /dev/null +++ b/src/Tests/SlimMessageBus.Host.Outbox.Test/GlobalSuppressions.cs @@ -0,0 +1,8 @@ +// This file is used by Code Analysis to maintain SuppressMessage +// attributes that are applied to this project. +// Project-level suppressions either have no target or are given +// a specific target and scoped to a namespace, type, member, etc. + +using System.Diagnostics.CodeAnalysis; + +[assembly: SuppressMessage("Minor Code Smell", "S6608:Prefer indexing instead of \"Enumerable\" methods on types implementing \"IList\"")] diff --git a/src/Tests/SlimMessageBus.Host.Outbox.Test/Interceptors/OutboxForwardingPublishInterceptorTests.cs b/src/Tests/SlimMessageBus.Host.Outbox.Test/Interceptors/OutboxForwardingPublishInterceptorTests.cs index b13f9dd8..cb7d95f4 100644 --- a/src/Tests/SlimMessageBus.Host.Outbox.Test/Interceptors/OutboxForwardingPublishInterceptorTests.cs +++ b/src/Tests/SlimMessageBus.Host.Outbox.Test/Interceptors/OutboxForwardingPublishInterceptorTests.cs @@ -14,13 +14,13 @@ public void OutboxForwardingPublisher_MustBeLastInPipeline() var expected = int.MaxValue; var mockLogger = new Mock>(); - var mockOutboxRepository = new Mock(); + var mockOutboxRepository = new Mock>(); var mockInstanceIdProvider = new Mock(); var mockOutboxNotificationService = new Mock(); var mockOutboxSettings = new Mock(); // act - var target = new OutboxForwardingPublishInterceptor(mockLogger.Object, mockOutboxRepository.Object, mockInstanceIdProvider.Object, mockOutboxNotificationService.Object, mockOutboxSettings.Object); + var target = new OutboxForwardingPublishInterceptor(mockLogger.Object, mockOutboxRepository.Object, mockInstanceIdProvider.Object, mockOutboxNotificationService.Object, mockOutboxSettings.Object); var actual = target.Order; // assert @@ -31,7 +31,7 @@ public void OutboxForwardingPublisher_MustBeLastInPipeline() public void OutboxForwardingPublisher_MustImplement_IInterceptorWithOrder() { // act - var actual = typeof(OutboxForwardingPublishInterceptor).IsAssignableTo(typeof(IInterceptorWithOrder)); + var actual = typeof(OutboxForwardingPublishInterceptor).IsAssignableTo(typeof(IInterceptorWithOrder)); // assert actual.Should().BeTrue(); @@ -41,19 +41,19 @@ public void OutboxForwardingPublisher_MustImplement_IInterceptorWithOrder() public class OnHandleTests { private readonly Mock> _mockLogger; - private readonly Mock _mockOutboxRepository; + private readonly Mock> _mockOutboxRepository; private readonly Mock _mockInstanceIdProvider; private readonly Mock _mockSerializer; private readonly Mock _mockMasterMessageBus; private readonly Mock _mockOutboxNotificationService; private readonly Mock _mockOutboxSettings; - private Mock _mockTargetBus; - private Mock _mockProducerContext; + private readonly Mock _mockTargetBus; + private readonly Mock _mockProducerContext; public OnHandleTests() { _mockLogger = new Mock>(); - _mockOutboxRepository = new Mock(); + _mockOutboxRepository = new Mock>(); _mockInstanceIdProvider = new Mock(); _mockOutboxNotificationService = new Mock(); _mockOutboxSettings = new Mock(); @@ -74,8 +74,8 @@ public OnHandleTests() public async Task SkipOutboxHeader_IsPresent_PromoteToNext() { // arrange - _mockProducerContext.SetupGet(x => x.Headers).Returns(new Dictionary { { OutboxForwardingPublishInterceptor.SkipOutboxHeader, true } }); - _mockOutboxRepository.Setup(x => x.Save(It.IsAny(), It.IsAny())).Verifiable(); + _mockProducerContext.SetupGet(x => x.Headers).Returns(new Dictionary { { OutboxForwardingPublishInterceptor.SkipOutboxHeader, true } }); + _mockOutboxRepository.Setup(x => x.Save(It.IsAny>(), It.IsAny())).Verifiable(); var nextCalled = 0; var next = () => @@ -85,14 +85,14 @@ public async Task SkipOutboxHeader_IsPresent_PromoteToNext() }; // act - var target = new OutboxForwardingPublishInterceptor(_mockLogger.Object, _mockOutboxRepository.Object, _mockInstanceIdProvider.Object, _mockOutboxNotificationService.Object, _mockOutboxSettings.Object); + var target = new OutboxForwardingPublishInterceptor(_mockLogger.Object, _mockOutboxRepository.Object, _mockInstanceIdProvider.Object, _mockOutboxNotificationService.Object, _mockOutboxSettings.Object); await target.OnHandle(new object(), next, _mockProducerContext.Object); target.Dispose(); // assert _mockProducerContext.VerifyGet(x => x.Bus, Times.AtLeastOnce); _mockProducerContext.VerifyGet(x => x.Headers, Times.AtLeastOnce); - _mockOutboxRepository.Verify(x => x.Save(It.IsAny(), It.IsAny()), Times.Never); + _mockOutboxRepository.Verify(x => x.Save(It.IsAny>(), It.IsAny()), Times.Never); nextCalled.Should().Be(1); } @@ -104,7 +104,7 @@ public async Task SkipOutboxHeader_IsNotPresent_DoNotPromoteToNext() var message = new object(); _mockSerializer.Setup(x => x.Serialize(typeof(object), message)).Verifiable(); - _mockOutboxRepository.Setup(x => x.Save(It.IsAny(), It.IsAny())).Verifiable(); + _mockOutboxRepository.Setup(x => x.Save(It.IsAny>(), It.IsAny())).Verifiable(); var nextCalled = 0; var next = () => @@ -114,30 +114,30 @@ public async Task SkipOutboxHeader_IsNotPresent_DoNotPromoteToNext() }; // act - var target = new OutboxForwardingPublishInterceptor(_mockLogger.Object, _mockOutboxRepository.Object, _mockInstanceIdProvider.Object, _mockOutboxNotificationService.Object, _mockOutboxSettings.Object); + var target = new OutboxForwardingPublishInterceptor(_mockLogger.Object, _mockOutboxRepository.Object, _mockInstanceIdProvider.Object, _mockOutboxNotificationService.Object, _mockOutboxSettings.Object); await target.OnHandle(new object(), next, _mockProducerContext.Object); target.Dispose(); // assert nextCalled.Should().Be(0); - _mockOutboxRepository.Verify(x => x.Save(It.IsAny(), It.IsAny()), Times.Once); + _mockOutboxRepository.Verify(x => x.Save(It.IsAny>(), It.IsAny()), Times.Once); } [Fact] public async Task SkipOutboxHeader_IsPresent_DoNotRaiseOutboxNotification() { // arrange - _mockProducerContext.SetupGet(x => x.Headers).Returns(new Dictionary { { OutboxForwardingPublishInterceptor.SkipOutboxHeader, true } }); - _mockOutboxRepository.Setup(x => x.Save(It.IsAny(), It.IsAny())).Verifiable(); + _mockProducerContext.SetupGet(x => x.Headers).Returns(new Dictionary { { OutboxForwardingPublishInterceptor.SkipOutboxHeader, true } }); + _mockOutboxRepository.Setup(x => x.Save(It.IsAny>(), It.IsAny())).Verifiable(); _mockOutboxNotificationService.Setup(x => x.Notify()).Verifiable(); // act - var target = new OutboxForwardingPublishInterceptor(_mockLogger.Object, _mockOutboxRepository.Object, _mockInstanceIdProvider.Object, _mockOutboxNotificationService.Object, _mockOutboxSettings.Object); + var target = new OutboxForwardingPublishInterceptor(_mockLogger.Object, _mockOutboxRepository.Object, _mockInstanceIdProvider.Object, _mockOutboxNotificationService.Object, _mockOutboxSettings.Object); await target.OnHandle(new object(), () => Task.CompletedTask, _mockProducerContext.Object); target.Dispose(); // assert - _mockOutboxRepository.Verify(x => x.Save(It.IsAny(), It.IsAny()), Times.Never); + _mockOutboxRepository.Verify(x => x.Save(It.IsAny>(), It.IsAny()), Times.Never); _mockOutboxNotificationService.Verify(x => x.Notify(), Times.Never); } @@ -148,16 +148,16 @@ public async Task SkipOutboxHeader_IsNotPresent_RaiseOutboxNotification() var message = new object(); _mockSerializer.Setup(x => x.Serialize(typeof(object), message)).Verifiable(); - _mockOutboxRepository.Setup(x => x.Save(It.IsAny(), It.IsAny())).Verifiable(); + _mockOutboxRepository.Setup(x => x.Save(It.IsAny>(), It.IsAny())).Verifiable(); _mockOutboxNotificationService.Setup(x => x.Notify()).Verifiable(); // act - var target = new OutboxForwardingPublishInterceptor(_mockLogger.Object, _mockOutboxRepository.Object, _mockInstanceIdProvider.Object, _mockOutboxNotificationService.Object, _mockOutboxSettings.Object); + var target = new OutboxForwardingPublishInterceptor(_mockLogger.Object, _mockOutboxRepository.Object, _mockInstanceIdProvider.Object, _mockOutboxNotificationService.Object, _mockOutboxSettings.Object); await target.OnHandle(new object(), () => Task.CompletedTask, _mockProducerContext.Object); target.Dispose(); // assert - _mockOutboxRepository.Verify(x => x.Save(It.IsAny(), It.IsAny()), Times.Once); + _mockOutboxRepository.Verify(x => x.Save(It.IsAny>(), It.IsAny()), Times.Once); _mockOutboxNotificationService.Verify(x => x.Notify(), Times.Once); } } diff --git a/src/Tests/SlimMessageBus.Host.Outbox.Test/OutboxLockRenewalTimerTests.cs b/src/Tests/SlimMessageBus.Host.Outbox.Test/OutboxLockRenewalTimerTests.cs index ffc9d15d..912ea1b9 100644 --- a/src/Tests/SlimMessageBus.Host.Outbox.Test/OutboxLockRenewalTimerTests.cs +++ b/src/Tests/SlimMessageBus.Host.Outbox.Test/OutboxLockRenewalTimerTests.cs @@ -1,9 +1,9 @@ namespace SlimMessageBus.Host.Outbox.Test; -public class OutboxLockRenewalTimerTests +public sealed class OutboxLockRenewalTimerTests: IDisposable { - private readonly Mock> _loggerMock; - private readonly Mock _outboxRepositoryMock; + private readonly Mock>> _loggerMock; + private readonly Mock> _outboxRepositoryMock; private readonly Mock _instanceIdProviderMock; private readonly CancellationTokenSource _cancellationTokenSource; private readonly Action _lockLostAction; @@ -13,8 +13,8 @@ public class OutboxLockRenewalTimerTests public OutboxLockRenewalTimerTests() { - _loggerMock = new Mock>(); - _outboxRepositoryMock = new Mock(); + _loggerMock = new Mock>>(); + _outboxRepositoryMock = new Mock>(); _instanceIdProviderMock = new Mock(); _cancellationTokenSource = new CancellationTokenSource(); _lockLostAction = Mock.Of>(); @@ -111,9 +111,9 @@ public async Task CallbackAsync_ShouldReturnGracefullyIfTokenCancelled() lockLostActionMock.Verify(a => a(It.IsAny()), Times.Never); } - private OutboxLockRenewalTimer CreateTimer(Action lockLostAction = null) + private OutboxLockRenewalTimer CreateTimer(Action lockLostAction = null) { - return new OutboxLockRenewalTimer( + return new OutboxLockRenewalTimer( _loggerMock.Object, _outboxRepositoryMock.Object, _instanceIdProviderMock.Object, @@ -123,9 +123,14 @@ private OutboxLockRenewalTimer CreateTimer(Action lockLostAction = nu _cancellationTokenSource.Token); } - private static async Task InvokeCallbackAsync(OutboxLockRenewalTimer timer) + private static async Task InvokeCallbackAsync(OutboxLockRenewalTimer timer) { - var callbackMethod = typeof(OutboxLockRenewalTimer).GetMethod("CallbackAsync", System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance); + var callbackMethod = typeof(OutboxLockRenewalTimer).GetMethod("CallbackAsync", System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance); await (Task)callbackMethod.Invoke(timer, null); } + + public void Dispose() + { + _cancellationTokenSource.Dispose(); + } } diff --git a/src/Tests/SlimMessageBus.Host.Outbox.Test/Services/OutboxSendingTaskTests.cs b/src/Tests/SlimMessageBus.Host.Outbox.Test/Services/OutboxSendingTaskTests.cs index aa2a71e2..d3e4fe21 100644 --- a/src/Tests/SlimMessageBus.Host.Outbox.Test/Services/OutboxSendingTaskTests.cs +++ b/src/Tests/SlimMessageBus.Host.Outbox.Test/Services/OutboxSendingTaskTests.cs @@ -1,29 +1,29 @@ namespace SlimMessageBus.Host.Outbox.Test.Services; -using static SlimMessageBus.Host.Outbox.Services.OutboxSendingTask; +using static SlimMessageBus.Host.Outbox.Services.OutboxSendingTask; public sealed class OutboxSendingTaskTests { public class DispatchBatchTests { private readonly ILoggerFactory _loggerFactory; - private readonly Mock _outboxRepositoryMock; + private readonly Mock> _outboxRepositoryMock; private readonly Mock _producerMock; private readonly Mock _messageBusTargetMock; private readonly OutboxSettings _outboxSettings; private readonly IServiceProvider _serviceProvider; - private readonly OutboxSendingTask _sut; + private readonly OutboxSendingTask _sut; public DispatchBatchTests() { - _outboxRepositoryMock = new Mock(); + _outboxRepositoryMock = new Mock>(); _producerMock = new Mock(); _messageBusTargetMock = new Mock(); _outboxSettings = new OutboxSettings { MaxDeliveryAttempts = 5 }; _serviceProvider = Mock.Of(); _loggerFactory = new NullLoggerFactory(); - _sut = new OutboxSendingTask(_loggerFactory, _outboxSettings, _serviceProvider); + _sut = new OutboxSendingTask(_loggerFactory, _outboxSettings, _serviceProvider); } [Fact] @@ -88,23 +88,22 @@ public async Task DispatchBatch_ShouldIncrementDeliveryAttempts_WhenNotAllMessag public class ProcessMessagesTests { - private readonly Mock _mockOutboxRepository; + private readonly Mock> _mockOutboxRepository; private readonly Mock _mockCompositeMessageBus; private readonly Mock _mockMessageBusTarget; private readonly Mock _mockMasterMessageBus; private readonly Mock _mockMessageBusBulkProducer; - private readonly Mock> _mockLogger; + private readonly OutboxSettings _outboxSettings; - private readonly OutboxSendingTask _sut; + private readonly OutboxSendingTask _sut; public ProcessMessagesTests() { - _mockOutboxRepository = new Mock(); + _mockOutboxRepository = new Mock>(); _mockCompositeMessageBus = new Mock(); _mockMessageBusTarget = new Mock(); _mockMasterMessageBus = new Mock(); _mockMessageBusBulkProducer = _mockMasterMessageBus.As(); - _mockLogger = new Mock>(); _outboxSettings = new OutboxSettings { @@ -112,7 +111,7 @@ public ProcessMessagesTests() MessageTypeResolver = new Mock().Object }; - _sut = new OutboxSendingTask(NullLoggerFactory.Instance, _outboxSettings, null); + _sut = new OutboxSendingTask(NullLoggerFactory.Instance, _outboxSettings, null); } [Fact] @@ -179,7 +178,7 @@ public async Task ProcessMessages_ShouldAbortDelivery_WhenBusIsNotRecognised() outboxMessages[7].BusName = null; var knownBusCount = outboxMessages.Count(x => x.BusName != null); - + _mockMessageBusTarget.SetupGet(x => x.Target).Returns((IMessageBusProducer)null); _mockCompositeMessageBus.Setup(x => x.GetChildBus(It.IsAny())).Returns(_mockMasterMessageBus.Object); @@ -237,12 +236,12 @@ public async Task ProcessMessages_ShouldAbortDelivery_WhenMessageTypeIsNotRecogn result.Count.Should().Be(knownMessageCount); } - private static List CreateOutboxMessages(int count) + private static List> CreateOutboxMessages(int count) { return Enumerable .Range(0, count) .Select( - _ => new OutboxMessage + _ => new OutboxMessage { Id = Guid.NewGuid(), MessageType = "TestType",