From d1865a59d26dcfe75fc3452df1d222949a9a08f9 Mon Sep 17 00:00:00 2001 From: Richard Pringle Date: Wed, 17 Jul 2024 09:46:44 +0800 Subject: [PATCH] #251 Health check circuit breaker Signed-off-by: Richard Pringle --- build/tasks.ps1 | 2 + docs/intro.md | 27 ++ docs/intro.t.md | 27 ++ src/.editorconfig | 2 + src/Infrastructure/docker-compose.yml | 2 +- .../Consumers/AddConsumer.cs | 17 + .../Consumers/SubtractConsumer.cs | 17 + .../GlobalUsings.cs | 17 + .../HealthChecks/AddRandomHealthCheck.cs | 11 + .../HealthChecks/RandomHealthCheck.cs | 20 ++ .../HealthChecks/SubtractRandomHealthCheck.cs | 11 + .../IntermittentMessagePublisher.cs | 28 ++ .../Models/Add.cs | 3 + .../Models/Subtract.cs | 3 + .../Program.cs | 91 +++++ .../Sample.CircuitBreaker.HealthCheck.csproj | 33 ++ .../appsettings.json | 19 ++ .../Consumer/EhGroupConsumer.cs | 4 +- .../EventHubMessageBus.cs | 4 +- .../Consumer/AsbBaseConsumer.cs | 2 +- .../Config/ConsumerBuilderExtensions.cs | 47 +++ .../Config/SettingsExtensions.cs | 47 +++ .../GlobalUsings.cs | 7 + .../HealthCheckBackgroundService.cs | 128 +++++++ .../HealthCheckCircuitBreaker.cs | 92 +++++ .../IHealthCheckHostBreaker.cs | 9 + ...Bus.Host.CircuitBreaker.HealthCheck.csproj | 30 ++ .../Builders/AbstractConsumerBuilder.cs | 5 +- .../Builders/MessageBusBuilder.cs | 35 +- .../GlobalUsings.cs | 3 +- .../Settings/AbstractConsumerSettings.cs | 10 + .../Settings/ConsumerSettings.cs | 9 +- .../TypeCollection.cs | 78 +++++ .../Consumer/KafkaGroupConsumer.cs | 4 +- .../KafkaMessageBus.cs | 8 +- .../MqttMessageBus.cs | 8 +- .../MqttTopicConsumer.cs | 3 +- .../Consumers/AbstractRabbitMqConsumer.cs | 4 +- .../Consumers/RabbitMqConsumer.cs | 2 +- .../Consumers/RabbitMqResponseConsumer.cs | 2 +- .../Consumers/RedisListCheckerConsumer.cs | 2 +- .../Consumers/RedisTopicConsumer.cs | 4 +- .../RedisMessageBus.cs | 8 +- .../Consumer/AbstractConsumer.cs | 106 +++++- .../ServiceCollectionExtensions.cs | 2 +- src/SlimMessageBus.Host/IConsumerControl.cs | 2 +- src/SlimMessageBus.sln | 38 ++- src/SlimMessageBus/IConsumerCircuitBreaker.cs | 17 + .../GlobalUsings.cs | 12 + .../HealthCheckBackgroundServiceTests.cs | 323 ++++++++++++++++++ .../HealthCheckCircuitBreakerTests.cs | 184 ++++++++++ ...ost.CircuitBreaker.HealthCheck.Test.csproj | 23 ++ .../xunit.runner.json | 4 + .../TypeCollectionTests.cs | 217 ++++++++++++ .../ConcurrentMessageProcessorQueueTests.cs | 2 +- .../Consumers/MessageProcessorQueueTests.cs | 2 +- .../OutboxTests.cs | 4 +- .../IntegrationTests/RabbitMqMessageBusIt.cs | 2 +- .../Consumer/AbstractConsumerTests.cs | 139 ++++++++ .../SlimMessageBus.Host.Test/GlobalUsings.cs | 6 +- .../Helpers/ReflectionUtilsTests.cs | 10 +- 61 files changed, 1920 insertions(+), 58 deletions(-) create mode 100644 src/Samples/Sample.CircuitBreaker.HealthCheck/Consumers/AddConsumer.cs create mode 100644 src/Samples/Sample.CircuitBreaker.HealthCheck/Consumers/SubtractConsumer.cs create mode 100644 src/Samples/Sample.CircuitBreaker.HealthCheck/GlobalUsings.cs create mode 100644 src/Samples/Sample.CircuitBreaker.HealthCheck/HealthChecks/AddRandomHealthCheck.cs create mode 100644 src/Samples/Sample.CircuitBreaker.HealthCheck/HealthChecks/RandomHealthCheck.cs create mode 100644 src/Samples/Sample.CircuitBreaker.HealthCheck/HealthChecks/SubtractRandomHealthCheck.cs create mode 100644 src/Samples/Sample.CircuitBreaker.HealthCheck/IntermittentMessagePublisher.cs create mode 100644 src/Samples/Sample.CircuitBreaker.HealthCheck/Models/Add.cs create mode 100644 src/Samples/Sample.CircuitBreaker.HealthCheck/Models/Subtract.cs create mode 100644 src/Samples/Sample.CircuitBreaker.HealthCheck/Program.cs create mode 100644 src/Samples/Sample.CircuitBreaker.HealthCheck/Sample.CircuitBreaker.HealthCheck.csproj create mode 100644 src/Samples/Sample.CircuitBreaker.HealthCheck/appsettings.json create mode 100644 src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/Config/ConsumerBuilderExtensions.cs create mode 100644 src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/Config/SettingsExtensions.cs create mode 100644 src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/GlobalUsings.cs create mode 100644 src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/HealthCheckBackgroundService.cs create mode 100644 src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/HealthCheckCircuitBreaker.cs create mode 100644 src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/IHealthCheckHostBreaker.cs create mode 100644 src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/SlimMessageBus.Host.CircuitBreaker.HealthCheck.csproj create mode 100644 src/SlimMessageBus.Host.Configuration/TypeCollection.cs create mode 100644 src/SlimMessageBus/IConsumerCircuitBreaker.cs create mode 100644 src/Tests/SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test/GlobalUsings.cs create mode 100644 src/Tests/SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test/HealthCheckBackgroundServiceTests.cs create mode 100644 src/Tests/SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test/HealthCheckCircuitBreakerTests.cs create mode 100644 src/Tests/SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test/SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test.csproj create mode 100644 src/Tests/SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test/xunit.runner.json create mode 100644 src/Tests/SlimMessageBus.Host.Configuration.Test/TypeCollectionTests.cs create mode 100644 src/Tests/SlimMessageBus.Host.Test/Consumer/AbstractConsumerTests.cs diff --git a/build/tasks.ps1 b/build/tasks.ps1 index a4043738..60050da7 100644 --- a/build/tasks.ps1 +++ b/build/tasks.ps1 @@ -41,6 +41,8 @@ $projects = @( "SlimMessageBus.Host.Outbox.Sql", "SlimMessageBus.Host.Outbox.DbContext", + "SlimMessageBus.Host.CircuitBreaker.HealthCheck", + "SlimMessageBus.Host.AsyncApi" ) diff --git a/docs/intro.md b/docs/intro.md index 8a9f0d01..167722c4 100644 --- a/docs/intro.md +++ b/docs/intro.md @@ -6,6 +6,7 @@ - [Set message headers](#set-message-headers) - [Consumer](#consumer) - [Start or Stop message consumption](#start-or-stop-message-consumption) + - [Health check circuit breaker](#health-check-circuit-breaker) - [Consumer context (additional message information)](#consumer-context-additional-message-information) - [Per-message DI container scope](#per-message-di-container-scope) - [Hybrid bus and message scope reuse](#hybrid-bus-and-message-scope-reuse) @@ -258,6 +259,32 @@ await consumerControl.Stop(); > Since version 1.15.5 +#### Health check circuit breaker + +Consumers can be associated with health check tags to enable or disable the consumer as the check's status changes. + +```cs + // add health checks with tags + builder.Services + .AddHealthChecks() + .AddCheck("Storage", tags: ["Storage"]); + .AddCheck("SqlServer", tags: ["Sql"]); + + builder.Services + .AddSlimMessageBus(mbb => { + ... + + mbb.Consume(cfg => { + ... + + // configure consumer to monitor tag/state + cfg.PauseOnUnhealthyCheck("Storage"); + cfg.PauseOnDegradedHealthCheck("Sql"); + }) + }) +``` +*Requires: SlimMessageBus.Host.CircuitBreaker.HealthCheck* + #### Consumer context (additional message information) > Changed in version 1.15.0 diff --git a/docs/intro.t.md b/docs/intro.t.md index 4c718cb0..d71c8167 100644 --- a/docs/intro.t.md +++ b/docs/intro.t.md @@ -6,6 +6,7 @@ - [Set message headers](#set-message-headers) - [Consumer](#consumer) - [Start or Stop message consumption](#start-or-stop-message-consumption) + - [Health check circuit breaker](#health-check-circuit-breaker) - [Consumer context (additional message information)](#consumer-context-additional-message-information) - [Per-message DI container scope](#per-message-di-container-scope) - [Hybrid bus and message scope reuse](#hybrid-bus-and-message-scope-reuse) @@ -258,6 +259,32 @@ await consumerControl.Stop(); > Since version 1.15.5 +#### Health check circuit breaker + +Consumers can be associated with health check tags to enable or disable the consumer as the check's status changes. + +```cs + // add health checks with tags + builder.Services + .AddHealthChecks() + .AddCheck("Storage", tags: ["Storage"]); + .AddCheck("SqlServer", tags: ["Sql"]); + + builder.Services + .AddSlimMessageBus(mbb => { + ... + + mbb.Consume(cfg => { + ... + + // configure consumer to monitor tag/state + cfg.PauseOnUnhealthyCheck("Storage"); + cfg.PauseOnDegradedHealthCheck("Sql"); + }) + }) +``` +*Requires: SlimMessageBus.Host.CircuitBreaker.HealthCheck* + #### Consumer context (additional message information) > Changed in version 1.15.0 diff --git a/src/.editorconfig b/src/.editorconfig index fa0c35ca..cb339f90 100644 --- a/src/.editorconfig +++ b/src/.editorconfig @@ -178,6 +178,8 @@ dotnet_style_allow_multiple_blank_lines_experimental = true:silent dotnet_style_allow_statement_immediately_after_block_experimental = true:silent dotnet_style_prefer_collection_expression = when_types_loosely_match:suggestion dotnet_diagnostic.CA1859.severity = silent +# not supported by .netstandard2.0 +dotnet_diagnostic.CA1510.severity = none [*.{csproj,xml}] indent_style = space diff --git a/src/Infrastructure/docker-compose.yml b/src/Infrastructure/docker-compose.yml index 2da9cc4e..5c7dace4 100644 --- a/src/Infrastructure/docker-compose.yml +++ b/src/Infrastructure/docker-compose.yml @@ -16,7 +16,7 @@ services: - "9092:9092" environment: KAFKA_ADVERTISED_HOST_NAME: localhost - KAFKA_CREATE_TOPICS: "user-test-ping:2:1,user-test-echo:2:1" + KAFKA_CREATE_TOPICS: "user-test-ping:2:1,user-test-echo:2:1,user-test-echo-resp:2:1" KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 depends_on: - zookeeper diff --git a/src/Samples/Sample.CircuitBreaker.HealthCheck/Consumers/AddConsumer.cs b/src/Samples/Sample.CircuitBreaker.HealthCheck/Consumers/AddConsumer.cs new file mode 100644 index 00000000..dfdb3d98 --- /dev/null +++ b/src/Samples/Sample.CircuitBreaker.HealthCheck/Consumers/AddConsumer.cs @@ -0,0 +1,17 @@ +namespace Sample.CircuitBreaker.HealthCheck.Consumers; + +public class AddConsumer : IConsumer +{ + private readonly ILogger _logger; + + public AddConsumer(ILogger logger) + { + _logger = logger; + } + + public Task OnHandle(Add message) + { + _logger.LogInformation("{A} + {B} = {C}", message.a, message.b, message.a + message.b); + return Task.CompletedTask; + } +} diff --git a/src/Samples/Sample.CircuitBreaker.HealthCheck/Consumers/SubtractConsumer.cs b/src/Samples/Sample.CircuitBreaker.HealthCheck/Consumers/SubtractConsumer.cs new file mode 100644 index 00000000..e561ddd7 --- /dev/null +++ b/src/Samples/Sample.CircuitBreaker.HealthCheck/Consumers/SubtractConsumer.cs @@ -0,0 +1,17 @@ +namespace Sample.CircuitBreaker.HealthCheck.Consumers; + +public class SubtractConsumer : IConsumer +{ + private readonly ILogger _logger; + + public SubtractConsumer(ILogger logger) + { + _logger = logger; + } + + public Task OnHandle(Subtract message) + { + _logger.LogInformation("{A} - {B} = {C}", message.a, message.b, message.a - message.b); + return Task.CompletedTask; + } +} diff --git a/src/Samples/Sample.CircuitBreaker.HealthCheck/GlobalUsings.cs b/src/Samples/Sample.CircuitBreaker.HealthCheck/GlobalUsings.cs new file mode 100644 index 00000000..61fd0fa5 --- /dev/null +++ b/src/Samples/Sample.CircuitBreaker.HealthCheck/GlobalUsings.cs @@ -0,0 +1,17 @@ +global using System.Net.Mime; +global using System.Reflection; + +global using Microsoft.Extensions.Configuration; +global using Microsoft.Extensions.DependencyInjection; +global using Microsoft.Extensions.Hosting; +global using Microsoft.Extensions.Logging; + +global using Sample.CircuitBreaker.HealthCheck.Consumers; +global using Sample.CircuitBreaker.HealthCheck.Models; + +global using SecretStore; + +global using SlimMessageBus; +global using SlimMessageBus.Host; +global using SlimMessageBus.Host.RabbitMQ; +global using SlimMessageBus.Host.Serialization.SystemTextJson; \ No newline at end of file diff --git a/src/Samples/Sample.CircuitBreaker.HealthCheck/HealthChecks/AddRandomHealthCheck.cs b/src/Samples/Sample.CircuitBreaker.HealthCheck/HealthChecks/AddRandomHealthCheck.cs new file mode 100644 index 00000000..b74784dd --- /dev/null +++ b/src/Samples/Sample.CircuitBreaker.HealthCheck/HealthChecks/AddRandomHealthCheck.cs @@ -0,0 +1,11 @@ +namespace Sample.CircuitBreaker.HealthCheck.HealthChecks; + +using Microsoft.Extensions.Logging; + +public class AddRandomHealthCheck : RandomHealthCheck +{ + public AddRandomHealthCheck(ILogger logger) + : base(logger) + { + } +} diff --git a/src/Samples/Sample.CircuitBreaker.HealthCheck/HealthChecks/RandomHealthCheck.cs b/src/Samples/Sample.CircuitBreaker.HealthCheck/HealthChecks/RandomHealthCheck.cs new file mode 100644 index 00000000..cf9ccf88 --- /dev/null +++ b/src/Samples/Sample.CircuitBreaker.HealthCheck/HealthChecks/RandomHealthCheck.cs @@ -0,0 +1,20 @@ +namespace Sample.CircuitBreaker.HealthCheck.HealthChecks; + +using Microsoft.Extensions.Diagnostics.HealthChecks; + +public abstract class RandomHealthCheck : IHealthCheck +{ + private readonly ILogger _logger; + + protected RandomHealthCheck(ILogger logger) + { + _logger = logger; + } + + public Task CheckHealthAsync(HealthCheckContext context, CancellationToken cancellationToken = default) + { + var value = (HealthStatus)Random.Shared.Next(3); + _logger.LogInformation("{HealthCheck} evaluated as {HealthStatus}", this.GetType(), value); + return Task.FromResult(new HealthCheckResult(value, value.ToString())); + } +} diff --git a/src/Samples/Sample.CircuitBreaker.HealthCheck/HealthChecks/SubtractRandomHealthCheck.cs b/src/Samples/Sample.CircuitBreaker.HealthCheck/HealthChecks/SubtractRandomHealthCheck.cs new file mode 100644 index 00000000..8a68b0b1 --- /dev/null +++ b/src/Samples/Sample.CircuitBreaker.HealthCheck/HealthChecks/SubtractRandomHealthCheck.cs @@ -0,0 +1,11 @@ +namespace Sample.CircuitBreaker.HealthCheck.HealthChecks; + +using Microsoft.Extensions.Logging; + +public class SubtractRandomHealthCheck : RandomHealthCheck +{ + public SubtractRandomHealthCheck(ILogger logger) + : base(logger) + { + } +} diff --git a/src/Samples/Sample.CircuitBreaker.HealthCheck/IntermittentMessagePublisher.cs b/src/Samples/Sample.CircuitBreaker.HealthCheck/IntermittentMessagePublisher.cs new file mode 100644 index 00000000..73110c15 --- /dev/null +++ b/src/Samples/Sample.CircuitBreaker.HealthCheck/IntermittentMessagePublisher.cs @@ -0,0 +1,28 @@ +namespace Sample.CircuitBreaker.HealthCheck; +public class IntermittentMessagePublisher : BackgroundService +{ + private readonly ILogger _logger; + private readonly IMessageBus _messageBus; + + public IntermittentMessagePublisher(ILogger logger, IMessageBus messageBus) + { + _logger = logger; + _messageBus = messageBus; + } + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + while (!stoppingToken.IsCancellationRequested) + { + var a = Random.Shared.Next(10); + var b = Random.Shared.Next(10); + + //_logger.LogInformation("Emitting {A} +- {B} = ?", a, b); + + await Task.WhenAll( + _messageBus.Publish(new Add(a, b)), + _messageBus.Publish(new Subtract(a, b)), + Task.Delay(1000, stoppingToken)); + } + } +} diff --git a/src/Samples/Sample.CircuitBreaker.HealthCheck/Models/Add.cs b/src/Samples/Sample.CircuitBreaker.HealthCheck/Models/Add.cs new file mode 100644 index 00000000..97c5e418 --- /dev/null +++ b/src/Samples/Sample.CircuitBreaker.HealthCheck/Models/Add.cs @@ -0,0 +1,3 @@ +namespace Sample.CircuitBreaker.HealthCheck.Models; + +public record Add(int a, int b); \ No newline at end of file diff --git a/src/Samples/Sample.CircuitBreaker.HealthCheck/Models/Subtract.cs b/src/Samples/Sample.CircuitBreaker.HealthCheck/Models/Subtract.cs new file mode 100644 index 00000000..51d2efc4 --- /dev/null +++ b/src/Samples/Sample.CircuitBreaker.HealthCheck/Models/Subtract.cs @@ -0,0 +1,3 @@ +namespace Sample.CircuitBreaker.HealthCheck.Models; + +public record Subtract(int a, int b); \ No newline at end of file diff --git a/src/Samples/Sample.CircuitBreaker.HealthCheck/Program.cs b/src/Samples/Sample.CircuitBreaker.HealthCheck/Program.cs new file mode 100644 index 00000000..716253f5 --- /dev/null +++ b/src/Samples/Sample.CircuitBreaker.HealthCheck/Program.cs @@ -0,0 +1,91 @@ +namespace Sample.CircuitBreaker.HealthCheck; +using Microsoft.Extensions.Diagnostics.HealthChecks; + +using Sample.CircuitBreaker.HealthCheck.HealthChecks; + +using SlimMessageBus.Host.CircuitBreaker.HealthCheck.Config; + +public static class Program +{ + private static async Task Main(string[] args) + { + // Local file with secrets + Secrets.Load(@"..\..\..\..\..\secrets.txt"); + + await Host.CreateDefaultBuilder(args) + .ConfigureServices((builder, services) => + { + const string AddTag = "add"; + const string SubtractTag = "subtract"; + + services.AddSlimMessageBus(mbb => + { + var ticks = DateTimeOffset.UtcNow.Ticks; + var addTopic = $"Sample-CircuitBreaker-HealthCheck-add-{ticks}"; + var subtractTopic = $"Sample-CircuitBreaker-HealthCheck-subtract-{ticks}"; + + mbb + .WithProviderRabbitMQ( + cfg => + { + cfg.ConnectionString = Secrets.Service.PopulateSecrets(builder.Configuration.GetValue("RabbitMQ:ConnectionString")); + cfg.ConnectionFactory.ClientProvidedName = $"Sample_CircuitBreaker_HealthCheck_{Environment.MachineName}"; + + cfg.UseMessagePropertiesModifier((m, p) => p.ContentType = MediaTypeNames.Application.Json); + cfg.UseExchangeDefaults(durable: false); + cfg.UseQueueDefaults(durable: false); + }); + mbb + .Produce(x => x + .Exchange(addTopic, exchangeType: ExchangeType.Fanout, autoDelete: false) + .RoutingKeyProvider((m, p) => Guid.NewGuid().ToString())) + .Consume( + cfg => + { + cfg + .Queue(nameof(Add), autoDelete: false) + .Path(nameof(Add)) + .ExchangeBinding(addTopic) + .WithConsumer() + .PauseOnDegradedHealthCheck(AddTag); + }); + + mbb + .Produce(x => x + .Exchange(subtractTopic, exchangeType: ExchangeType.Fanout, autoDelete: false) + .RoutingKeyProvider((m, p) => Guid.NewGuid().ToString())) + .Consume( + cfg => + { + cfg + .Queue(nameof(Subtract), autoDelete: false) + .Path(nameof(Subtract)) + .ExchangeBinding(subtractTopic) + .WithConsumer() + .PauseOnUnhealthyCheck(SubtractTag); + }); + + mbb.AddServicesFromAssembly(Assembly.GetExecutingAssembly()); + mbb.AddJsonSerializer(); + }); + + services.AddHostedService(); + services.AddSingleton(); + services.AddSingleton(); + + services.Configure(cfg => + { + // aggressive to toggle health status often (sample only) + cfg.Delay = TimeSpan.FromSeconds(3); + cfg.Period = TimeSpan.FromSeconds(5); + }); + + services + .AddHealthChecks() + .AddCheck("Add", tags: [AddTag]) + .AddCheck("Subtract", tags: [SubtractTag]); + }) + .Build() + .RunAsync(); + } +} diff --git a/src/Samples/Sample.CircuitBreaker.HealthCheck/Sample.CircuitBreaker.HealthCheck.csproj b/src/Samples/Sample.CircuitBreaker.HealthCheck/Sample.CircuitBreaker.HealthCheck.csproj new file mode 100644 index 00000000..8e4ac1af --- /dev/null +++ b/src/Samples/Sample.CircuitBreaker.HealthCheck/Sample.CircuitBreaker.HealthCheck.csproj @@ -0,0 +1,33 @@ + + + + Exe + net8.0 + enable + enable + + + + + + + + + + + + + + + + + + + + Always + true + PreserveNewest + + + + diff --git a/src/Samples/Sample.CircuitBreaker.HealthCheck/appsettings.json b/src/Samples/Sample.CircuitBreaker.HealthCheck/appsettings.json new file mode 100644 index 00000000..fd742024 --- /dev/null +++ b/src/Samples/Sample.CircuitBreaker.HealthCheck/appsettings.json @@ -0,0 +1,19 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Information", + "Microsoft": "Warning", + "Microsoft.Hosting.Lifetime": "Information" + }, + "Console": { + "FormatterName": "simple", + "FormatterOptions": { + "SingleLine": true, + "TimestampFormat": "HH:mm:ss.fff " + } + } + }, + "RabbitMQ": { + "ConnectionString": "{{rabbitmq_connectionstring}}" + } +} diff --git a/src/SlimMessageBus.Host.AzureEventHub/Consumer/EhGroupConsumer.cs b/src/SlimMessageBus.Host.AzureEventHub/Consumer/EhGroupConsumer.cs index ebf82dcd..a1bd113c 100644 --- a/src/SlimMessageBus.Host.AzureEventHub/Consumer/EhGroupConsumer.cs +++ b/src/SlimMessageBus.Host.AzureEventHub/Consumer/EhGroupConsumer.cs @@ -11,8 +11,8 @@ public class EhGroupConsumer : AbstractConsumer public EventHubMessageBus MessageBus { get; } - public EhGroupConsumer(EventHubMessageBus messageBus, GroupPath groupPath, Func partitionConsumerFactory) - : base(messageBus.LoggerFactory.CreateLogger()) + public EhGroupConsumer(IEnumerable consumerSettings, EventHubMessageBus messageBus, GroupPath groupPath, Func partitionConsumerFactory) + : base(messageBus.LoggerFactory.CreateLogger(), consumerSettings) { _groupPath = groupPath ?? throw new ArgumentNullException(nameof(groupPath)); if (partitionConsumerFactory == null) throw new ArgumentNullException(nameof(partitionConsumerFactory)); diff --git a/src/SlimMessageBus.Host.AzureEventHub/EventHubMessageBus.cs b/src/SlimMessageBus.Host.AzureEventHub/EventHubMessageBus.cs index 4d2eb13c..432df738 100644 --- a/src/SlimMessageBus.Host.AzureEventHub/EventHubMessageBus.cs +++ b/src/SlimMessageBus.Host.AzureEventHub/EventHubMessageBus.cs @@ -58,14 +58,14 @@ protected override async Task CreateConsumers() foreach (var (groupPath, consumerSettings) in Settings.Consumers.GroupBy(x => new GroupPath(path: x.Path, group: x.GetGroup())).ToDictionary(x => x.Key, x => x.ToList())) { _logger.LogInformation("Creating consumer for Path: {Path}, Group: {Group}", groupPath.Path, groupPath.Group); - AddConsumer(new EhGroupConsumer(this, groupPath, groupPathPartition => new EhPartitionConsumerForConsumers(this, consumerSettings, groupPathPartition))); + AddConsumer(new EhGroupConsumer(consumerSettings, this, groupPath, groupPathPartition => new EhPartitionConsumerForConsumers(this, consumerSettings, groupPathPartition))); } if (Settings.RequestResponse != null) { var pathGroup = new GroupPath(Settings.RequestResponse.Path, Settings.RequestResponse.GetGroup()); _logger.LogInformation("Creating response consumer for Path: {Path}, Group: {Group}", pathGroup.Path, pathGroup.Group); - AddConsumer(new EhGroupConsumer(this, pathGroup, groupPathPartition => new EhPartitionConsumerForResponses(this, Settings.RequestResponse, groupPathPartition))); + AddConsumer(new EhGroupConsumer([Settings.RequestResponse], this, pathGroup, groupPathPartition => new EhPartitionConsumerForResponses(this, Settings.RequestResponse, groupPathPartition))); } } diff --git a/src/SlimMessageBus.Host.AzureServiceBus/Consumer/AsbBaseConsumer.cs b/src/SlimMessageBus.Host.AzureServiceBus/Consumer/AsbBaseConsumer.cs index 9154d6d8..6dd2a059 100644 --- a/src/SlimMessageBus.Host.AzureServiceBus/Consumer/AsbBaseConsumer.cs +++ b/src/SlimMessageBus.Host.AzureServiceBus/Consumer/AsbBaseConsumer.cs @@ -10,7 +10,7 @@ public abstract class AsbBaseConsumer : AbstractConsumer protected TopicSubscriptionParams TopicSubscription { get; } protected AsbBaseConsumer(ServiceBusMessageBus messageBus, ServiceBusClient serviceBusClient, TopicSubscriptionParams subscriptionFactoryParams, IMessageProcessor messageProcessor, IEnumerable consumerSettings, ILogger logger) - : base(logger ?? throw new ArgumentNullException(nameof(logger))) + : base(logger ?? throw new ArgumentNullException(nameof(logger)), consumerSettings) { MessageBus = messageBus ?? throw new ArgumentNullException(nameof(messageBus)); TopicSubscription = subscriptionFactoryParams ?? throw new ArgumentNullException(nameof(subscriptionFactoryParams)); diff --git a/src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/Config/ConsumerBuilderExtensions.cs b/src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/Config/ConsumerBuilderExtensions.cs new file mode 100644 index 00000000..c721f5dc --- /dev/null +++ b/src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/Config/ConsumerBuilderExtensions.cs @@ -0,0 +1,47 @@ +namespace SlimMessageBus.Host.CircuitBreaker.HealthCheck.Config; + +using Microsoft.Extensions.DependencyInjection.Extensions; + +public static class ConsumerBuilderExtensions +{ + public static T PauseOnUnhealthyCheck(this T builder, params string[] tags) + where T : AbstractConsumerBuilder + { + if (builder is null) + { + throw new ArgumentNullException(nameof(builder)); + } + + builder.ConsumerSettings.PauseOnUnhealthy(tags); + RegisterHealthServices(builder); + return builder; + } + + public static T PauseOnDegradedHealthCheck(this T builder, params string[] tags) + where T : AbstractConsumerBuilder + { + if (builder is null) + { + throw new ArgumentNullException(nameof(builder)); + } + + builder.ConsumerSettings.PauseOnDegraded(tags); + RegisterHealthServices(builder); + return builder; + } + + private static void RegisterHealthServices(AbstractConsumerBuilder builder) + { + builder.ConsumerSettings.CircuitBreakers.TryAdd(); + builder.PostConfigurationActions.Add( + services => + { + services.TryAddSingleton(); + services.TryAddEnumerable(ServiceDescriptor.Singleton(sp => sp.GetRequiredService())); + services.TryAdd(ServiceDescriptor.Singleton(sp => sp.GetRequiredService())); + services.AddHostedService(sp => sp.GetRequiredService()); + + services.TryAddSingleton(); + }); + } +} diff --git a/src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/Config/SettingsExtensions.cs b/src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/Config/SettingsExtensions.cs new file mode 100644 index 00000000..a2775f10 --- /dev/null +++ b/src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/Config/SettingsExtensions.cs @@ -0,0 +1,47 @@ +namespace SlimMessageBus.Host.CircuitBreaker.HealthCheck; + +static internal class SettingsExtensions +{ + private const string _key = nameof(HealthCheckCircuitBreaker); + + public static T PauseOnDegraded(this T consumerSettings, params string[] tags) + where T : AbstractConsumerSettings + { + if (tags.Length > 0) + { + var dict = consumerSettings.HealthBreakerTags(); + foreach (var tag in tags) + { + dict[tag] = HealthStatus.Degraded; + } + } + + return consumerSettings; + } + + public static T PauseOnUnhealthy(this T consumerSettings, params string[] tags) + where T : AbstractConsumerSettings + { + if (tags.Length > 0) + { + var dict = consumerSettings.HealthBreakerTags(); + foreach (var tag in tags) + { + dict[tag] = HealthStatus.Unhealthy; + } + } + + return consumerSettings; + } + + static internal IDictionary HealthBreakerTags(this AbstractConsumerSettings consumerSettings) + { + if (!consumerSettings.Properties.TryGetValue(_key, out var rawValue) || rawValue is not IDictionary value) + { + value = new Dictionary(); + consumerSettings.Properties[_key] = value; + } + + return value; + } +} diff --git a/src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/GlobalUsings.cs b/src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/GlobalUsings.cs new file mode 100644 index 00000000..6d9be2c3 --- /dev/null +++ b/src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/GlobalUsings.cs @@ -0,0 +1,7 @@ +global using System; +global using System.Diagnostics; + +global using Microsoft.Extensions.DependencyInjection; +global using Microsoft.Extensions.Diagnostics.HealthChecks; +global using Microsoft.Extensions.Hosting; +global using Microsoft.Extensions.Logging; diff --git a/src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/HealthCheckBackgroundService.cs b/src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/HealthCheckBackgroundService.cs new file mode 100644 index 00000000..53751b6f --- /dev/null +++ b/src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/HealthCheckBackgroundService.cs @@ -0,0 +1,128 @@ +namespace SlimMessageBus.Host.CircuitBreaker.HealthCheck; + +internal sealed class HealthCheckBackgroundService : IHealthCheckPublisher, IHostedService, IHealthCheckHostBreaker, IDisposable +{ + private readonly CancellationTokenSource _cancellationTokenSource; + private readonly Dictionary _healthReportEntries; + private readonly List _onChangeDelegates; + private readonly SemaphoreSlim _semaphore; + private IReadOnlyDictionary _tagStatus; + + public HealthCheckBackgroundService() + { + _cancellationTokenSource = new CancellationTokenSource(); + _healthReportEntries = []; + _onChangeDelegates = []; + _semaphore = new SemaphoreSlim(1, 1); + _tagStatus = new Dictionary(); + } + + public IReadOnlyDictionary TagStatus => _tagStatus; + + public async Task PublishAsync(HealthReport report, CancellationToken cancellationToken) + { + var linkedToken = CancellationTokenSource.CreateLinkedTokenSource(_cancellationTokenSource.Token, cancellationToken).Token; + + await _semaphore.WaitAsync(linkedToken); + try + { + UpdateHealthReportEntries(report); + if (UpdateTagStatus() && !linkedToken.IsCancellationRequested) + { + await Task.WhenAll(_onChangeDelegates.Select(x => x(_tagStatus))); + } + } + finally + { + _semaphore.Release(); + } + } + + public Task StartAsync(CancellationToken cancellationToken) + { + return Task.CompletedTask; + } + + public Task StopAsync(CancellationToken cancellationToken) + { + _cancellationTokenSource.Cancel(); + return Task.CompletedTask; + } + + public async Task Subscribe(OnChangeDelegate onChange) + { + _onChangeDelegates.Add(onChange); + await onChange(_tagStatus); + } + + public void Unsubscribe(OnChangeDelegate onChange) + { + _onChangeDelegates.Remove(onChange); + } + + private void UpdateHealthReportEntries(HealthReport report) + { + foreach (var entry in report.Entries) + { + if (entry.Value.Tags.Any()) + { + _healthReportEntries[entry.Key] = entry.Value; + } + } + } + + private bool UpdateTagStatus() + { + var tagStatus = new Dictionary(); + foreach (var entry in _healthReportEntries.Values) + { + foreach (var tag in entry.Tags) + { + if (tagStatus.TryGetValue(tag, out var currentStatus)) + { + if ((entry.Status == HealthStatus.Degraded && currentStatus == HealthStatus.Healthy) + || (entry.Status == HealthStatus.Unhealthy && currentStatus != HealthStatus.Unhealthy)) + { + tagStatus[tag] = entry.Status; + } + + continue; + } + + tagStatus[tag] = entry.Status; + } + } + + if (!AreEqual(_tagStatus, tagStatus)) + { + _tagStatus = tagStatus; + return true; + } + + return false; + } + + internal static bool AreEqual(IReadOnlyDictionary dict1, IReadOnlyDictionary dict2) + { + if (dict1.Count != dict2.Count) + { + return false; + } + + foreach (var kvp in dict1) + { + if (!dict2.TryGetValue(kvp.Key, out var value) || !EqualityComparer.Default.Equals(kvp.Value, value)) + { + return false; + } + } + + return true; + } + + public void Dispose() + { + _cancellationTokenSource.Dispose(); + GC.SuppressFinalize(this); + } +} diff --git a/src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/HealthCheckCircuitBreaker.cs b/src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/HealthCheckCircuitBreaker.cs new file mode 100644 index 00000000..8797fc9e --- /dev/null +++ b/src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/HealthCheckCircuitBreaker.cs @@ -0,0 +1,92 @@ +namespace SlimMessageBus.Host.CircuitBreaker.HealthCheck; + +internal sealed class HealthCheckCircuitBreaker : IConsumerCircuitBreaker +{ + private readonly IEnumerable _settings; + private readonly IHealthCheckHostBreaker _host; + private Func? _onChange; + private IDictionary? _monitoredTags; + + public HealthCheckCircuitBreaker(IEnumerable settings, IHealthCheckHostBreaker host) + { + _settings = settings; + _host = host; + State = Circuit.Open; + } + + public Circuit State { get; private set; } + + public async Task Subscribe(Func onChange) + { + Debug.Assert(_onChange == null); + _onChange = onChange; + + _monitoredTags = _settings + .Select(x => x.HealthBreakerTags()) + .Aggregate( + (a, b) => + { + var c = new Dictionary(a.Count + b.Count); + foreach (var kvp in a) + { + var status = kvp.Value; + if (b.TryGetValue(kvp.Key, out var altStatus)) + { + b.Remove(kvp.Key); + if (status != altStatus && altStatus == HealthStatus.Degraded) + { + status = HealthStatus.Degraded; + } + } + + c[kvp.Key] = status; + } + + foreach (var kvp in b) + { + c.Add(kvp.Key, kvp.Value); + } + + return c; + }); + + await _host.Subscribe(TagStatusChanged); + } + + public void Unsubscribe() + { + _host.Unsubscribe(TagStatusChanged); + _onChange = null; + _monitoredTags = null; + } + + internal async Task TagStatusChanged(IReadOnlyDictionary tags) + { + var newState = _monitoredTags! + .All( + monitoredTag => + { + if (!tags.TryGetValue(monitoredTag.Key, out var currentStatus)) + { + // unknown tag, assume healthy + return true; + } + + return currentStatus switch + { + HealthStatus.Healthy => true, + HealthStatus.Degraded => monitoredTag.Value != HealthStatus.Degraded, + HealthStatus.Unhealthy => false, + _ => throw new InvalidOperationException($"Unknown health status '{currentStatus}'") + }; + }) + ? Circuit.Open + : Circuit.Closed; + + if (State != newState) + { + State = newState; + await _onChange!(newState); + } + } +} diff --git a/src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/IHealthCheckHostBreaker.cs b/src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/IHealthCheckHostBreaker.cs new file mode 100644 index 00000000..c87c7fcf --- /dev/null +++ b/src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/IHealthCheckHostBreaker.cs @@ -0,0 +1,9 @@ +namespace SlimMessageBus.Host.CircuitBreaker.HealthCheck; + +internal interface IHealthCheckHostBreaker +{ + Task Subscribe(OnChangeDelegate onChange); + void Unsubscribe(OnChangeDelegate onChange); +} + +public delegate Task OnChangeDelegate(IReadOnlyDictionary tags); \ No newline at end of file diff --git a/src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/SlimMessageBus.Host.CircuitBreaker.HealthCheck.csproj b/src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/SlimMessageBus.Host.CircuitBreaker.HealthCheck.csproj new file mode 100644 index 00000000..611abab7 --- /dev/null +++ b/src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/SlimMessageBus.Host.CircuitBreaker.HealthCheck.csproj @@ -0,0 +1,30 @@ + + + + + + Health check circuit breaker for SlimMessageBus + Toggle consumer on health check status changes + icon.png + + enable + + + + + + + + + + + + + <_Parameter1>SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test + + + <_Parameter1>DynamicProxyGenAssembly2 + + + + diff --git a/src/SlimMessageBus.Host.Configuration/Builders/AbstractConsumerBuilder.cs b/src/SlimMessageBus.Host.Configuration/Builders/AbstractConsumerBuilder.cs index f573805b..51772e56 100644 --- a/src/SlimMessageBus.Host.Configuration/Builders/AbstractConsumerBuilder.cs +++ b/src/SlimMessageBus.Host.Configuration/Builders/AbstractConsumerBuilder.cs @@ -1,11 +1,13 @@ namespace SlimMessageBus.Host; -public abstract class AbstractConsumerBuilder : IAbstractConsumerBuilder +public abstract class AbstractConsumerBuilder : IAbstractConsumerBuilder, IHasPostConfigurationActions { public MessageBusSettings Settings { get; } public ConsumerSettings ConsumerSettings { get; } + public IList> PostConfigurationActions { get; } = []; + AbstractConsumerSettings IAbstractConsumerBuilder.ConsumerSettings => ConsumerSettings; protected AbstractConsumerBuilder(MessageBusSettings settings, Type messageType, string path = null) @@ -14,6 +16,7 @@ protected AbstractConsumerBuilder(MessageBusSettings settings, Type messageType, ConsumerSettings = new ConsumerSettings { + MessageBusSettings = settings, MessageType = messageType, Path = path, }; diff --git a/src/SlimMessageBus.Host.Configuration/Builders/MessageBusBuilder.cs b/src/SlimMessageBus.Host.Configuration/Builders/MessageBusBuilder.cs index 04ea49a8..6bfb68cb 100644 --- a/src/SlimMessageBus.Host.Configuration/Builders/MessageBusBuilder.cs +++ b/src/SlimMessageBus.Host.Configuration/Builders/MessageBusBuilder.cs @@ -26,6 +26,8 @@ public class MessageBusBuilder : IHasPostConfigurationActions, ISerializationBui public IList> PostConfigurationActions { get; } = []; + protected IList ConsumerPostConfigurationActions { get; } = []; + protected MessageBusBuilder() { } @@ -36,6 +38,14 @@ protected MessageBusBuilder(MessageBusBuilder other) Children = other.Children; BusFactory = other.BusFactory; PostConfigurationActions = other.PostConfigurationActions; + ConsumerPostConfigurationActions = other.ConsumerPostConfigurationActions; + } + + public IEnumerable> GetPostConfigurationActions() + { + return PostConfigurationActions + .Concat(ConsumerPostConfigurationActions.SelectMany(x => x.PostConfigurationActions)) + .Concat(Children.Values.SelectMany(x => x.PostConfigurationActions.Concat(x.ConsumerPostConfigurationActions.SelectMany(z => z.PostConfigurationActions)))); } public static MessageBusBuilder Create() => new(); @@ -98,6 +108,9 @@ public MessageBusBuilder Consume(Action> bui // Apply default consumer type of not set b.WithConsumer>(); } + + ConsumerPostConfigurationActions.Add(b); + return this; } @@ -111,7 +124,11 @@ public MessageBusBuilder Consume(Type messageType, Action(Settings, messageType)); + var b = new ConsumerBuilder(Settings, messageType); + builder(b); + + ConsumerPostConfigurationActions.Add(b); + return this; } @@ -135,6 +152,8 @@ public MessageBusBuilder Handle(Action>(); } + ConsumerPostConfigurationActions.Add(b); + return this; } @@ -157,6 +176,8 @@ public MessageBusBuilder Handle(Action> build b.WithHandler>(); } + ConsumerPostConfigurationActions.Add(b); + return this; } @@ -173,7 +194,11 @@ public MessageBusBuilder Handle(Type requestType, Type responseType, Action(Settings, requestType, responseType)); + var b = new HandlerBuilder(Settings, requestType, responseType); + builder(b); + + ConsumerPostConfigurationActions.Add(b); + return this; } @@ -189,7 +214,11 @@ public MessageBusBuilder Handle(Type requestType, Action> if (requestType == null) throw new ArgumentNullException(nameof(requestType)); if (builder == null) throw new ArgumentNullException(nameof(builder)); - builder(new HandlerBuilder(Settings, requestType)); + var b = new HandlerBuilder(Settings, requestType); + builder(b); + + ConsumerPostConfigurationActions.Add(b); + return this; } diff --git a/src/SlimMessageBus.Host.Configuration/GlobalUsings.cs b/src/SlimMessageBus.Host.Configuration/GlobalUsings.cs index e3b57e70..4612d87f 100644 --- a/src/SlimMessageBus.Host.Configuration/GlobalUsings.cs +++ b/src/SlimMessageBus.Host.Configuration/GlobalUsings.cs @@ -1,4 +1,5 @@ -global using System.Reflection; +global using System.Collections; +global using System.Reflection; global using Microsoft.Extensions.DependencyInjection; diff --git a/src/SlimMessageBus.Host.Configuration/Settings/AbstractConsumerSettings.cs b/src/SlimMessageBus.Host.Configuration/Settings/AbstractConsumerSettings.cs index d044317e..a04fd491 100644 --- a/src/SlimMessageBus.Host.Configuration/Settings/AbstractConsumerSettings.cs +++ b/src/SlimMessageBus.Host.Configuration/Settings/AbstractConsumerSettings.cs @@ -2,6 +2,11 @@ namespace SlimMessageBus.Host; public abstract class AbstractConsumerSettings : HasProviderExtensions { + /// + /// The settings for the message bus to which the consumer belongs. + /// + public MessageBusSettings MessageBusSettings { get; set; } + /// /// The topic or queue name. /// @@ -18,6 +23,11 @@ public abstract class AbstractConsumerSettings : HasProviderExtensions /// public int Instances { get; set; } + /// + /// to be used with the consumer. + /// + public TypeCollection CircuitBreakers { get; } = []; + protected AbstractConsumerSettings() { Instances = 1; diff --git a/src/SlimMessageBus.Host.Configuration/Settings/ConsumerSettings.cs b/src/SlimMessageBus.Host.Configuration/Settings/ConsumerSettings.cs index efcd7e6b..0bdd3363 100644 --- a/src/SlimMessageBus.Host.Configuration/Settings/ConsumerSettings.cs +++ b/src/SlimMessageBus.Host.Configuration/Settings/ConsumerSettings.cs @@ -2,15 +2,15 @@ namespace SlimMessageBus.Host; public class ConsumerSettings : AbstractConsumerSettings, IMessageTypeConsumerInvokerSettings { - private Type messageType; + private Type _messageType; /// public Type MessageType { - get => messageType; + get => _messageType; set { - messageType = value; + _messageType = value; CalculateResponseType(); } } @@ -18,10 +18,11 @@ public Type MessageType private void CalculateResponseType() { // Try to get T from IRequest - ResponseType = messageType.GetInterfaces() + ResponseType = _messageType.GetInterfaces() .SingleOrDefault(i => i.GetTypeInfo().IsGenericType && i.GetTypeInfo().GetGenericTypeDefinition() == typeof(IRequest<>))?.GetGenericArguments()[0]; } + /// /// Type of consumer that is configured (subscriber or request handler). /// public ConsumerMode ConsumerMode { get; set; } diff --git a/src/SlimMessageBus.Host.Configuration/TypeCollection.cs b/src/SlimMessageBus.Host.Configuration/TypeCollection.cs new file mode 100644 index 00000000..04246947 --- /dev/null +++ b/src/SlimMessageBus.Host.Configuration/TypeCollection.cs @@ -0,0 +1,78 @@ +namespace SlimMessageBus.Host; + +public class TypeCollection : IEnumerable where TInterface : class +{ + private readonly Type _interfaceType = typeof(TInterface); + private readonly List _innerList = []; + + public void Add(Type type) + { + if (!_interfaceType.IsAssignableFrom(type)) + { + throw new ArgumentException($"Type is not assignable to '{_interfaceType}'.", nameof(type)); + } + + if (_innerList.Contains(type)) + { + throw new ArgumentException("Type already exists in the collection.", nameof(type)); + } + + _innerList.Add(type); + } + + public void Add() where T : TInterface + { + var type = typeof(T); + if (_innerList.Contains(type)) + { + throw new ArgumentException("Type already exists in the collection.", nameof(type)); // NOSONAR + } + + _innerList.Add(type); + } + + public bool TryAdd() where T : TInterface + { + var type = typeof(T); + if (_innerList.Contains(type)) + { + return false; + } + + _innerList.Add(type); + return true; + } + + public void Clear() => _innerList.Clear(); + + public bool Contains() where T : TInterface + { + return _innerList.Contains(typeof(T)); + } + + public void CopyTo(Type[] array, int arrayIndex) => _innerList.CopyTo(array, arrayIndex); + + public bool Remove() where T : TInterface + { + return _innerList.Remove(typeof(T)); + } + + public bool Remove(Type type) + { + return _innerList.Remove(type); + } + + public int Count => _innerList.Count; + + public bool IsReadOnly => false; + + public IEnumerator GetEnumerator() + { + return _innerList.GetEnumerator(); + } + + IEnumerator IEnumerable.GetEnumerator() + { + return _innerList.GetEnumerator(); + } +} \ No newline at end of file diff --git a/src/SlimMessageBus.Host.Kafka/Consumer/KafkaGroupConsumer.cs b/src/SlimMessageBus.Host.Kafka/Consumer/KafkaGroupConsumer.cs index 3fa0e413..23dc1f15 100644 --- a/src/SlimMessageBus.Host.Kafka/Consumer/KafkaGroupConsumer.cs +++ b/src/SlimMessageBus.Host.Kafka/Consumer/KafkaGroupConsumer.cs @@ -15,8 +15,8 @@ public class KafkaGroupConsumer : AbstractConsumer, IKafkaCommitController public string Group { get; } public IReadOnlyCollection Topics { get; } - public KafkaGroupConsumer(KafkaMessageBus messageBus, string group, IReadOnlyCollection topics, Func processorFactory) - : base(messageBus.LoggerFactory.CreateLogger()) + public KafkaGroupConsumer(IEnumerable consumerSettings, KafkaMessageBus messageBus, string group, IReadOnlyCollection topics, Func processorFactory) + : base(messageBus.LoggerFactory.CreateLogger(), consumerSettings) { MessageBus = messageBus; Group = group ?? throw new ArgumentNullException(nameof(group)); diff --git a/src/SlimMessageBus.Host.Kafka/KafkaMessageBus.cs b/src/SlimMessageBus.Host.Kafka/KafkaMessageBus.cs index 39e9f488..3f000b55 100644 --- a/src/SlimMessageBus.Host.Kafka/KafkaMessageBus.cs +++ b/src/SlimMessageBus.Host.Kafka/KafkaMessageBus.cs @@ -59,10 +59,10 @@ protected override async Task CreateConsumers() var responseConsumerCreated = false; - void AddGroupConsumer(string group, IReadOnlyCollection topics, Func processorFactory) + void AddGroupConsumer(IEnumerable consumerSettings, string group, IReadOnlyCollection topics, Func processorFactory) { _logger.LogInformation("Creating consumer group {ConsumerGroup}", group); - AddConsumer(new KafkaGroupConsumer(this, group, topics, processorFactory)); + AddConsumer(new KafkaGroupConsumer(consumerSettings, this, group, topics, processorFactory)); } IKafkaPartitionConsumer ResponseProcessorFactory(TopicPartition tp, IKafkaCommitController cc) => new KafkaPartitionConsumerForResponses(LoggerFactory, Settings.RequestResponse, Settings.RequestResponse.GetGroup(), tp, cc, this, HeaderSerializer); @@ -90,12 +90,12 @@ void AddGroupConsumer(string group, IReadOnlyCollection topics, Func Serializer.Deserialize(messageType, transportMessage.PayloadSegment.Array); - void AddTopicConsumer(string topic, IMessageProcessor messageProcessor) + void AddTopicConsumer(IEnumerable consumerSettings, string topic, IMessageProcessor messageProcessor) { _logger.LogInformation("Creating consumer for {Path}", topic); - var consumer = new MqttTopicConsumer(LoggerFactory.CreateLogger(), topic, messageProcessor); + var consumer = new MqttTopicConsumer(LoggerFactory.CreateLogger(), consumerSettings, topic, messageProcessor); AddConsumer(consumer); } foreach (var (path, consumerSettings) in Settings.Consumers.GroupBy(x => x.Path).ToDictionary(x => x.Key, x => x.ToList())) { var processor = new MessageProcessor(consumerSettings, this, MessageProvider, path, responseProducer: this); - AddTopicConsumer(path, processor); + AddTopicConsumer(consumerSettings, path, processor); } if (Settings.RequestResponse != null) { var processor = new ResponseMessageProcessor(LoggerFactory, Settings.RequestResponse, responseConsumer: this, messagePayloadProvider: m => m.PayloadSegment.Array); - AddTopicConsumer(Settings.RequestResponse.Path, processor); + AddTopicConsumer([Settings.RequestResponse], Settings.RequestResponse.Path, processor); } var topics = Consumers.Cast().Select(x => new MqttTopicFilterBuilder().WithTopic(x.Topic).Build()).ToList(); diff --git a/src/SlimMessageBus.Host.Mqtt/MqttTopicConsumer.cs b/src/SlimMessageBus.Host.Mqtt/MqttTopicConsumer.cs index b3729f69..ede01651 100644 --- a/src/SlimMessageBus.Host.Mqtt/MqttTopicConsumer.cs +++ b/src/SlimMessageBus.Host.Mqtt/MqttTopicConsumer.cs @@ -5,7 +5,8 @@ public class MqttTopicConsumer : AbstractConsumer public IMessageProcessor MessageProcessor; public string Topic { get; } - public MqttTopicConsumer(ILogger logger, string topic, IMessageProcessor messageProcessor) : base(logger) + public MqttTopicConsumer(ILogger logger, IEnumerable consumerSettings, string topic, IMessageProcessor messageProcessor) + : base(logger, consumerSettings) { Topic = topic; MessageProcessor = messageProcessor; diff --git a/src/SlimMessageBus.Host.RabbitMQ/Consumers/AbstractRabbitMqConsumer.cs b/src/SlimMessageBus.Host.RabbitMQ/Consumers/AbstractRabbitMqConsumer.cs index 3100dbaa..9219a2c7 100644 --- a/src/SlimMessageBus.Host.RabbitMQ/Consumers/AbstractRabbitMqConsumer.cs +++ b/src/SlimMessageBus.Host.RabbitMQ/Consumers/AbstractRabbitMqConsumer.cs @@ -13,8 +13,8 @@ public abstract class AbstractRabbitMqConsumer : AbstractConsumer protected string QueueName { get; } protected abstract RabbitMqMessageAcknowledgementMode AcknowledgementMode { get; } - protected AbstractRabbitMqConsumer(ILogger logger, IRabbitMqChannel channel, string queueName, IHeaderValueConverter headerValueConverter) - : base(logger) + protected AbstractRabbitMqConsumer(ILogger logger, IEnumerable consumerSettings, IRabbitMqChannel channel, string queueName, IHeaderValueConverter headerValueConverter) + : base(logger, consumerSettings) { _channel = channel; _headerValueConverter = headerValueConverter; diff --git a/src/SlimMessageBus.Host.RabbitMQ/Consumers/RabbitMqConsumer.cs b/src/SlimMessageBus.Host.RabbitMQ/Consumers/RabbitMqConsumer.cs index 2040822c..bb557cbe 100644 --- a/src/SlimMessageBus.Host.RabbitMQ/Consumers/RabbitMqConsumer.cs +++ b/src/SlimMessageBus.Host.RabbitMQ/Consumers/RabbitMqConsumer.cs @@ -12,7 +12,7 @@ public class RabbitMqConsumer : AbstractRabbitMqConsumer protected override RabbitMqMessageAcknowledgementMode AcknowledgementMode => _acknowledgementMode; public RabbitMqConsumer(ILoggerFactory loggerFactory, IRabbitMqChannel channel, string queueName, IList consumers, IMessageSerializer serializer, MessageBusBase messageBus, IHeaderValueConverter headerValueConverter) - : base(loggerFactory.CreateLogger(), channel, queueName, headerValueConverter) + : base(loggerFactory.CreateLogger(), consumers, channel, queueName, headerValueConverter) { _acknowledgementMode = consumers.Select(x => x.GetOrDefault(RabbitMqProperties.MessageAcknowledgementMode, messageBus.Settings)).FirstOrDefault(x => x != null) ?? RabbitMqMessageAcknowledgementMode.ConfirmAfterMessageProcessingWhenNoManualConfirmMade; // be default choose the safer acknowledgement mode diff --git a/src/SlimMessageBus.Host.RabbitMQ/Consumers/RabbitMqResponseConsumer.cs b/src/SlimMessageBus.Host.RabbitMQ/Consumers/RabbitMqResponseConsumer.cs index a40bc40e..cd152d51 100644 --- a/src/SlimMessageBus.Host.RabbitMQ/Consumers/RabbitMqResponseConsumer.cs +++ b/src/SlimMessageBus.Host.RabbitMQ/Consumers/RabbitMqResponseConsumer.cs @@ -7,7 +7,7 @@ public class RabbitMqResponseConsumer : AbstractRabbitMqConsumer protected override RabbitMqMessageAcknowledgementMode AcknowledgementMode => RabbitMqMessageAcknowledgementMode.ConfirmAfterMessageProcessingWhenNoManualConfirmMade; public RabbitMqResponseConsumer(ILoggerFactory loggerFactory, IRabbitMqChannel channel, string queueName, RequestResponseSettings requestResponseSettings, MessageBusBase messageBus, IHeaderValueConverter headerValueConverter) - : base(loggerFactory.CreateLogger(), channel, queueName, headerValueConverter) + : base(loggerFactory.CreateLogger(), [requestResponseSettings], channel, queueName, headerValueConverter) { _messageProcessor = new ResponseMessageProcessor(loggerFactory, requestResponseSettings, messageBus, m => m.Body.ToArray()); } diff --git a/src/SlimMessageBus.Host.Redis/Consumers/RedisListCheckerConsumer.cs b/src/SlimMessageBus.Host.Redis/Consumers/RedisListCheckerConsumer.cs index f178cd80..8105bb1b 100644 --- a/src/SlimMessageBus.Host.Redis/Consumers/RedisListCheckerConsumer.cs +++ b/src/SlimMessageBus.Host.Redis/Consumers/RedisListCheckerConsumer.cs @@ -24,7 +24,7 @@ public QueueProcessors(string name, List> } public RedisListCheckerConsumer(ILogger logger, IDatabase database, TimeSpan? pollDelay, TimeSpan maxIdle, IEnumerable<(string QueueName, IMessageProcessor Processor)> queues, IMessageSerializer envelopeSerializer) - : base(logger) + : base(logger, []) { _database = database; _pollDelay = pollDelay; diff --git a/src/SlimMessageBus.Host.Redis/Consumers/RedisTopicConsumer.cs b/src/SlimMessageBus.Host.Redis/Consumers/RedisTopicConsumer.cs index 77204dff..ad79f383 100644 --- a/src/SlimMessageBus.Host.Redis/Consumers/RedisTopicConsumer.cs +++ b/src/SlimMessageBus.Host.Redis/Consumers/RedisTopicConsumer.cs @@ -9,8 +9,8 @@ public class RedisTopicConsumer : AbstractConsumer, IRedisConsumer public string Path { get; } - public RedisTopicConsumer(ILogger logger, string topic, ISubscriber subscriber, IMessageProcessor messageProcessor, IMessageSerializer envelopeSerializer) - : base(logger) + public RedisTopicConsumer(ILogger logger, IEnumerable consumerSettings, string topic, ISubscriber subscriber, IMessageProcessor messageProcessor, IMessageSerializer envelopeSerializer) + : base(logger, consumerSettings) { Path = topic; _messageProcessor = messageProcessor; diff --git a/src/SlimMessageBus.Host.Redis/RedisMessageBus.cs b/src/SlimMessageBus.Host.Redis/RedisMessageBus.cs index 276f7365..a88f36ae 100644 --- a/src/SlimMessageBus.Host.Redis/RedisMessageBus.cs +++ b/src/SlimMessageBus.Host.Redis/RedisMessageBus.cs @@ -90,9 +90,9 @@ protected override async Task CreateConsumers() object MessageProvider(Type messageType, MessageWithHeaders transportMessage) => Serializer.Deserialize(messageType, transportMessage.Payload); - void AddTopicConsumer(string topic, ISubscriber subscriber, IMessageProcessor messageProcessor) + void AddTopicConsumer(IEnumerable consumerSettings, string topic, ISubscriber subscriber, IMessageProcessor messageProcessor) { - var consumer = new RedisTopicConsumer(LoggerFactory.CreateLogger(), topic, subscriber, messageProcessor, ProviderSettings.EnvelopeSerializer); + var consumer = new RedisTopicConsumer(LoggerFactory.CreateLogger(), consumerSettings, topic, subscriber, messageProcessor, ProviderSettings.EnvelopeSerializer); AddConsumer(consumer); } @@ -116,7 +116,7 @@ void AddTopicConsumer(string topic, ISubscriber subscriber, IMessageProcessor(LoggerFactory, Settings.RequestResponse, this, messagePayloadProvider: m => m.Payload)); + AddTopicConsumer([Settings.RequestResponse], Settings.RequestResponse.Path, subscriber, new ResponseMessageProcessor(LoggerFactory, Settings.RequestResponse, this, messagePayloadProvider: m => m.Payload)); } else { diff --git a/src/SlimMessageBus.Host/Consumer/AbstractConsumer.cs b/src/SlimMessageBus.Host/Consumer/AbstractConsumer.cs index 25a4ef32..333901d6 100644 --- a/src/SlimMessageBus.Host/Consumer/AbstractConsumer.cs +++ b/src/SlimMessageBus.Host/Consumer/AbstractConsumer.cs @@ -2,66 +2,122 @@ public abstract class AbstractConsumer : IAsyncDisposable, IConsumerControl { + private readonly SemaphoreSlim _semaphore; + private readonly List _circuitBreakers; + private CancellationTokenSource _cancellationTokenSource; private bool _starting; private bool _stopping; - protected ILogger Logger { get; } - + public bool IsPaused { get; private set; } public bool IsStarted { get; private set; } - + protected ILogger Logger { get; } + protected IReadOnlyList Settings { get; } protected CancellationToken CancellationToken => _cancellationTokenSource.Token; - public AbstractConsumer(ILogger logger) + protected AbstractConsumer(ILogger logger, IEnumerable consumerSettings) { + _semaphore = new(1, 1); + _circuitBreakers = []; + Logger = logger; + Settings = consumerSettings.ToList(); } public async Task Start() { + async Task StartCircuitBreakers() + { + var types = Settings.SelectMany(x => x.CircuitBreakers).Distinct(); + if (!types.Any()) + { + return; + } + + var sp = Settings.Select(x => x.MessageBusSettings.ServiceProvider).FirstOrDefault(x => x != null); + foreach (var type in types.Distinct()) + { + var breaker = (IConsumerCircuitBreaker)ActivatorUtilities.CreateInstance(sp, type, Settings); + _circuitBreakers.Add(breaker); + await breaker.Subscribe(BreakerChanged); + } + } + if (IsStarted || _starting) { return; } + await _semaphore.WaitAsync(); _starting = true; try { - if (_cancellationTokenSource == null || _cancellationTokenSource.IsCancellationRequested) + if (_cancellationTokenSource?.IsCancellationRequested != false) { _cancellationTokenSource?.Cancel(); _cancellationTokenSource = new CancellationTokenSource(); } - await OnStart().ConfigureAwait(false); + await StartCircuitBreakers(); + IsPaused = _circuitBreakers.Exists(x => x.State == Circuit.Closed); + if (!IsPaused) + { + await OnStart().ConfigureAwait(false); + } IsStarted = true; } finally { _starting = false; + _semaphore.Release(); } } public async Task Stop() { + async Task StopCircuitBreakers() + { + foreach (var breaker in _circuitBreakers) + { + breaker.Unsubscribe(); + + if (breaker is IAsyncDisposable asyncDisposable) + { + await asyncDisposable.DisposeAsync(); + } + else if (breaker is IDisposable disposable) + { + disposable.Dispose(); + } + } + + _circuitBreakers.Clear(); + } + if (!IsStarted || _stopping) { return; } + await _semaphore.WaitAsync(); _stopping = true; try { _cancellationTokenSource.Cancel(); - await OnStop().ConfigureAwait(false); + await StopCircuitBreakers(); + if (!IsPaused) + { + await OnStop().ConfigureAwait(false); + } IsStarted = false; } finally { _stopping = false; + _semaphore.Release(); } } @@ -85,4 +141,40 @@ protected async virtual ValueTask DisposeAsyncCore() } #endregion + + async internal Task BreakerChanged(Circuit state) + { + await _semaphore.WaitAsync(); + try + { + if (!IsStarted) + { + return; + } + + var shouldPause = state == Circuit.Closed || _circuitBreakers.Exists(x => x.State == Circuit.Closed); + if (shouldPause != IsPaused) + { + var settings = Settings.Count > 0 ? Settings[0] : null; + var path = settings?.Path ?? "[unknown path]"; + var bus = settings?.MessageBusSettings?.Name ?? "default"; + if (shouldPause) + { + Logger.LogWarning("Circuit breaker tripped for '{Path}' on '{Bus}' bus. Consumer paused.", path, bus); + await OnStop().ConfigureAwait(false); + } + else + { + Logger.LogInformation("Circuit breaker restored for '{Path}' on '{Bus}' bus. Consumer resumed.", path, bus); + await OnStart().ConfigureAwait(false); + } + + IsPaused = shouldPause; + } + } + finally + { + _semaphore.Release(); + } + } } diff --git a/src/SlimMessageBus.Host/DependencyResolver/ServiceCollectionExtensions.cs b/src/SlimMessageBus.Host/DependencyResolver/ServiceCollectionExtensions.cs index 5db30fe7..7f1cf0fd 100644 --- a/src/SlimMessageBus.Host/DependencyResolver/ServiceCollectionExtensions.cs +++ b/src/SlimMessageBus.Host/DependencyResolver/ServiceCollectionExtensions.cs @@ -25,7 +25,7 @@ public static IServiceCollection AddSlimMessageBus(this IServiceCollection servi configure(mbb); // Execute post config actions for the master bus and its children - foreach (var action in mbb.PostConfigurationActions.Concat(mbb.Children.Values.SelectMany(x => x.PostConfigurationActions))) + foreach (var action in mbb.GetPostConfigurationActions()) { action(services); } diff --git a/src/SlimMessageBus.Host/IConsumerControl.cs b/src/SlimMessageBus.Host/IConsumerControl.cs index 451ccd19..eee96a7f 100644 --- a/src/SlimMessageBus.Host/IConsumerControl.cs +++ b/src/SlimMessageBus.Host/IConsumerControl.cs @@ -9,7 +9,7 @@ public interface IConsumerControl Task Start(); /// - /// Indicated whether the consumers are started. + /// Indicates whether the consumers are started. /// bool IsStarted { get; } diff --git a/src/SlimMessageBus.sln b/src/SlimMessageBus.sln index cae29cc5..eef771aa 100644 --- a/src/SlimMessageBus.sln +++ b/src/SlimMessageBus.sln @@ -260,7 +260,15 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Build", "Build", "{1A71BB05 ..\build\tasks.ps1 = ..\build\tasks.ps1 EndProjectSection EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SecretStore.Test", "Tests\SecretStore.Test\SecretStore.Test.csproj", "{969AAB37-AEFC-40F9-9F89-B4B5E45E13C9}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SecretStore.Test", "Tests\SecretStore.Test\SecretStore.Test.csproj", "{969AAB37-AEFC-40F9-9F89-B4B5E45E13C9}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "CircuitBreakers", "CircuitBreakers", "{FE36338C-0DA2-499E-92CA-F9D5CE594B9F}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SlimMessageBus.Host.CircuitBreaker.HealthCheck", "SlimMessageBus.Host.CircuitBreaker.HealthCheck\SlimMessageBus.Host.CircuitBreaker.HealthCheck.csproj", "{B71D4F74-B1D9-47A8-8DC1-C7D6A56DC6A8}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test", "Tests\SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test\SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test.csproj", "{CA02D82E-DACC-4AB5-BD6B-071341E9E664}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Sample.CircuitBreaker.HealthCheck", "Samples\Sample.CircuitBreaker.HealthCheck\Sample.CircuitBreaker.HealthCheck.csproj", "{226FC4F3-01EF-4214-9566-942CA0FB71B0}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution @@ -790,6 +798,30 @@ Global {969AAB37-AEFC-40F9-9F89-B4B5E45E13C9}.Release|Any CPU.Build.0 = Release|Any CPU {969AAB37-AEFC-40F9-9F89-B4B5E45E13C9}.Release|x86.ActiveCfg = Release|Any CPU {969AAB37-AEFC-40F9-9F89-B4B5E45E13C9}.Release|x86.Build.0 = Release|Any CPU + {B71D4F74-B1D9-47A8-8DC1-C7D6A56DC6A8}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {B71D4F74-B1D9-47A8-8DC1-C7D6A56DC6A8}.Debug|Any CPU.Build.0 = Debug|Any CPU + {B71D4F74-B1D9-47A8-8DC1-C7D6A56DC6A8}.Debug|x86.ActiveCfg = Debug|Any CPU + {B71D4F74-B1D9-47A8-8DC1-C7D6A56DC6A8}.Debug|x86.Build.0 = Debug|Any CPU + {B71D4F74-B1D9-47A8-8DC1-C7D6A56DC6A8}.Release|Any CPU.ActiveCfg = Release|Any CPU + {B71D4F74-B1D9-47A8-8DC1-C7D6A56DC6A8}.Release|Any CPU.Build.0 = Release|Any CPU + {B71D4F74-B1D9-47A8-8DC1-C7D6A56DC6A8}.Release|x86.ActiveCfg = Release|Any CPU + {B71D4F74-B1D9-47A8-8DC1-C7D6A56DC6A8}.Release|x86.Build.0 = Release|Any CPU + {CA02D82E-DACC-4AB5-BD6B-071341E9E664}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {CA02D82E-DACC-4AB5-BD6B-071341E9E664}.Debug|Any CPU.Build.0 = Debug|Any CPU + {CA02D82E-DACC-4AB5-BD6B-071341E9E664}.Debug|x86.ActiveCfg = Debug|Any CPU + {CA02D82E-DACC-4AB5-BD6B-071341E9E664}.Debug|x86.Build.0 = Debug|Any CPU + {CA02D82E-DACC-4AB5-BD6B-071341E9E664}.Release|Any CPU.ActiveCfg = Release|Any CPU + {CA02D82E-DACC-4AB5-BD6B-071341E9E664}.Release|Any CPU.Build.0 = Release|Any CPU + {CA02D82E-DACC-4AB5-BD6B-071341E9E664}.Release|x86.ActiveCfg = Release|Any CPU + {CA02D82E-DACC-4AB5-BD6B-071341E9E664}.Release|x86.Build.0 = Release|Any CPU + {226FC4F3-01EF-4214-9566-942CA0FB71B0}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {226FC4F3-01EF-4214-9566-942CA0FB71B0}.Debug|Any CPU.Build.0 = Debug|Any CPU + {226FC4F3-01EF-4214-9566-942CA0FB71B0}.Debug|x86.ActiveCfg = Debug|Any CPU + {226FC4F3-01EF-4214-9566-942CA0FB71B0}.Debug|x86.Build.0 = Debug|Any CPU + {226FC4F3-01EF-4214-9566-942CA0FB71B0}.Release|Any CPU.ActiveCfg = Release|Any CPU + {226FC4F3-01EF-4214-9566-942CA0FB71B0}.Release|Any CPU.Build.0 = Release|Any CPU + {226FC4F3-01EF-4214-9566-942CA0FB71B0}.Release|x86.ActiveCfg = Release|Any CPU + {226FC4F3-01EF-4214-9566-942CA0FB71B0}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -867,6 +899,10 @@ Global {DB624D5F-CB7C-4E16-B1E2-3B368FCB5A46} = {9F005B5C-A856-4351-8C0C-47A8B785C637} {AD05234E-A925-44C0-977E-FEAC2A75B98C} = {9F005B5C-A856-4351-8C0C-47A8B785C637} {969AAB37-AEFC-40F9-9F89-B4B5E45E13C9} = {D3D6FD9A-968A-45BB-86C7-4527C72A057E} + {FE36338C-0DA2-499E-92CA-F9D5CE594B9F} = {75BDDBB5-8DB8-4893-BD89-8FFC6C42244D} + {B71D4F74-B1D9-47A8-8DC1-C7D6A56DC6A8} = {FE36338C-0DA2-499E-92CA-F9D5CE594B9F} + {CA02D82E-DACC-4AB5-BD6B-071341E9E664} = {9F005B5C-A856-4351-8C0C-47A8B785C637} + {226FC4F3-01EF-4214-9566-942CA0FB71B0} = {A5B15524-93B8-4CCE-AC6D-A22984498BA0} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {435A0D65-610C-4B84-B1AA-2C7FBE72DB80} diff --git a/src/SlimMessageBus/IConsumerCircuitBreaker.cs b/src/SlimMessageBus/IConsumerCircuitBreaker.cs new file mode 100644 index 00000000..27154566 --- /dev/null +++ b/src/SlimMessageBus/IConsumerCircuitBreaker.cs @@ -0,0 +1,17 @@ +namespace SlimMessageBus; + +/// +/// Circuit breaker to toggle consumer status on an external event. +/// +public interface IConsumerCircuitBreaker +{ + Circuit State { get; } + Task Subscribe(Func onChange); + void Unsubscribe(); +} + +public enum Circuit +{ + Open, + Closed +} diff --git a/src/Tests/SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test/GlobalUsings.cs b/src/Tests/SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test/GlobalUsings.cs new file mode 100644 index 00000000..9434d91d --- /dev/null +++ b/src/Tests/SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test/GlobalUsings.cs @@ -0,0 +1,12 @@ +global using System; +global using System.Collections.Generic; +global using System.Threading.Tasks; + +global using FluentAssertions; + +global using Microsoft.Extensions.Diagnostics.HealthChecks; +global using Microsoft.Extensions.Logging.Abstractions; + +global using Moq; + +global using Xunit; diff --git a/src/Tests/SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test/HealthCheckBackgroundServiceTests.cs b/src/Tests/SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test/HealthCheckBackgroundServiceTests.cs new file mode 100644 index 00000000..35986e86 --- /dev/null +++ b/src/Tests/SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test/HealthCheckBackgroundServiceTests.cs @@ -0,0 +1,323 @@ +namespace SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test; + +public static class HealthCheckBackgroundServiceTests +{ + public class AreEqualTests + { + [Fact] + public void AreEqual_ShouldReturnTrue_WhenBothDictionariesAreEmpty() + { + // Arrange + var dict1 = new Dictionary(); + var dict2 = new Dictionary(); + + // Act + var result = HealthCheckBackgroundService.AreEqual(dict1, dict2); + + // Assert + result.Should().BeTrue(); + } + + [Fact] + public void AreEqual_ShouldReturnFalse_WhenDictionariesHaveDifferentCounts() + { + // Arrange + var dict1 = new Dictionary { { "key1", 1 } }; + var dict2 = new Dictionary(); + + // Act + var result = HealthCheckBackgroundService.AreEqual(dict1, dict2); + + // Assert + result.Should().BeFalse(); + } + + [Fact] + public void AreEqual_ShouldReturnFalse_WhenDictionariesHaveDifferentKeys() + { + // Arrange + var dict1 = new Dictionary { { "key1", 1 } }; + var dict2 = new Dictionary { { "key2", 1 } }; + + // Act + var result = HealthCheckBackgroundService.AreEqual(dict1, dict2); + + // Assert + result.Should().BeFalse(); + } + + [Fact] + public void AreEqual_ShouldReturnFalse_WhenDictionariesHaveDifferentValues() + { + // Arrange + var dict1 = new Dictionary { { "key1", 1 } }; + var dict2 = new Dictionary { { "key1", 2 } }; + + // Act + var result = HealthCheckBackgroundService.AreEqual(dict1, dict2); + + // Assert + result.Should().BeFalse(); + } + + [Fact] + public void AreEqual_ShouldReturnTrue_WhenDictionariesHaveSameKeysAndValues() + { + // Arrange + var dict1 = new Dictionary { { "key1", 1 } }; + var dict2 = new Dictionary { { "key1", 1 } }; + + // Act + var result = HealthCheckBackgroundService.AreEqual(dict1, dict2); + + // Assert + result.Should().BeTrue(); + } + + [Fact] + public void AreEqual_ShouldReturnTrue_WhenDictionariesAreComplexButEqual() + { + // Arrange + var dict1 = new Dictionary + { + { "key1", 1 }, + { "key2", 2 }, + { "key3", 3 } + }; + var dict2 = new Dictionary + { + { "key1", 1 }, + { "key2", 2 }, + { "key3", 3 } + }; + + // Act + var result = HealthCheckBackgroundService.AreEqual(dict1, dict2); + + // Assert + result.Should().BeTrue(); + } + } + + public class PublishAsyncTests + { + [Fact] + public async Task PublishAsync_ShouldUpdateEntries_WhenServiceIsActive() + { + // Arrange + using var target = new HealthCheckBackgroundService(); + await target.StartAsync(CancellationToken.None); + + var entries = new Dictionary + { + { "check1", new HealthReportEntry(HealthStatus.Healthy, "Healthy", TimeSpan.Zero, null, null, tags: ["tag1"]) }, + { "check2", new HealthReportEntry(HealthStatus.Degraded, "Degraded", TimeSpan.Zero, null, null, tags: ["tag2"]) } + }; + var report = new HealthReport(entries, TimeSpan.Zero); + + // Act + await target.PublishAsync(report, CancellationToken.None); + + // Assert + var expectedTagStatus = new Dictionary + { + { "tag1", HealthStatus.Healthy }, + { "tag2", HealthStatus.Degraded } + }; + + target.TagStatus.Should().BeEquivalentTo(expectedTagStatus); + } + + [Fact] + public async Task PublishAsync_ShouldSetTagStatusToUnhealthy_WhenConflictingHealthStatusOccurs() + { + // Arrange + using var target = new HealthCheckBackgroundService(); + await target.StartAsync(CancellationToken.None); + + var entries = new Dictionary + { + { "check1", new HealthReportEntry(HealthStatus.Healthy, "Healthy", TimeSpan.Zero, null, null, ["sharedTag"]) }, + { "check2", new HealthReportEntry(HealthStatus.Unhealthy, "Unhealthy", TimeSpan.Zero, null, null, ["sharedTag"]) } + }; + var report = new HealthReport(entries, TimeSpan.Zero); + + // Act + await target.PublishAsync(report, CancellationToken.None); + + // Assert + var tagStatus = target.TagStatus; + tagStatus.Should().ContainKey("sharedTag").WhoseValue.Should().Be(HealthStatus.Unhealthy); + } + + [Fact] + public async Task PublishAsync_ShouldSetTagStatusToUnhealthy_WhenOneIsDegradedAndOtherIsUnHealthy() + { + // Arrange + using var target = new HealthCheckBackgroundService(); + await target.StartAsync(CancellationToken.None); + + var entries = new Dictionary + { + { "check1", new HealthReportEntry(HealthStatus.Degraded, "Degraded", TimeSpan.Zero, null, null, ["sharedTag"]) }, + { "check2", new HealthReportEntry(HealthStatus.Unhealthy, "UnHealthy", TimeSpan.Zero, null, null, ["sharedTag"]) } + }; + var report = new HealthReport(entries, TimeSpan.Zero); + + // Act + await target.PublishAsync(report, CancellationToken.None); + + // Assert + var tagStatus = target.TagStatus; + tagStatus.Should().ContainKey("sharedTag").WhoseValue.Should().Be(HealthStatus.Unhealthy); + } + + [Fact] + public async Task PublishAsync_ShouldSetTagStatusToDegraded_WhenOneIsDegradedAndOtherIsHealthy_AndNoUnhealthyExists() + { + // Arrange + using var target = new HealthCheckBackgroundService(); + await target.StartAsync(CancellationToken.None); + + var entries = new Dictionary + { + { "check1", new HealthReportEntry(HealthStatus.Degraded, "Degraded", TimeSpan.Zero, null, null, ["sharedTag"]) }, + { "check2", new HealthReportEntry(HealthStatus.Healthy, "Healthy", TimeSpan.Zero, null, null, ["sharedTag"]) } + }; + var report = new HealthReport(entries, TimeSpan.Zero); + + // Act + await target.PublishAsync(report, CancellationToken.None); + + // Assert + var tagStatus = target.TagStatus; + tagStatus.Should().ContainKey("sharedTag").WhoseValue.Should().Be(HealthStatus.Degraded); + } + + [Fact] + public async Task PublishAsync_ShouldNotChangeTagStatus_WhenConflictingHealthStatusIsSame() + { + // Arrange + using var target = new HealthCheckBackgroundService(); + await target.StartAsync(CancellationToken.None); + + var entries = new Dictionary + { + { "check1", new HealthReportEntry(HealthStatus.Healthy, "Healthy", TimeSpan.Zero, null, null, ["sharedTag"]) }, + { "check2", new HealthReportEntry(HealthStatus.Healthy, "Healthy", TimeSpan.Zero, null, null, ["sharedTag"]) } + }; + var report = new HealthReport(entries, TimeSpan.Zero); + + // Act + await target.PublishAsync(report, CancellationToken.None); + + // Assert + var tagStatus = target.TagStatus; + tagStatus.Should().ContainKey("sharedTag").WhoseValue.Should().Be(HealthStatus.Healthy); + } + + [Fact] + public async Task PublishAsync_ShouldNotCallDelegates_IfTagStatusHasNotChanged() + { + // Arrange + using var target = new HealthCheckBackgroundService(); + await target.StartAsync(CancellationToken.None); + + var delegateCalled = false; + await target.Subscribe( + _ => + { + delegateCalled = true; + return Task.CompletedTask; + }); + + var initialHealthReport = new HealthReport( + new Dictionary + { + { "check1", new HealthReportEntry(HealthStatus.Healthy, "Healthy", TimeSpan.Zero, null, null, ["tag1"]) } + }, + TimeSpan.Zero); + + await target.PublishAsync(initialHealthReport, CancellationToken.None); + delegateCalled = false; + + var unchangedHealthReport = new HealthReport( + new Dictionary + { + { "check1", new HealthReportEntry(HealthStatus.Healthy, "Healthy", TimeSpan.Zero, null, null, ["tag1"]) }, + { "check2", new HealthReportEntry(HealthStatus.Healthy, "Healthy", TimeSpan.Zero, null, null, ["tag1"]) } + }, + TimeSpan.Zero); + + // Act + await target.PublishAsync(unchangedHealthReport, CancellationToken.None); + + // Assert + delegateCalled.Should().BeFalse(); + } + } + + public class SubscribeTests + { + [Fact] + public async Task Subscribe_ShouldInvokeDelegateImmediatelyWithCurrentStatus() + { + // Arrange + using var target = new HealthCheckBackgroundService(); + await target.StartAsync(CancellationToken.None); + + var entries = new Dictionary + { + { "check1", new HealthReportEntry(HealthStatus.Healthy, "Healthy", TimeSpan.Zero, null, null, ["tag1"]) } + }; + var report = new HealthReport(entries, TimeSpan.Zero); + await target.PublishAsync(report, CancellationToken.None); + + IReadOnlyDictionary? capturedStatus = null; + Task OnChange(IReadOnlyDictionary status) + { + capturedStatus = status; + return Task.CompletedTask; + } + + // Act + await target.Subscribe(OnChange); + + // Assert + capturedStatus.Should().NotBeNull(); + capturedStatus.Should().ContainKey("tag1").WhoseValue.Should().Be(HealthStatus.Healthy); + } + } + + public class UnsubscribeTests + { + [Fact] + public async Task Unsubscribe_ShouldRemoveDelegate() + { + // Arrange + using var target = new HealthCheckBackgroundService(); + var entries = new Dictionary + { + { "check1", new HealthReportEntry(HealthStatus.Healthy, "Healthy", TimeSpan.Zero, null, null, ["tag1"]) } + }; + var report = new HealthReport(entries, TimeSpan.Zero); + + IReadOnlyDictionary? capturedStatus = null; + Task OnChange(IReadOnlyDictionary status) + { + capturedStatus = status; + return Task.CompletedTask; + } + + // Act + await target.Subscribe(OnChange); + capturedStatus = null; + + target.Unsubscribe(OnChange); + await target.PublishAsync(report, CancellationToken.None); + + // Assert + capturedStatus.Should().BeNull(); + } + } +} diff --git a/src/Tests/SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test/HealthCheckCircuitBreakerTests.cs b/src/Tests/SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test/HealthCheckCircuitBreakerTests.cs new file mode 100644 index 00000000..48499879 --- /dev/null +++ b/src/Tests/SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test/HealthCheckCircuitBreakerTests.cs @@ -0,0 +1,184 @@ +namespace SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test; +public class HealthCheckCircuitBreakerTests +{ + private readonly Mock _hostMock; + private readonly HealthCheckCircuitBreaker _circuitBreaker; + private readonly TestConsumerSettings _testConsumerSettings; + private readonly TestConsumerSettings _testConsumerSettings2; + + public HealthCheckCircuitBreakerTests() + { + _testConsumerSettings = new TestConsumerSettings(); + _testConsumerSettings2 = new TestConsumerSettings(); + + _hostMock = new Mock(); + var _settings = new List + { + _testConsumerSettings, + _testConsumerSettings2 + }; + + _circuitBreaker = new HealthCheckCircuitBreaker( + _settings, + _hostMock.Object); + } + + [Fact] + public void Constructor_ShouldInitializeOpenState() + { + // assert + _circuitBreaker.State.Should().Be(Circuit.Open); + } + + [Fact] + public async Task Subscribe_ShouldSetOnChangeAndSubscribeToHost() + { + // arrange + static Task onChange(Circuit _) => Task.CompletedTask; + + // act + await _circuitBreaker.Subscribe(onChange); + + // assert + _hostMock.Verify(h => h.Subscribe(It.IsAny()), Times.Once); + _circuitBreaker.State.Should().Be(Circuit.Open); + } + + [Fact] + public async Task Subscribe_ShouldMergeTagsFromAllSettings() + { + const string degradedTag = "degraded"; + const string unhealthyTag = "unhealthy"; + + _testConsumerSettings.PauseOnUnhealthy(unhealthyTag); + _testConsumerSettings2.PauseOnDegraded(degradedTag); + + var expected = new Dictionary() + { + { unhealthyTag, HealthStatus.Unhealthy }, + { degradedTag, HealthStatus.Degraded } + }; + + // arrange + static Task onChange(Circuit _) => Task.CompletedTask; + await _circuitBreaker.Subscribe(onChange); + + // act + var actual = typeof(HealthCheckCircuitBreaker) + .GetField("_monitoredTags", System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance) + ?.GetValue(_circuitBreaker) as IDictionary; + + // assert + actual.Should().NotBeNull(); + actual.Should().BeEquivalentTo(expected); + } + + [Fact] + public async Task Subscribe_MergedTagsOfDifferentSeverity_ShouldUseLeastSevereCondition() + { + const string tag = "tag"; + _testConsumerSettings.PauseOnUnhealthy(tag); + _testConsumerSettings2.PauseOnDegraded(tag); + + var expected = new Dictionary() + { + { tag, HealthStatus.Degraded }, + }; + + // arrange + static Task onChange(Circuit _) => Task.CompletedTask; + await _circuitBreaker.Subscribe(onChange); + + // act + var actual = typeof(HealthCheckCircuitBreaker) + .GetField("_monitoredTags", System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance) + ?.GetValue(_circuitBreaker) as IDictionary; + + // assert + actual.Should().NotBeNull(); + actual.Should().BeEquivalentTo(expected); + } + + [Fact] + public async Task TagStatusChanged_ShouldChangeStateToClosed_WhenUnhealthyTagIsUnhealthy() + { + // arrange + const string tag = "tag"; + _testConsumerSettings.PauseOnUnhealthy(tag); + + static Task onChange(Circuit _) => Task.CompletedTask; + await _circuitBreaker.Subscribe(onChange); + + var tags = new Dictionary + { + { tag, HealthStatus.Unhealthy } + }; + + // act + await _circuitBreaker.TagStatusChanged(tags); + + // assert + _circuitBreaker.State.Should().Be(Circuit.Closed); + } + + [Fact] + public async Task TagStatusChanged_ShouldChangeStateToClosed_WhenDegradedTagIsUnhealthy() + { + // arrange + const string tag = "tag"; + _testConsumerSettings.PauseOnDegraded(tag); + + static Task onChange(Circuit _) => Task.CompletedTask; + await _circuitBreaker.Subscribe(onChange); + + var tags = new Dictionary + { + { tag, HealthStatus.Unhealthy } + }; + + // act + await _circuitBreaker.TagStatusChanged(tags); + + // assert + _circuitBreaker.State.Should().Be(Circuit.Closed); + } + + [Fact] + public async Task TagStatusChanged_ShouldRemainOpen_WhenUnmonitoredTagsAreUnhealthyOrDegraded() + { + // arrange + _testConsumerSettings.PauseOnUnhealthy("tag1", "tag2"); + + Func onChange = _ => Task.CompletedTask; + await _circuitBreaker.Subscribe(onChange); + + var tags = new Dictionary + { + { "tag1", HealthStatus.Healthy }, + { "tag2", HealthStatus.Degraded }, + { "unmonitored1", HealthStatus.Unhealthy }, + { "unmonitored2", HealthStatus.Degraded } + }; + + // act + await _circuitBreaker.TagStatusChanged(tags); + + // assert + _circuitBreaker.State.Should().Be(Circuit.Open); + } + + [Fact] + public void Unsubscribe_ShouldUnsubscribeFromHostAndClearOnChange() + { + // act + _circuitBreaker.Unsubscribe(); + + // assert + _hostMock.Verify(h => h.Unsubscribe(It.IsAny()), Times.Once); + _circuitBreaker.State.Should().Be(Circuit.Open); + } + + public class TestConsumerSettings : AbstractConsumerSettings + { + } +} diff --git a/src/Tests/SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test/SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test.csproj b/src/Tests/SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test/SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test.csproj new file mode 100644 index 00000000..909dcfbe --- /dev/null +++ b/src/Tests/SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test/SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test.csproj @@ -0,0 +1,23 @@ + + + + + + enable + + + + + + + + + + + + + PreserveNewest + + + + \ No newline at end of file diff --git a/src/Tests/SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test/xunit.runner.json b/src/Tests/SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test/xunit.runner.json new file mode 100644 index 00000000..4e80a853 --- /dev/null +++ b/src/Tests/SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test/xunit.runner.json @@ -0,0 +1,4 @@ +{ + "diagnosticMessages": true, + "longRunningTestSeconds": 120 +} \ No newline at end of file diff --git a/src/Tests/SlimMessageBus.Host.Configuration.Test/TypeCollectionTests.cs b/src/Tests/SlimMessageBus.Host.Configuration.Test/TypeCollectionTests.cs new file mode 100644 index 00000000..0a8ff343 --- /dev/null +++ b/src/Tests/SlimMessageBus.Host.Configuration.Test/TypeCollectionTests.cs @@ -0,0 +1,217 @@ +namespace SlimMessageBus.Host.Configuration.Test; + +public class TypeCollectionTests +{ + [Fact] + public void Add_Should_AddTypeToCollection_IfAssignableToGeneric() + { + // Arrange + var collection = new TypeCollection(); + + // Act + collection.Add(typeof(SampleClass)); + + // Assert + collection.Count.Should().Be(1); + collection.Should().Contain(typeof(SampleClass)); + } + + [Fact] + public void Add_Should_ThrowException_IfNotAssignableToGeneric() + { + // Arrange + var collection = new TypeCollection(); + + // Act + var act = () => collection.Add(typeof(object)); + + // Assert + act.Should().Throw().WithMessage($"Type is not assignable to '{typeof(ISampleInterface)}'. (Parameter 'type')"); + } + + [Fact] + public void Add_Should_ThrowException_WhenTypeIsAssignableToGenericButAlreadyExists() + { + // Arrange + var collection = new TypeCollection(); + collection.Add(); + + // Act + Action act = () => collection.Add(typeof(SampleClass)); + + // Assert + act.Should().Throw().WithMessage("Type already exists in the collection. (Parameter 'type')"); + } + + [Fact] + public void Add_Should_AddTypeToCollection() + { + // Arrange + var collection = new TypeCollection(); + + // Act + collection.Add(); + + // Assert + collection.Count.Should().Be(1); + collection.Should().Contain(typeof(SampleClass)); + } + + [Fact] + public void Add_Should_ThrowException_WhenTypeAlreadyExists() + { + // Arrange + var collection = new TypeCollection(); + collection.Add(); + + // Act + Action act = () => collection.Add(); + + // Assert + act.Should().Throw().WithMessage("Type already exists in the collection. (Parameter 'type')"); + } + + [Fact] + public void TryAdd_Should_AddTypeToCollection() + { + // Arrange + var collection = new TypeCollection(); + + // Act + var result = collection.TryAdd(); + + // Assert + result.Should().BeTrue(); + collection.Count.Should().Be(1); + collection.Should().Contain(typeof(SampleClass)); + } + + [Fact] + public void TryAdd_Should_ReturnFalse_WhenTypeAlreadyExists() + { + // Arrange + var collection = new TypeCollection(); + collection.Add(); + + // Act + var result = collection.TryAdd(); + + // Assert + result.Should().BeFalse(); + collection.Count.Should().Be(1); + } + + [Fact] + public void Clear_Should_RemoveAllTypesFromCollection() + { + // Arrange + var collection = new TypeCollection(); + collection.Add(); + collection.Add(); + + // Act + collection.Clear(); + + // Assert + collection.Count.Should().Be(0); + collection.Should().NotContain(typeof(SampleClass)); + collection.Should().NotContain(typeof(AnotherSampleClass)); + } + + [Fact] + public void Contains_Should_ReturnTrue_WhenTypeExistsInCollection() + { + // Arrange + var collection = new TypeCollection(); + collection.Add(); + + // Act + var contains = collection.Contains(); + + // Assert + contains.Should().BeTrue(); + } + + [Fact] + public void Contains_Should_ReturnFalse_WhenTypeDoesNotExistInCollection() + { + // Arrange + var collection = new TypeCollection(); + + // Act + var contains = collection.Contains(); + + // Assert + contains.Should().BeFalse(); + } + + [Fact] + public void Remove_Should_RemoveTypeFromCollection_WhenSuppliedAsGenericParameter() + { + // Arrange + var collection = new TypeCollection(); + collection.Add(); + + // Act + var removed = collection.Remove(); + + // Assert + removed.Should().BeTrue(); + collection.Count.Should().Be(0); + collection.Should().NotContain(typeof(SampleClass)); + } + + [Fact] + public void Remove_Should_RemoveTypeFromCollection_WhenSuppliedAsType() + { + // Arrange + var collection = new TypeCollection(); + collection.Add(); + + // Act + var removed = collection.Remove(typeof(SampleClass)); + + // Assert + removed.Should().BeTrue(); + collection.Count.Should().Be(0); + collection.Should().NotContain(typeof(SampleClass)); + } + + [Fact] + public void Remove_Should_ReturnFalse_WhenTypeDoesNotExistInCollection() + { + // Arrange + var collection = new TypeCollection(); + + // Act + var removed = collection.Remove(); + + // Assert + removed.Should().BeFalse(); + } + + [Fact] + public void Enumerator_Should_IterateOverCollection() + { + // Arrange + var collection = new TypeCollection(); + collection.Add(); + collection.Add(); + + // Act + var types = new List(); + foreach (var type in collection) + { + types.Add(type); + } + + // Assert + types.Should().ContainInOrder(typeof(SampleClass), typeof(AnotherSampleClass)); + } + + public interface ISampleInterface { } + + public class SampleClass : ISampleInterface { } + + public class AnotherSampleClass : ISampleInterface { } +} diff --git a/src/Tests/SlimMessageBus.Host.Memory.Test/Consumers/ConcurrentMessageProcessorQueueTests.cs b/src/Tests/SlimMessageBus.Host.Memory.Test/Consumers/ConcurrentMessageProcessorQueueTests.cs index 08f616df..c548124b 100644 --- a/src/Tests/SlimMessageBus.Host.Memory.Test/Consumers/ConcurrentMessageProcessorQueueTests.cs +++ b/src/Tests/SlimMessageBus.Host.Memory.Test/Consumers/ConcurrentMessageProcessorQueueTests.cs @@ -5,7 +5,7 @@ public class ConcurrentMessageProcessorQueueTests { [Fact] - public async void When_Enqueue_Given_FourMessagesEnqueued_Then_ProcessMessageIsCalledOnFirstTwoThenTwoAfterThat() + public async Task When_Enqueue_Given_FourMessagesEnqueued_Then_ProcessMessageIsCalledOnFirstTwoThenTwoAfterThat() { // Arrange var messageProcessor = new Mock>(); diff --git a/src/Tests/SlimMessageBus.Host.Memory.Test/Consumers/MessageProcessorQueueTests.cs b/src/Tests/SlimMessageBus.Host.Memory.Test/Consumers/MessageProcessorQueueTests.cs index 909fe235..05fa159f 100644 --- a/src/Tests/SlimMessageBus.Host.Memory.Test/Consumers/MessageProcessorQueueTests.cs +++ b/src/Tests/SlimMessageBus.Host.Memory.Test/Consumers/MessageProcessorQueueTests.cs @@ -5,7 +5,7 @@ public class MessageProcessorQueueTests { [Fact] - public async void When_Enqueue_Given_TwoMessagesEnqueued_Then_ProcessMessageIsCalledOn1stMessageAndOn2ndAfterThat() + public async Task When_Enqueue_Given_TwoMessagesEnqueued_Then_ProcessMessageIsCalledOn1stMessageAndOn2ndAfterThat() { // Arrange var messageProcessor = new Mock>(); diff --git a/src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/OutboxTests.cs b/src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/OutboxTests.cs index ed55caa5..18f2ce1e 100644 --- a/src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/OutboxTests.cs +++ b/src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/OutboxTests.cs @@ -240,7 +240,7 @@ public record GenerateCustomerIdCommand(string Firstname, string Lastname) : IRe public class GenerateCustomerIdCommandHandler : IRequestHandler { - public async Task OnHandle(GenerateCustomerIdCommand request) + public Task OnHandle(GenerateCustomerIdCommand request) { // Note: This handler will be already wrapped in a transaction: see Program.cs and .UseTransactionScope() / .UseSqlTransaction() @@ -250,7 +250,7 @@ public async Task OnHandle(GenerateCustomerIdCommand request) } // generate a dummy customer id - return $"{request.Firstname.ToUpperInvariant()[..3]}-{request.Lastname.ToUpperInvariant()[..3]}-{Guid.NewGuid()}"; + return Task.FromResult($"{request.Firstname.ToUpperInvariant()[..3]}-{request.Lastname.ToUpperInvariant()[..3]}-{Guid.NewGuid()}"); } } diff --git a/src/Tests/SlimMessageBus.Host.RabbitMQ.Test/IntegrationTests/RabbitMqMessageBusIt.cs b/src/Tests/SlimMessageBus.Host.RabbitMQ.Test/IntegrationTests/RabbitMqMessageBusIt.cs index ec62d3b5..223613b8 100644 --- a/src/Tests/SlimMessageBus.Host.RabbitMQ.Test/IntegrationTests/RabbitMqMessageBusIt.cs +++ b/src/Tests/SlimMessageBus.Host.RabbitMQ.Test/IntegrationTests/RabbitMqMessageBusIt.cs @@ -181,7 +181,7 @@ private async Task BasicPubSub(int expectedMessageCopies, Action addit [InlineData(RabbitMqMessageAcknowledgementMode.AckMessageBeforeProcessing)] public async Task BasicReqRespOnTopic(RabbitMqMessageAcknowledgementMode acknowledgementMode) { - var topic = "test-echo"; + const string topic = "test-echo"; AddBusConfiguration(mbb => { diff --git a/src/Tests/SlimMessageBus.Host.Test/Consumer/AbstractConsumerTests.cs b/src/Tests/SlimMessageBus.Host.Test/Consumer/AbstractConsumerTests.cs new file mode 100644 index 00000000..5d49be44 --- /dev/null +++ b/src/Tests/SlimMessageBus.Host.Test/Consumer/AbstractConsumerTests.cs @@ -0,0 +1,139 @@ +namespace SlimMessageBus.Host.Test.Consumer; + +public class AbstractConsumerTests +{ + private class TestConsumer : AbstractConsumer + { + public TestConsumer(ILogger logger, IEnumerable settings) + : base(logger, settings) { } + + protected override Task OnStart() => Task.CompletedTask; + protected override Task OnStop() => Task.CompletedTask; + } + + private class TestConsumerSettings : AbstractConsumerSettings; + + public class CircuitBreakerAccessor + { + public Circuit State { get; set; } + public int SubscribeCallCount { get; set; } = 0; + public int UnsubscribeCallCount { get; set; } = 0; + public IEnumerable Settings { get; set; } + public Func OnChange { get; set; } + } + + private class TestCircuitBreaker : IConsumerCircuitBreaker + { + private readonly CircuitBreakerAccessor _accessor; + + public TestCircuitBreaker(CircuitBreakerAccessor accessor, IEnumerable settings) + { + _accessor = accessor; + Settings = settings; + State = Circuit.Open; + } + + public Circuit State + { + get => _accessor.State; + set => _accessor.State = value; + } + public IEnumerable Settings { get; } + + public Task Subscribe(Func onChange) + { + _accessor.SubscribeCallCount++; + _accessor.OnChange = onChange; + + return Task.CompletedTask; + } + + public void Unsubscribe() + { + _accessor.UnsubscribeCallCount++; + } + } + + private readonly List _settings; + private readonly TestConsumer _target; + private readonly CircuitBreakerAccessor accessor; + + public AbstractConsumerTests() + { + accessor = new CircuitBreakerAccessor(); + + var serviceCollection = new ServiceCollection(); + serviceCollection.TryAddSingleton(accessor); + serviceCollection.TryAddTransient(); + + var testSettings = new TestConsumerSettings + { + MessageBusSettings = new MessageBusSettings { ServiceProvider = serviceCollection.BuildServiceProvider() } + }; + + testSettings.CircuitBreakers.Add(); + + _settings = [testSettings]; + + _target = new TestConsumer(NullLogger.Instance, _settings); + } + + [Fact] + public async Task Start_ShouldStartCircuitBreakers_WhenNotStarted() + { + // Arrange + + // Act + await _target.Start(); + + // Assert + _target.IsStarted.Should().BeTrue(); + accessor.SubscribeCallCount.Should().Be(1); + } + + [Fact] + public async Task Stop_ShouldStopCircuitBreakers_WhenStarted() + { + // Arrange + await _target.Start(); + + // Act + await _target.Stop(); + + // Assert + _target.IsStarted.Should().BeFalse(); + accessor.UnsubscribeCallCount.Should().Be(1); + } + + [Fact] + public async Task BreakerChanged_ShouldPauseConsumer_WhenBreakerClosed() + { + // Arrange + await _target.Start(); + + // Act + _target.IsPaused.Should().BeFalse(); + accessor.State = Circuit.Closed; + await _target.BreakerChanged(Circuit.Closed); + + // Assert + _target.IsPaused.Should().BeTrue(); + } + + [Fact] + public async Task BreakerChanged_ShouldResumeConsumer_WhenBreakerOpen() + { + // Arrange + await _target.Start(); + accessor.State = Circuit.Closed; + await _target.BreakerChanged(Circuit.Open); + + // Act + _target.IsPaused.Should().BeTrue(); + accessor.State = Circuit.Open; + await _target.BreakerChanged(Circuit.Open); + + // Assert + _target.IsPaused.Should().BeFalse(); + } +} diff --git a/src/Tests/SlimMessageBus.Host.Test/GlobalUsings.cs b/src/Tests/SlimMessageBus.Host.Test/GlobalUsings.cs index a8f3ecbb..af20b140 100644 --- a/src/Tests/SlimMessageBus.Host.Test/GlobalUsings.cs +++ b/src/Tests/SlimMessageBus.Host.Test/GlobalUsings.cs @@ -1,8 +1,11 @@ -global using AutoFixture; +global using System; + +global using AutoFixture; global using FluentAssertions; global using Microsoft.Extensions.DependencyInjection; +global using Microsoft.Extensions.DependencyInjection.Extensions; global using Microsoft.Extensions.Logging; global using Microsoft.Extensions.Logging.Abstractions; @@ -13,3 +16,4 @@ global using SlimMessageBus.Host.Serialization.Json; global using Xunit; + diff --git a/src/Tests/SlimMessageBus.Host.Test/Helpers/ReflectionUtilsTests.cs b/src/Tests/SlimMessageBus.Host.Test/Helpers/ReflectionUtilsTests.cs index 06019d7b..af7b522d 100644 --- a/src/Tests/SlimMessageBus.Host.Test/Helpers/ReflectionUtilsTests.cs +++ b/src/Tests/SlimMessageBus.Host.Test/Helpers/ReflectionUtilsTests.cs @@ -19,13 +19,13 @@ public void When_GenerateGetterFunc_Given_TaskOfT_Then_ResultOfTaskIsObtained() } [Fact] - public async void When_GenerateMethodCallToFunc_Given_ConsumerWithOnHandlerAsyncMethodWithTwoArguments_Then_MethodIsProperlyInvoked() + public async Task When_GenerateMethodCallToFunc_Given_ConsumerWithOnHandlerAsyncMethodWithTwoArguments_Then_MethodIsProperlyInvoked() { // arrange var message = new SomeMessage(); var instanceType = typeof(IConsumer); - var consumerOnHandleMethodInfo = instanceType.GetMethod(nameof(IConsumer.OnHandle), new[] { typeof(SomeMessage) }); + var consumerOnHandleMethodInfo = instanceType.GetMethod(nameof(IConsumer.OnHandle), [typeof(SomeMessage)]); var consumerMock = new Mock>(); consumerMock.Setup(x => x.OnHandle(message)).Returns(Task.CompletedTask); @@ -46,14 +46,14 @@ internal record ClassWithGenericMethod(object Value) } [Fact] - public void When_GenerateGenericMethodCallToFunc_Given_GenericMethid_Then_MethodIsProperlyInvoked() + public void When_GenerateGenericMethodCallToFunc_Given_GenericMethod_Then_MethodIsProperlyInvoked() { // arrange var obj = new ClassWithGenericMethod(true); var genericMethod = typeof(ClassWithGenericMethod).GetMethods().FirstOrDefault(x => x.Name == nameof(ClassWithGenericMethod.GenericMethod)); // act - var methodOfTypeBoolFunc = ReflectionUtils.GenerateGenericMethodCallToFunc>(genericMethod, new[] { typeof(bool) }, obj.GetType(), typeof(object)); + var methodOfTypeBoolFunc = ReflectionUtils.GenerateGenericMethodCallToFunc>(genericMethod, [typeof(bool)], obj.GetType(), typeof(object)); var result = methodOfTypeBoolFunc(obj); // assert @@ -61,7 +61,7 @@ public void When_GenerateGenericMethodCallToFunc_Given_GenericMethid_Then_Method } [Fact] - public async void When_TaskOfObjectContinueWithTaskOfTypeFunc_Given_TaskOfObject_Then_TaskTypedIsObtained() + public async Task When_TaskOfObjectContinueWithTaskOfTypeFunc_Given_TaskOfObject_Then_TaskTypedIsObtained() { // arrange var taskOfObject = Task.FromResult(10);