Skip to content

Commit

Permalink
zarusz#311 IOutboxRepository<T> with key type
Browse files Browse the repository at this point in the history
Signed-off-by: krivchenko_kv <[email protected]>
  • Loading branch information
krivchenko_kv committed Sep 27, 2024
1 parent 698a9a2 commit 6f2c4f8
Show file tree
Hide file tree
Showing 20 changed files with 154 additions and 111 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
// This file is used by Code Analysis to maintain SuppressMessage
// attributes that are applied to this project.
// Project-level suppressions either have no target or are given
// a specific target and scoped to a namespace, type, member, etc.

using System.Diagnostics.CodeAnalysis;

[assembly: SuppressMessage("Minor Code Smell", "S6672:Generic logger injection should match enclosing type", Justification = "<Pending>", Scope = "member", Target = "~M:SlimMessageBus.Host.Outbox.DbContext.DbContextOutboxRepository`1.#ctor(Microsoft.Extensions.Logging.ILogger{SlimMessageBus.Host.Outbox.Sql.SqlOutboxRepository},SlimMessageBus.Host.Outbox.Sql.SqlOutboxSettings,SlimMessageBus.Host.Outbox.Sql.SqlOutboxTemplate,`0,SlimMessageBus.Host.Sql.Common.ISqlTransactionService)")]
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ public static class MessageBusBuilderExtensions
public static MessageBusBuilder AddOutboxUsingSql<TOutboxRepository>(this MessageBusBuilder mbb, Action<SqlOutboxSettings> configure)
where TOutboxRepository : class, ISqlOutboxRepository
{
mbb.AddOutbox();
mbb.AddOutbox<Guid>();

mbb.PostConfigurationActions.Add(services =>
{
Expand Down Expand Up @@ -37,7 +37,7 @@ public static MessageBusBuilder AddOutboxUsingSql<TOutboxRepository>(this Messag
services.TryAddScoped<ISqlTransactionService, SqlTransactionService>();

services.Replace(ServiceDescriptor.Scoped<ISqlOutboxRepository>(svp => svp.GetRequiredService<TOutboxRepository>()));
services.Replace(ServiceDescriptor.Scoped<IOutboxRepository>(svp => svp.GetRequiredService<TOutboxRepository>()));
services.Replace(ServiceDescriptor.Scoped<IOutboxRepository<Guid>>(svp => svp.GetRequiredService<TOutboxRepository>()));

services.TryAddSingleton<SqlOutboxTemplate>();
services.TryAddTransient<IOutboxMigrationService, SqlOutboxMigrationService>();
Expand Down
2 changes: 1 addition & 1 deletion src/SlimMessageBus.Host.Outbox.Sql/ISqlOutboxRepository.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
namespace SlimMessageBus.Host.Outbox.Sql;

public interface ISqlOutboxRepository : IOutboxRepository
public interface ISqlOutboxRepository : IOutboxRepository<Guid>
{
}
17 changes: 11 additions & 6 deletions src/SlimMessageBus.Host.Outbox.Sql/SqlOutboxRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,12 @@ public SqlOutboxRepository(ILogger<SqlOutboxRepository> logger, SqlOutboxSetting
Settings = settings;
}

public async virtual Task Save(OutboxMessage message, CancellationToken token)
public Task<Guid> GenerateId(CancellationToken cancellationToken)
{
return Task.FromResult(Guid.NewGuid());
}

public async virtual Task Save(OutboxMessage<Guid> message, CancellationToken token)
{
await EnsureConnection();

Expand All @@ -39,7 +44,7 @@ await ExecuteNonQuery(Settings.SqlSettings.OperationRetry, _sqlTemplate.SqlOutbo
}, token);
}

public async Task<IReadOnlyCollection<OutboxMessage>> LockAndSelect(string instanceId, int batchSize, bool tableLock, TimeSpan lockDuration, CancellationToken token)
public async Task<IReadOnlyCollection<OutboxMessage<Guid>>> LockAndSelect(string instanceId, int batchSize, bool tableLock, TimeSpan lockDuration, CancellationToken token)
{
await EnsureConnection();

Expand Down Expand Up @@ -154,7 +159,7 @@ public async Task<bool> RenewLock(string instanceId, TimeSpan lockDuration, Canc
return await cmd.ExecuteNonQueryAsync(token) > 0;
}

internal async Task<IReadOnlyCollection<OutboxMessage>> GetAllMessages(CancellationToken cancellationToken)
internal async Task<IReadOnlyCollection<OutboxMessage<Guid>>> GetAllMessages(CancellationToken cancellationToken)
{
await EnsureConnection();

Expand All @@ -164,7 +169,7 @@ internal async Task<IReadOnlyCollection<OutboxMessage>> GetAllMessages(Cancellat
return await ReadMessages(cmd, cancellationToken).ConfigureAwait(false);
}

private async Task<IReadOnlyCollection<OutboxMessage>> ReadMessages(SqlCommand cmd, CancellationToken cancellationToken)
private async Task<IReadOnlyCollection<OutboxMessage<Guid>>> ReadMessages(SqlCommand cmd, CancellationToken cancellationToken)
{
using var reader = await cmd.ExecuteReaderAsync(cancellationToken);

Expand All @@ -182,12 +187,12 @@ private async Task<IReadOnlyCollection<OutboxMessage>> ReadMessages(SqlCommand c
var deliveryCompleteOrdinal = reader.GetOrdinal("DeliveryComplete");
var deliveryAbortedOrdinal = reader.GetOrdinal("DeliveryAborted");

var items = new List<OutboxMessage>();
var items = new List<OutboxMessage<Guid>>();
while (await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
{
var id = reader.GetGuid(idOrdinal);
var headers = reader.IsDBNull(headersOrdinal) ? null : reader.GetString(headersOrdinal);
var message = new OutboxMessage
var message = new OutboxMessage<Guid>
{
Id = id,
Timestamp = reader.GetDateTime(timestampOrdinal),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

public static class MessageBusBuilderExtensions
{
public static MessageBusBuilder AddOutbox(this MessageBusBuilder mbb, Action<OutboxSettings> configure = null)
public static MessageBusBuilder AddOutbox<TOutboxKey>(this MessageBusBuilder mbb, Action<OutboxSettings> configure = null)
{
mbb.PostConfigurationActions.Add(services =>
{
Expand All @@ -17,7 +17,7 @@ public static MessageBusBuilder AddOutbox(this MessageBusBuilder mbb, Action<Out
.Select(x => x.MessageType))
{
var serviceType = typeof(IPublishInterceptor<>).MakeGenericType(producerMessageType);
var implementationType = typeof(OutboxForwardingPublishInterceptor<>).MakeGenericType(producerMessageType);
var implementationType = typeof(OutboxForwardingPublishInterceptor<,>).MakeGenericType(producerMessageType);
services.TryAddEnumerable(ServiceDescriptor.Transient(serviceType, implementationType));
}

Expand All @@ -33,12 +33,12 @@ public static MessageBusBuilder AddOutbox(this MessageBusBuilder mbb, Action<Out
services.TryAddEnumerable(ServiceDescriptor.Transient(serviceType, implementationType));
}

services.AddSingleton<OutboxSendingTask>();
services.TryAddEnumerable(ServiceDescriptor.Singleton<IMessageBusLifecycleInterceptor, OutboxSendingTask>(sp => sp.GetRequiredService<OutboxSendingTask>()));
services.TryAdd(ServiceDescriptor.Singleton<IOutboxNotificationService, OutboxSendingTask>(sp => sp.GetRequiredService<OutboxSendingTask>()));
services.AddSingleton<OutboxSendingTask<TOutboxKey>>();
services.TryAddEnumerable(ServiceDescriptor.Singleton<IMessageBusLifecycleInterceptor, OutboxSendingTask<TOutboxKey>>(sp => sp.GetRequiredService<OutboxSendingTask<TOutboxKey>>()));
services.TryAdd(ServiceDescriptor.Singleton<IOutboxNotificationService, OutboxSendingTask<TOutboxKey>>(sp => sp.GetRequiredService<OutboxSendingTask<TOutboxKey>>()));

services.TryAddSingleton<IInstanceIdProvider, DefaultInstanceIdProvider>();
services.TryAddSingleton<IOutboxLockRenewalTimerFactory, OutboxLockRenewalTimerFactory>();
services.TryAddSingleton<IOutboxLockRenewalTimerFactory, OutboxLockRenewalTimerFactory<TOutboxKey>>();

services.TryAddSingleton(svp =>
{
Expand All @@ -47,6 +47,7 @@ public static MessageBusBuilder AddOutbox(this MessageBusBuilder mbb, Action<Out
return settings;
});
});

return mbb;
}
}
8 changes: 8 additions & 0 deletions src/SlimMessageBus.Host.Outbox/GlobalSuppressions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
// This file is used by Code Analysis to maintain SuppressMessage
// attributes that are applied to this project.
// Project-level suppressions either have no target or are given
// a specific target and scoped to a namespace, type, member, etc.

using System.Diagnostics.CodeAnalysis;

[assembly: SuppressMessage("Minor Code Smell", "S2094:Classes should not be empty", Justification = "<Pending>")]
26 changes: 16 additions & 10 deletions src/SlimMessageBus.Host.Outbox/IEnumerableExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,18 +1,11 @@
namespace SlimMessageBus.Host.Outbox;

internal static class IEnumerableExtensions
static internal class IEnumerableExtensions
{
public static IEnumerable<IReadOnlyCollection<T>> Batch<T>(this IEnumerable<T> items, int batchSize)
{
if (items == null)
{
throw new ArgumentNullException(nameof(items));
}

if (batchSize <= 0)
{
throw new ArgumentOutOfRangeException(nameof(batchSize), "Batch size must be greater than zero.");
}
// Parameter validation in yielding methods should be wrapped
Check(items, batchSize);

using var enumerator = items.GetEnumerator();
while (enumerator.MoveNext())
Expand All @@ -26,4 +19,17 @@ public static IEnumerable<IReadOnlyCollection<T>> Batch<T>(this IEnumerable<T> i
yield return batch.AsReadOnly();
}
}

private static void Check<T>(IEnumerable<T> items, int batchSize)
{
if (items == null)
{
throw new ArgumentNullException(nameof(items));
}

if (batchSize <= 0)
{
throw new ArgumentOutOfRangeException(nameof(batchSize), "Batch size must be greater than zero.");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ public abstract class OutboxForwardingPublishInterceptor
/// Interceptor must be registered as a transient in order for outbox notifications to be raised.
/// Notifications are raised on disposal (if required), to ensure they occur outside of a transaction scope.
/// </remarks>
public sealed class OutboxForwardingPublishInterceptor<T>(
public sealed class OutboxForwardingPublishInterceptor<T, TOutboxKey>(
ILogger<OutboxForwardingPublishInterceptor> logger,
IOutboxRepository outboxRepository,
IOutboxRepository<TOutboxKey> outboxRepository,
IInstanceIdProvider instanceIdProvider,
IOutboxNotificationService outboxNotificationService,
OutboxSettings outboxSettings)
Expand All @@ -23,7 +23,7 @@ public sealed class OutboxForwardingPublishInterceptor<T>(
internal const string SkipOutboxHeader = "__SkipOutbox";

private readonly ILogger _logger = logger;
private readonly IOutboxRepository _outboxRepository = outboxRepository;
private readonly IOutboxRepository<TOutboxKey> _outboxRepository = outboxRepository;
private readonly IInstanceIdProvider _instanceIdProvider = instanceIdProvider;
private readonly IOutboxNotificationService _outboxNotificationService = outboxNotificationService;
private readonly OutboxSettings _outboxSettings = outboxSettings;
Expand Down Expand Up @@ -67,16 +67,21 @@ public async Task OnHandle(T message, Func<Task> next, IProducerContext context)
var messagePayload = busMaster.Serializer?.Serialize(messageType, message)
?? throw new PublishMessageBusException($"The {busMaster.Name} bus has no configured serializer, so it cannot be used with the outbox plugin");

// Generates a unique identifier in the current database
var outboxId = await _outboxRepository.GenerateId(context.CancellationToken);

// Add message to the database, do not call next()
var outboxMessage = new OutboxMessage
var outboxMessage = new OutboxMessage<TOutboxKey>
{
Id = outboxId,
BusName = busMaster.Name,
Headers = context.Headers,
Path = context.Path,
MessageType = _outboxSettings.MessageTypeResolver.ToName(messageType),
MessagePayload = messagePayload,
InstanceId = _instanceIdProvider.GetInstanceId()
};

await _outboxRepository.Save(outboxMessage, context.CancellationToken);

// a message was sent, notify outbox service to poll on dispose (post transaction)
Expand Down
13 changes: 7 additions & 6 deletions src/SlimMessageBus.Host.Outbox/Repositories/IOutboxRepository.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
namespace SlimMessageBus.Host.Outbox;

public interface IOutboxRepository
public interface IOutboxRepository<TOutboxKey>
{
Task Save(OutboxMessage message, CancellationToken token);
Task<IReadOnlyCollection<OutboxMessage>> LockAndSelect(string instanceId, int batchSize, bool tableLock, TimeSpan lockDuration, CancellationToken token);
Task AbortDelivery (IReadOnlyCollection<Guid> ids, CancellationToken token);
Task UpdateToSent(IReadOnlyCollection<Guid> ids, CancellationToken token);
Task IncrementDeliveryAttempt(IReadOnlyCollection<Guid> ids, int maxDeliveryAttempts, CancellationToken token);
Task<TOutboxKey> GenerateId(CancellationToken cancellationToken);
Task Save(OutboxMessage<TOutboxKey> message, CancellationToken token);
Task<IReadOnlyCollection<OutboxMessage<TOutboxKey>>> LockAndSelect(string instanceId, int batchSize, bool tableLock, TimeSpan lockDuration, CancellationToken token);
Task AbortDelivery(IReadOnlyCollection<TOutboxKey> ids, CancellationToken token);
Task UpdateToSent(IReadOnlyCollection<TOutboxKey> ids, CancellationToken token);
Task IncrementDeliveryAttempt(IReadOnlyCollection<TOutboxKey> ids, int maxDeliveryAttempts, CancellationToken token);
Task DeleteSent(DateTime olderThan, CancellationToken token);
Task<bool> RenewLock(string instanceId, TimeSpan lockDuration, CancellationToken token);
}
5 changes: 3 additions & 2 deletions src/SlimMessageBus.Host.Outbox/Repositories/OutboxMessage.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
namespace SlimMessageBus.Host.Outbox;

public class OutboxMessage

public class OutboxMessage<TOutboxKey>
{
public Guid Id { get; set; } = Guid.NewGuid();
public TOutboxKey Id { get; set; }
public DateTime Timestamp { get; set; } = DateTime.UtcNow;
public string BusName { get; set; }
public string MessageType { get; set; }
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
namespace SlimMessageBus.Host.Outbox.Services;
using SlimMessageBus.Host.Outbox;

public sealed class OutboxLockRenewalTimer : IOutboxLockRenewalTimer
public sealed class OutboxLockRenewalTimer<TOutboxKey> : IOutboxLockRenewalTimer
{
private readonly object _lock;
private readonly Timer _timer;
private readonly ILogger<OutboxLockRenewalTimer> _logger;
private readonly IOutboxRepository _outboxRepository;
private readonly ILogger<OutboxLockRenewalTimer<TOutboxKey>> _logger;
private readonly IOutboxRepository<TOutboxKey> _outboxRepository;
private readonly CancellationToken _cancellationToken;
private readonly Action<Exception> _lockLost;
private bool _active;
private bool _renewingLock;

public OutboxLockRenewalTimer(ILogger<OutboxLockRenewalTimer> logger, IOutboxRepository outboxRepository, IInstanceIdProvider instanceIdProvider, TimeSpan lockDuration, TimeSpan lockRenewalInterval, Action<Exception> lockLost, CancellationToken cancellationToken)
public OutboxLockRenewalTimer(ILogger<OutboxLockRenewalTimer<TOutboxKey>> logger, IOutboxRepository<TOutboxKey> outboxRepository, IInstanceIdProvider instanceIdProvider, TimeSpan lockDuration, TimeSpan lockRenewalInterval, Action<Exception> lockLost, CancellationToken cancellationToken)
{

Debug.Assert(lockRenewalInterval < lockDuration);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,14 @@
namespace SlimMessageBus.Host.Outbox.Services;
public class OutboxLockRenewalTimerFactory : IOutboxLockRenewalTimerFactory, IAsyncDisposable

public class OutboxLockRenewalTimerFactory<TOutboxKey>(IServiceProvider serviceProvider) : IOutboxLockRenewalTimerFactory, IAsyncDisposable
{
private readonly IServiceScope _scope;
private readonly IServiceScope _scope = serviceProvider.CreateScope();

private bool _isDisposed = false;

public OutboxLockRenewalTimerFactory(IServiceProvider serviceProvider)
{
_scope = serviceProvider.CreateScope();
}

public IOutboxLockRenewalTimer CreateRenewalTimer(TimeSpan lockDuration, TimeSpan interval, Action<Exception> lockLost, CancellationToken cancellationToken)
{
return (OutboxLockRenewalTimer)ActivatorUtilities.CreateInstance(_scope.ServiceProvider, typeof(OutboxLockRenewalTimer), lockDuration, interval, lockLost, cancellationToken);
return (OutboxLockRenewalTimer<TOutboxKey>)ActivatorUtilities.CreateInstance(_scope.ServiceProvider, typeof(OutboxLockRenewalTimer<TOutboxKey>), lockDuration, interval, lockLost, cancellationToken);
}

public async ValueTask DisposeAsync()
Expand Down
Loading

0 comments on commit 6f2c4f8

Please sign in to comment.