From 806aba3838587750b4bec08a6d374f2fb29580af Mon Sep 17 00:00:00 2001 From: "Tristan (HouseCat) Hyams" Date: Wed, 24 Apr 2024 16:24:47 -0500 Subject: [PATCH] Update AutoPublisher document. --- README.md | 1 + guides/rabbitmq/AutoPublisher.md | 98 +++++++++++++++++-- .../Publisher/Publisher.cs | 26 +++-- 3 files changed, 101 insertions(+), 24 deletions(-) diff --git a/README.md b/README.md index 8568e0ea..57dfaa75 100644 --- a/README.md +++ b/README.md @@ -84,6 +84,7 @@ working with and so will all your testing making it a win win. * Getting started with *RabbitMQ.Dataflows* [ChannelPool and BasicGet](https://github.com/houseofcat/RabbitMQ.Dataflows/blob/main/guides/rabbitmq/BasicGet.md). * Getting started with *RabbitMQ.Dataflows* [ChannelPool and BasicConsume](https://github.com/houseofcat/RabbitMQ.Dataflows/blob/main/guides/rabbitmq/BasicConsume.md). * Getting started with *RabbitMQ.Dataflows* [Serialization](https://github.com/houseofcat/RabbitMQ.Dataflows/blob/main/guides/rabbitmq/Serialization.md). + * Getting started with *RabbitMQ.Dataflows* [AutoPublisher](https://github.com/houseofcat/RabbitMQ.Dataflows/blob/main/guides/rabbitmq/AutoPublisher.md). More to come! diff --git a/guides/rabbitmq/AutoPublisher.md b/guides/rabbitmq/AutoPublisher.md index 8e24c2df..52a16e5d 100644 --- a/guides/rabbitmq/AutoPublisher.md +++ b/guides/rabbitmq/AutoPublisher.md @@ -5,8 +5,14 @@ I would recommend understanding the `Publisher` guide before moving on to this o Publisher can create it's own `IChannelPool` for simplification or if you want a separate one for Publishers and one for Consumers. This can be a great way of keeping throughput at it's -highest when inside the same process. Inside the `RabbitService` it contains an `AutoPublisher` -and all your pre-configured `Consumer`. +highest when inside the same process. Inside the `RabbitService` it contains the `IPublisher` +and all your pre-configured `Consumers`. RabbitService isn't a particularly fancy or necessary +service. It's just a way to keep all your RabbitMQ related objects in one place and allow you +to dependency inject them where needed or to use directly. + +The following is an example of building a `Publisher, IPublisher` directly for using both normal Publish +methods and queueing messages for auto-publishing. The AutoPublishing is only useful if you intend +to use the `IMessage` object otherwise you are better off managing how you publish your raw data. ```csharp using HouseofCat.Compression; @@ -40,7 +46,7 @@ var rabbitOptions = new RabbitOptions } }; -// Step 2: Instantiate the Publisher +// Step 2: Instantiate the Publisher (this example means it will have it's own internal ChannelPool) var publisher = new Publisher( rabbitOptions, new JsonProvider(), @@ -57,17 +63,13 @@ try }); // Step 4: Create IMessage - var data = Encoding.UTF8.GetBytes("Hello, RabbitMQ!"); - var message = new Message(Shared.ExchangeName, Shared.RoutingKey, data, Guid.NewGuid().ToString()) - { - // DeliveryId for tracking/routing through Publisher/Consumer. - MessageId = Guid.NewGuid().ToString(), - }; + var data = Encoding.UTF8.GetBytes("Hello World, from RabbitMQ!"); + var message = new Message(Shared.ExchangeName, Shared.RoutingKey, data, Guid.NewGuid().ToString()); // Step 5: Queue Message (async publish). await publisher.QueueMessageAsync(message); - // Step 6: Stop Auto Publishing + // Step 6: Stop Auto Publishing (this will wait for all messages to be published before stopping) await publisher.StopAutoPublishAsync(); } catch (Exception ex) @@ -76,3 +78,79 @@ catch (Exception ex) } ``` +This example uses the `IRabbitService` more traditionally. You could imagine that you have injected +`IRabbitService` to the constructor of your BusinsessLogicClass and you just need to drop a message +into a RabbitMQ queue. The added benefit of using AutoPublisher is to allow you to publish asynchronously +from your code and not slowing down the current flow. This is a great way to keep your code nimble and +responsive. + +```csharp +using HouseofCat.Compression; +using HouseofCat.Encryption; +using HouseofCat.RabbitMQ; +using HouseofCat.RabbitMQ.Pools; +using HouseofCat.Serialization; +using Microsoft.Extensions.Logging; +using System.Text; + +// Step 1: Configure RabbitOptions (or load from file or IConfiguration). +var rabbitOptions = await RabbitExtensions.GetRabbitOptionsFromJsonFileAsync(configFileNamePath); + +// Step 2: Setup your Providers (all but ISerializationProvider is optional) +var jsonProvider = new JsonProvider(); +var hashProvider = new ArgonHashingProvider(); +var aes256Key = hashProvider.GetHashKey(EncryptionPassword, EncryptionSalt, KeySize); +var aes256Provider = new AesGcmEncryptionProvider(aes256Key); +var gzipProvider = new RecyclableGzipProvider(); + +// Step 3: Using extension method to create a ready to use RabbitService (StartAsync is called already). +var rabbitService = await rabbitOptions.BuildRabbitServiceAsync( + jsonProvider, + aes256Provider, + gzipProvider, + loggerFactory); + +// Step 4: Create IMessage and Payload +var message = new Message( + exchange: Shared.ExchangeName, + routingKey: Shared.RoutingKey, + body: dataAsBytes, + payloadId: Guid.NewGuid().ToString()); + +// Step 5: Queue Message (async publish). +await rabbitService.Publisher.QueueMessageAsync(message); +``` + +The `Publisher` also has a function that listens for Receipts of an AutoPublish. You can override the default +behavior by providing your own function when calling `StartAutoPublish`. This version is used when `null` is +provided by the user but the `Options.PublisherOptions.CreatePublishReceipts` is `true`. It is looking for +failed receipts (failure to publish) and this means we can requeue them for an additional attempt. This helps +prevent the loss of messages if a background publish fails. + +```csharp +private async ValueTask ProcessReceiptAsync(IPublishReceipt receipt) +{ + if (AutoPublisherStarted + && receipt.IsError + && receipt.OriginalMessage != null) + { + _logger.LogWarning($"Failed publish for message ({receipt.OriginalMessage.MessageId}). Retrying with AutoPublishing..."); + + try + { await QueueMessageAsync(receipt.OriginalMessage); } + catch (Exception ex) /* No-op */ + { _logger.LogDebug("Error ({0}) occurred on retry, most likely because retry during shutdown.", ex.Message); } + } + else if (receipt.IsError) + { + _logger.LogError($"Failed publish for message ({receipt.OriginalMessage.MessageId}). Unable to retry as the original message was not received."); + } +} +``` + +The method signature for `StartAutoPublish` with the optional `processReceiptAsync` is as follows: + +```csharp +void StartAutoPublish(Func processReceiptAsync = null); +Task StartAutoPublishAsync(Func processReceiptAsync = null); +``` \ No newline at end of file diff --git a/src/HouseofCat.RabbitMQ/Publisher/Publisher.cs b/src/HouseofCat.RabbitMQ/Publisher/Publisher.cs index a6dde693..78982361 100644 --- a/src/HouseofCat.RabbitMQ/Publisher/Publisher.cs +++ b/src/HouseofCat.RabbitMQ/Publisher/Publisher.cs @@ -332,24 +332,22 @@ private async Task ProcessReceiptsAsync(Func process } } - // Super simple version to bake in requeueing of all failed to publish messages. private async ValueTask ProcessReceiptAsync(IPublishReceipt receipt) { - if (receipt.IsError && receipt.OriginalMessage != null) + if (AutoPublisherStarted + && receipt.IsError + && receipt.OriginalMessage != null) { - if (AutoPublisherStarted) - { - _logger.LogWarning($"Failed publish for message ({receipt.OriginalMessage.MessageId}). Retrying with AutoPublishing..."); + _logger.LogWarning($"Failed publish for message ({receipt.OriginalMessage.MessageId}). Retrying with AutoPublishing..."); - try - { await QueueMessageAsync(receipt.OriginalMessage); } - catch (Exception ex) /* No-op */ - { _logger.LogDebug("Error ({0}) occurred on retry, most likely because retry during shutdown.", ex.Message); } - } - else - { - _logger.LogError($"Failed publish for message ({receipt.OriginalMessage.MessageId}). Unable to retry as the original message was not received."); - } + try + { await QueueMessageAsync(receipt.OriginalMessage); } + catch (Exception ex) /* No-op */ + { _logger.LogDebug("Error ({0}) occurred on retry, most likely because retry during shutdown.", ex.Message); } + } + else if (receipt.IsError) + { + _logger.LogError($"Failed publish for message ({receipt.OriginalMessage.MessageId}). Unable to retry as the original message was not received."); } }