Skip to content

Commit

Permalink
zarusz#251 Health check circuit breaker
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Pringle <[email protected]>
  • Loading branch information
EtherZa committed Jul 17, 2024
1 parent fb9aabd commit d1865a5
Show file tree
Hide file tree
Showing 61 changed files with 1,920 additions and 58 deletions.
2 changes: 2 additions & 0 deletions build/tasks.ps1
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ $projects = @(
"SlimMessageBus.Host.Outbox.Sql",
"SlimMessageBus.Host.Outbox.DbContext",

"SlimMessageBus.Host.CircuitBreaker.HealthCheck",

"SlimMessageBus.Host.AsyncApi"
)

Expand Down
27 changes: 27 additions & 0 deletions docs/intro.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
- [Set message headers](#set-message-headers)
- [Consumer](#consumer)
- [Start or Stop message consumption](#start-or-stop-message-consumption)
- [Health check circuit breaker](#health-check-circuit-breaker)
- [Consumer context (additional message information)](#consumer-context-additional-message-information)
- [Per-message DI container scope](#per-message-di-container-scope)
- [Hybrid bus and message scope reuse](#hybrid-bus-and-message-scope-reuse)
Expand Down Expand Up @@ -258,6 +259,32 @@ await consumerControl.Stop();

> Since version 1.15.5

#### Health check circuit breaker

Consumers can be associated with health check tags to enable or disable the consumer as the check's status changes.

```cs
// add health checks with tags
builder.Services
.AddHealthChecks()
.AddCheck<StorageHealthCheck>("Storage", tags: ["Storage"]);
.AddCheck<SqlServerHealthCheck>("SqlServer", tags: ["Sql"]);

builder.Services
.AddSlimMessageBus(mbb => {
...

mbb.Consume<Message>(cfg => {
...

// configure consumer to monitor tag/state
cfg.PauseOnUnhealthyCheck("Storage");
cfg.PauseOnDegradedHealthCheck("Sql");
})
})
```
*Requires: SlimMessageBus.Host.CircuitBreaker.HealthCheck*

#### Consumer context (additional message information)

> Changed in version 1.15.0
Expand Down
27 changes: 27 additions & 0 deletions docs/intro.t.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
- [Set message headers](#set-message-headers)
- [Consumer](#consumer)
- [Start or Stop message consumption](#start-or-stop-message-consumption)
- [Health check circuit breaker](#health-check-circuit-breaker)
- [Consumer context (additional message information)](#consumer-context-additional-message-information)
- [Per-message DI container scope](#per-message-di-container-scope)
- [Hybrid bus and message scope reuse](#hybrid-bus-and-message-scope-reuse)
Expand Down Expand Up @@ -258,6 +259,32 @@ await consumerControl.Stop();

> Since version 1.15.5

#### Health check circuit breaker

Consumers can be associated with health check tags to enable or disable the consumer as the check's status changes.

```cs
// add health checks with tags
builder.Services
.AddHealthChecks()
.AddCheck<StorageHealthCheck>("Storage", tags: ["Storage"]);
.AddCheck<SqlServerHealthCheck>("SqlServer", tags: ["Sql"]);

builder.Services
.AddSlimMessageBus(mbb => {
...

mbb.Consume<Message>(cfg => {
...

// configure consumer to monitor tag/state
cfg.PauseOnUnhealthyCheck("Storage");
cfg.PauseOnDegradedHealthCheck("Sql");
})
})
```
*Requires: SlimMessageBus.Host.CircuitBreaker.HealthCheck*

#### Consumer context (additional message information)

> Changed in version 1.15.0
Expand Down
2 changes: 2 additions & 0 deletions src/.editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,8 @@ dotnet_style_allow_multiple_blank_lines_experimental = true:silent
dotnet_style_allow_statement_immediately_after_block_experimental = true:silent
dotnet_style_prefer_collection_expression = when_types_loosely_match:suggestion
dotnet_diagnostic.CA1859.severity = silent
# not supported by .netstandard2.0
dotnet_diagnostic.CA1510.severity = none

[*.{csproj,xml}]
indent_style = space
Expand Down
2 changes: 1 addition & 1 deletion src/Infrastructure/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ services:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: localhost
KAFKA_CREATE_TOPICS: "user-test-ping:2:1,user-test-echo:2:1"
KAFKA_CREATE_TOPICS: "user-test-ping:2:1,user-test-echo:2:1,user-test-echo-resp:2:1"
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
depends_on:
- zookeeper
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
namespace Sample.CircuitBreaker.HealthCheck.Consumers;

public class AddConsumer : IConsumer<Add>
{
private readonly ILogger<AddConsumer> _logger;

public AddConsumer(ILogger<AddConsumer> logger)
{
_logger = logger;
}

public Task OnHandle(Add message)
{
_logger.LogInformation("{A} + {B} = {C}", message.a, message.b, message.a + message.b);
return Task.CompletedTask;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
namespace Sample.CircuitBreaker.HealthCheck.Consumers;

public class SubtractConsumer : IConsumer<Subtract>
{
private readonly ILogger<SubtractConsumer> _logger;

public SubtractConsumer(ILogger<SubtractConsumer> logger)
{
_logger = logger;
}

public Task OnHandle(Subtract message)
{
_logger.LogInformation("{A} - {B} = {C}", message.a, message.b, message.a - message.b);
return Task.CompletedTask;
}
}
17 changes: 17 additions & 0 deletions src/Samples/Sample.CircuitBreaker.HealthCheck/GlobalUsings.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
global using System.Net.Mime;
global using System.Reflection;

global using Microsoft.Extensions.Configuration;
global using Microsoft.Extensions.DependencyInjection;
global using Microsoft.Extensions.Hosting;
global using Microsoft.Extensions.Logging;

global using Sample.CircuitBreaker.HealthCheck.Consumers;
global using Sample.CircuitBreaker.HealthCheck.Models;

global using SecretStore;

global using SlimMessageBus;
global using SlimMessageBus.Host;
global using SlimMessageBus.Host.RabbitMQ;
global using SlimMessageBus.Host.Serialization.SystemTextJson;
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
namespace Sample.CircuitBreaker.HealthCheck.HealthChecks;

using Microsoft.Extensions.Logging;

public class AddRandomHealthCheck : RandomHealthCheck
{
public AddRandomHealthCheck(ILogger<AddRandomHealthCheck> logger)
: base(logger)
{
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
namespace Sample.CircuitBreaker.HealthCheck.HealthChecks;

using Microsoft.Extensions.Diagnostics.HealthChecks;

public abstract class RandomHealthCheck : IHealthCheck
{
private readonly ILogger _logger;

protected RandomHealthCheck(ILogger logger)
{
_logger = logger;
}

public Task<HealthCheckResult> CheckHealthAsync(HealthCheckContext context, CancellationToken cancellationToken = default)
{
var value = (HealthStatus)Random.Shared.Next(3);
_logger.LogInformation("{HealthCheck} evaluated as {HealthStatus}", this.GetType(), value);
return Task.FromResult(new HealthCheckResult(value, value.ToString()));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
namespace Sample.CircuitBreaker.HealthCheck.HealthChecks;

using Microsoft.Extensions.Logging;

public class SubtractRandomHealthCheck : RandomHealthCheck
{
public SubtractRandomHealthCheck(ILogger<SubtractRandomHealthCheck> logger)
: base(logger)
{
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
namespace Sample.CircuitBreaker.HealthCheck;
public class IntermittentMessagePublisher : BackgroundService
{
private readonly ILogger _logger;
private readonly IMessageBus _messageBus;

public IntermittentMessagePublisher(ILogger<IntermittentMessagePublisher> logger, IMessageBus messageBus)
{
_logger = logger;
_messageBus = messageBus;
}

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
var a = Random.Shared.Next(10);
var b = Random.Shared.Next(10);

//_logger.LogInformation("Emitting {A} +- {B} = ?", a, b);

await Task.WhenAll(
_messageBus.Publish(new Add(a, b)),
_messageBus.Publish(new Subtract(a, b)),
Task.Delay(1000, stoppingToken));
}
}
}
3 changes: 3 additions & 0 deletions src/Samples/Sample.CircuitBreaker.HealthCheck/Models/Add.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
namespace Sample.CircuitBreaker.HealthCheck.Models;

public record Add(int a, int b);
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
namespace Sample.CircuitBreaker.HealthCheck.Models;

public record Subtract(int a, int b);
91 changes: 91 additions & 0 deletions src/Samples/Sample.CircuitBreaker.HealthCheck/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
namespace Sample.CircuitBreaker.HealthCheck;
using Microsoft.Extensions.Diagnostics.HealthChecks;

using Sample.CircuitBreaker.HealthCheck.HealthChecks;

using SlimMessageBus.Host.CircuitBreaker.HealthCheck.Config;

public static class Program
{
private static async Task Main(string[] args)
{
// Local file with secrets
Secrets.Load(@"..\..\..\..\..\secrets.txt");

await Host.CreateDefaultBuilder(args)
.ConfigureServices((builder, services) =>
{
const string AddTag = "add";
const string SubtractTag = "subtract";

services.AddSlimMessageBus(mbb =>
{
var ticks = DateTimeOffset.UtcNow.Ticks;
var addTopic = $"Sample-CircuitBreaker-HealthCheck-add-{ticks}";
var subtractTopic = $"Sample-CircuitBreaker-HealthCheck-subtract-{ticks}";

mbb
.WithProviderRabbitMQ(
cfg =>
{
cfg.ConnectionString = Secrets.Service.PopulateSecrets(builder.Configuration.GetValue<string>("RabbitMQ:ConnectionString"));
cfg.ConnectionFactory.ClientProvidedName = $"Sample_CircuitBreaker_HealthCheck_{Environment.MachineName}";

cfg.UseMessagePropertiesModifier((m, p) => p.ContentType = MediaTypeNames.Application.Json);
cfg.UseExchangeDefaults(durable: false);
cfg.UseQueueDefaults(durable: false);
});
mbb
.Produce<Add>(x => x
.Exchange(addTopic, exchangeType: ExchangeType.Fanout, autoDelete: false)
.RoutingKeyProvider((m, p) => Guid.NewGuid().ToString()))
.Consume<Add>(
cfg =>
{
cfg
.Queue(nameof(Add), autoDelete: false)
.Path(nameof(Add))
.ExchangeBinding(addTopic)
.WithConsumer<AddConsumer>()
.PauseOnDegradedHealthCheck(AddTag);
});

mbb
.Produce<Subtract>(x => x
.Exchange(subtractTopic, exchangeType: ExchangeType.Fanout, autoDelete: false)
.RoutingKeyProvider((m, p) => Guid.NewGuid().ToString()))
.Consume<Subtract>(
cfg =>
{
cfg
.Queue(nameof(Subtract), autoDelete: false)
.Path(nameof(Subtract))
.ExchangeBinding(subtractTopic)
.WithConsumer<SubtractConsumer>()
.PauseOnUnhealthyCheck(SubtractTag);
});

mbb.AddServicesFromAssembly(Assembly.GetExecutingAssembly());
mbb.AddJsonSerializer();
});

services.AddHostedService<IntermittentMessagePublisher>();
services.AddSingleton<AddRandomHealthCheck>();
services.AddSingleton<SubtractRandomHealthCheck>();

services.Configure<HealthCheckPublisherOptions>(cfg =>
{
// aggressive to toggle health status often (sample only)
cfg.Delay = TimeSpan.FromSeconds(3);
cfg.Period = TimeSpan.FromSeconds(5);
});

services
.AddHealthChecks()
.AddCheck<AddRandomHealthCheck>("Add", tags: [AddTag])
.AddCheck<SubtractRandomHealthCheck>("Subtract", tags: [SubtractTag]);
})
.Build()
.RunAsync();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Configuration" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Diagnostics.HealthChecks" Version="8.0.7" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.0" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\SlimMessageBus.Host.CircuitBreaker.HealthCheck\SlimMessageBus.Host.CircuitBreaker.HealthCheck.csproj" />
<ProjectReference Include="..\..\SlimMessageBus.Host.RabbitMQ\SlimMessageBus.Host.RabbitMQ.csproj" />
<ProjectReference Include="..\..\SlimMessageBus.Host\SlimMessageBus.Host.csproj" />
<ProjectReference Include="..\..\SlimMessageBus.Host.Serialization.SystemTextJson\SlimMessageBus.Host.Serialization.SystemTextJson.csproj" />
<ProjectReference Include="..\..\SlimMessageBus\SlimMessageBus.csproj" />
<ProjectReference Include="..\..\Tools\SecretStore\SecretStore.csproj" />
</ItemGroup>

<ItemGroup>
<None Update="appsettings.json">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
<ExcludeFromSingleFile>true</ExcludeFromSingleFile>
<CopyToPublishDirectory>PreserveNewest</CopyToPublishDirectory>
</None>
</ItemGroup>

</Project>
19 changes: 19 additions & 0 deletions src/Samples/Sample.CircuitBreaker.HealthCheck/appsettings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft": "Warning",
"Microsoft.Hosting.Lifetime": "Information"
},
"Console": {
"FormatterName": "simple",
"FormatterOptions": {
"SingleLine": true,
"TimestampFormat": "HH:mm:ss.fff "
}
}
},
"RabbitMQ": {
"ConnectionString": "{{rabbitmq_connectionstring}}"
}
}
Loading

0 comments on commit d1865a5

Please sign in to comment.