From aeb5c348740ae5772902de47ae0c2d2b166473c9 Mon Sep 17 00:00:00 2001 From: "Tristan (HouseCat) Hyams" Date: Mon, 29 Apr 2024 09:10:25 -0500 Subject: [PATCH] Code cleanup. --- .../Mappers/GeometryPointTypeHandler.cs | 2 +- src/HouseofCat.RabbitMQ/Consumer/Consumer.cs | 4 +-- .../Dataflows/ConsumerDataflow.cs | 28 +++++++++---------- .../Dataflows/ConsumerPipeline.cs | 16 ++--------- .../Extensions/MessageExtensions.cs | 2 +- src/HouseofCat.RabbitMQ/Messages/Metadata.cs | 12 ++++---- src/HouseofCat.RabbitMQ/Pools/ChannelPool.cs | 4 +-- .../Publisher/Publisher.cs | 12 ++++---- .../Services/MaintenanceService.cs | 4 +-- .../Extensions/ObjectExtensions.cs | 2 +- .../Helpers/AppHelpers.cs | 2 +- .../Helpers/LogHelpers.cs | 2 +- .../Program.cs | 26 +++++++++-------- .../RabbitMQ.ConsumerDataflows.json | 6 ++-- 14 files changed, 56 insertions(+), 66 deletions(-) diff --git a/src/HouseofCat.Data/Database/Dapper/Mappers/GeometryPointTypeHandler.cs b/src/HouseofCat.Data/Database/Dapper/Mappers/GeometryPointTypeHandler.cs index 10b44374..c3cb919b 100644 --- a/src/HouseofCat.Data/Database/Dapper/Mappers/GeometryPointTypeHandler.cs +++ b/src/HouseofCat.Data/Database/Dapper/Mappers/GeometryPointTypeHandler.cs @@ -14,7 +14,7 @@ public partial class GeometryPointTypeHandler : SqlMapper.TypeHandler> ReadUntilEmptyAsync(Cancellatio await _consumerChannel.Reader.WaitToReadAsync(token).ConfigureAwait(false); while (_consumerChannel.Reader.TryRead(out var message)) { - if (message == null) { break; } + if (message is null) { break; } list.Add(message); } diff --git a/src/HouseofCat.RabbitMQ/Dataflows/ConsumerDataflow.cs b/src/HouseofCat.RabbitMQ/Dataflows/ConsumerDataflow.cs index a581fa4f..6bca9485 100644 --- a/src/HouseofCat.RabbitMQ/Dataflows/ConsumerDataflow.cs +++ b/src/HouseofCat.RabbitMQ/Dataflows/ConsumerDataflow.cs @@ -177,7 +177,7 @@ public ConsumerDataflow WithErrorHandling( TaskScheduler taskScheduler = null) { Guard.AgainstNull(action, nameof(action)); - if (_errorBuffer == null) + if (_errorBuffer is null) { _errorBuffer = CreateTargetBlock(boundedCapacity, taskScheduler); var executionOptions = GetExecuteStepOptions(maxDoP, ensureOrdered, boundedCapacity, taskScheduler ?? _taskScheduler); @@ -194,7 +194,7 @@ public ConsumerDataflow WithErrorHandling( TaskScheduler taskScheduler = null) { Guard.AgainstNull(action, nameof(action)); - if (_errorBuffer == null) + if (_errorBuffer is null) { _errorBuffer = CreateTargetBlock(boundedCapacity, taskScheduler); var executionOptions = GetExecuteStepOptions(maxDoP, ensureOrdered, boundedCapacity, taskScheduler ?? _taskScheduler); @@ -254,7 +254,7 @@ public ConsumerDataflow WithFinalization( TaskScheduler taskScheduler = null) { Guard.AgainstNull(action, nameof(action)); - if (_finalization == null) + if (_finalization is null) { var executionOptions = GetExecuteStepOptions(maxDoP, ensureOrdered, boundedCapacity, taskScheduler ?? _taskScheduler); _finalization = GetLastWrappedActionBlock(action, executionOptions, GetSpanName("finalization")); @@ -270,7 +270,7 @@ public ConsumerDataflow WithFinalization( TaskScheduler taskScheduler = null) { Guard.AgainstNull(action, nameof(action)); - if (_finalization == null) + if (_finalization is null) { var executionOptions = GetExecuteStepOptions(maxDoP, ensureOrdered, boundedCapacity, taskScheduler ?? _taskScheduler); _finalization = GetLastWrappedActionBlock(action, executionOptions, GetSpanName("finalization")); @@ -284,7 +284,7 @@ public ConsumerDataflow WithBuildState( int? boundedCapacity = null, TaskScheduler taskScheduler = null) { - if (_buildStateBlock == null) + if (_buildStateBlock is null) { var executionOptions = GetExecuteStepOptions(maxDoP, ensureOrdered, boundedCapacity, taskScheduler ?? _taskScheduler); _buildStateBlock = GetBuildStateBlock(executionOptions); @@ -299,7 +299,7 @@ public ConsumerDataflow WithDecryptionStep( TaskScheduler taskScheduler = null) { Guard.AgainstNull(_encryptionProvider, nameof(_encryptionProvider)); - if (_decryptBlock == null) + if (_decryptBlock is null) { var executionOptions = GetExecuteStepOptions(maxDoP, ensureOrdered, boundedCapacity, taskScheduler ?? _taskScheduler); @@ -320,7 +320,7 @@ public ConsumerDataflow WithDecompressionStep( TaskScheduler taskScheduler = null) { Guard.AgainstNull(_compressionProvider, nameof(_compressionProvider)); - if (_decompressBlock == null) + if (_decompressBlock is null) { var executionOptions = GetExecuteStepOptions(maxDoP, ensureOrdered, boundedCapacity, taskScheduler ?? _taskScheduler); @@ -342,7 +342,7 @@ public ConsumerDataflow WithCreateSendMessage( int? boundedCapacity = null, TaskScheduler taskScheduler = null) { - if (_createSendMessage == null) + if (_createSendMessage is null) { var executionOptions = GetExecuteStepOptions(maxDoP, ensureOrdered, boundedCapacity, taskScheduler ?? _taskScheduler); _createSendMessage = GetWrappedTransformBlock(createMessage, executionOptions, GetSpanName("create_send_message")); @@ -357,7 +357,7 @@ public ConsumerDataflow WithSendCompressedStep( TaskScheduler taskScheduler = null) { Guard.AgainstNull(_compressionProvider, nameof(_compressionProvider)); - if (_compressBlock == null) + if (_compressBlock is null) { var executionOptions = GetExecuteStepOptions(maxDoP, ensureOrdered, boundedCapacity, taskScheduler ?? _taskScheduler); @@ -378,7 +378,7 @@ public ConsumerDataflow WithSendEncryptedStep( TaskScheduler taskScheduler = null) { Guard.AgainstNull(_encryptionProvider, nameof(_encryptionProvider)); - if (_encryptBlock == null) + if (_encryptBlock is null) { var executionOptions = GetExecuteStepOptions(maxDoP, ensureOrdered, boundedCapacity, taskScheduler ?? _taskScheduler); @@ -398,7 +398,7 @@ public ConsumerDataflow WithSendStep( int? boundedCapacity = null, TaskScheduler taskScheduler = null) { - if (_sendMessageBlock == null) + if (_sendMessageBlock is null) { var executionOptions = GetExecuteStepOptions(maxDoP, ensureOrdered, boundedCapacity, taskScheduler ?? _taskScheduler); _sendMessageBlock = GetWrappedPublishTransformBlock(_rabbitService, executionOptions); @@ -432,7 +432,7 @@ protected virtual void BuildLinkages(DataflowLinkOptions overrid } ((ISourceBlock)_inputBuffer).LinkTo(_buildStateBlock, overrideOptions ?? _linkStepOptions); - _buildStateBlock.LinkTo(_errorBuffer, overrideOptions ?? _linkStepOptions, x => x == null); + _buildStateBlock.LinkTo(_errorBuffer, overrideOptions ?? _linkStepOptions, x => x is null); SetCurrentSourceBlock(_buildStateBlock); LinkPreProcessing(overrideOptions); @@ -592,7 +592,7 @@ TState WrapAction(TState state) { if (state.ReceivedMessage.ObjectType == Constants.HeaderValueForMessageObjectType) { - if (state.ReceivedMessage.Message == null) + if (state.ReceivedMessage.Message is null) { state.ReceivedMessage.Message = _serializationProvider.Deserialize(state.ReceivedMessage.Body); } state.ReceivedMessage.Message.Body = action(state.ReceivedMessage.Message.Body); @@ -640,7 +640,7 @@ async Task WrapActionAsync(TState state) { if (state.ReceivedMessage.ObjectType == Constants.HeaderValueForMessageObjectType) { - if (state.ReceivedMessage.Message == null) + if (state.ReceivedMessage.Message is null) { state.ReceivedMessage.Message = _serializationProvider.Deserialize(state.ReceivedMessage.Body); } state.ReceivedMessage.Message.Body = await action(state.ReceivedMessage.Message.Body).ConfigureAwait(false); diff --git a/src/HouseofCat.RabbitMQ/Dataflows/ConsumerPipeline.cs b/src/HouseofCat.RabbitMQ/Dataflows/ConsumerPipeline.cs index 5354b930..3d0e9f54 100644 --- a/src/HouseofCat.RabbitMQ/Dataflows/ConsumerPipeline.cs +++ b/src/HouseofCat.RabbitMQ/Dataflows/ConsumerPipeline.cs @@ -32,7 +32,6 @@ public class ConsumerPipeline : IConsumerPipeline, IDisposable where TOut private CancellationTokenSource _cancellationTokenSource; private bool _disposedValue; private readonly SemaphoreSlim _cpLock = new SemaphoreSlim(1, 1); - private readonly SemaphoreSlim _pipeExecLock = new SemaphoreSlim(1, 1); public ConsumerPipeline( IConsumer consumer, @@ -135,15 +134,11 @@ public async Task PipelineStreamEngineAsync( bool waitForCompletion, CancellationToken token = default) { - await _pipeExecLock - .WaitAsync(2000, token) - .ConfigureAwait(false); - try { await foreach (var receivedMessage in Consumer.GetConsumerBuffer().ReadAllAsync(token)) { - if (receivedMessage == null) { continue; } + if (receivedMessage is null) { continue; } _logger.LogDebug( _consumerPipelineQueueing, @@ -188,7 +183,6 @@ await receivedMessage ConsumerOptions.ConsumerName, ex.Message); } - finally { _pipeExecLock.Release(); } } public async Task PipelineExecutionEngineAsync( @@ -196,17 +190,13 @@ public async Task PipelineExecutionEngineAsync( bool waitForCompletion, CancellationToken token = default) { - await _pipeExecLock - .WaitAsync(2000, token) - .ConfigureAwait(false); - try { while (await Consumer.GetConsumerBuffer().WaitToReadAsync(token).ConfigureAwait(false)) { while (Consumer.GetConsumerBuffer().TryRead(out var receivedMessage)) { - if (receivedMessage == null) { continue; } + if (receivedMessage is null) { continue; } _logger.LogDebug( _consumerPipelineQueueing, @@ -252,7 +242,6 @@ await receivedMessage ConsumerOptions.ConsumerName, ex.Message); } - finally { _pipeExecLock.Release(); } } public async Task AwaitCompletionAsync() @@ -267,7 +256,6 @@ protected virtual void Dispose(bool disposing) if (disposing) { _cpLock.Dispose(); - _pipeExecLock.Dispose(); _cancellationTokenSource?.Dispose(); } diff --git a/src/HouseofCat.RabbitMQ/Extensions/MessageExtensions.cs b/src/HouseofCat.RabbitMQ/Extensions/MessageExtensions.cs index 2b754011..6b836c1a 100644 --- a/src/HouseofCat.RabbitMQ/Extensions/MessageExtensions.cs +++ b/src/HouseofCat.RabbitMQ/Extensions/MessageExtensions.cs @@ -140,7 +140,7 @@ public static IList CreateManySimpleRandomMessages(string queueName, i public static void EnrichSpanWithTags(this IMessage message, TelemetrySpan span) { - if (message == null || span == null || !span.IsRecording) return; + if (message is null || span is null || !span.IsRecording) return; span.SetAttribute(Constants.MessagingSystemKey, Constants.MessagingSystemValue); diff --git a/src/HouseofCat.RabbitMQ/Messages/Metadata.cs b/src/HouseofCat.RabbitMQ/Messages/Metadata.cs index 0b8583a6..bb969ff2 100644 --- a/src/HouseofCat.RabbitMQ/Messages/Metadata.cs +++ b/src/HouseofCat.RabbitMQ/Messages/Metadata.cs @@ -21,7 +21,7 @@ public sealed class Metadata : IMetadata public bool Encrypted() { - if (Fields == null) return false; + if (Fields is null) return false; if (Fields.TryGetValue(Constants.HeaderForEncrypted, out var value)) { @@ -33,7 +33,7 @@ public bool Encrypted() public string EncryptionType() { - if (Fields == null) return null; + if (Fields is null) return null; if (Fields.TryGetValue(Constants.HeaderForEncryption, out var value)) { @@ -45,7 +45,7 @@ public string EncryptionType() public string EncryptedDate() { - if (Fields == null) return null; + if (Fields is null) return null; if (Fields.TryGetValue(Constants.HeaderForEncryptDate, out var value)) { @@ -57,7 +57,7 @@ public string EncryptedDate() public DateTime? EncryptedDateTime() { - if (Fields == null) return null; + if (Fields is null) return null; if (Fields.TryGetValue(Constants.HeaderForEncryptDate, out var value) && DateTime.TryParse((string)value, out var dateTime)) @@ -70,7 +70,7 @@ public string EncryptedDate() public bool Compressed() { - if (Fields == null) return false; + if (Fields is null) return false; if (Fields.TryGetValue(Constants.HeaderForCompressed, out var value)) { @@ -82,7 +82,7 @@ public bool Compressed() public string CompressionType() { - if (Fields == null) return null; + if (Fields is null) return null; if (Fields.TryGetValue(Constants.HeaderForCompression, out var value)) { diff --git a/src/HouseofCat.RabbitMQ/Pools/ChannelPool.cs b/src/HouseofCat.RabbitMQ/Pools/ChannelPool.cs index f4f52ccd..b0c22e27 100644 --- a/src/HouseofCat.RabbitMQ/Pools/ChannelPool.cs +++ b/src/HouseofCat.RabbitMQ/Pools/ChannelPool.cs @@ -144,7 +144,7 @@ public async Task GetChannelAsync() .ConfigureAwait(false); } - if (_channels == null) + if (_channels is null) { throw new InvalidOperationException(_channelPoolBadOptionChannelError); } @@ -211,7 +211,7 @@ public async Task GetAckChannelAsync() .ConfigureAwait(false); } - if (_ackChannels == null) + if (_ackChannels is null) { throw new InvalidOperationException(_channelPoolBadOptionAckChannelError); } diff --git a/src/HouseofCat.RabbitMQ/Publisher/Publisher.cs b/src/HouseofCat.RabbitMQ/Publisher/Publisher.cs index 881e1993..13b8bdb2 100644 --- a/src/HouseofCat.RabbitMQ/Publisher/Publisher.cs +++ b/src/HouseofCat.RabbitMQ/Publisher/Publisher.cs @@ -125,7 +125,7 @@ public Publisher( _logger = LogHelpers.GetLogger(); _serializationProvider = serializationProvider; - if (Options.PublisherOptions.Encrypt && encryptionProvider == null) + if (Options.PublisherOptions.Encrypt && encryptionProvider is null) { Options.PublisherOptions.Encrypt = false; _logger.LogWarning("Encryption disabled, encryptionProvider provided was null."); @@ -135,7 +135,7 @@ public Publisher( _encryptionProvider = encryptionProvider; } - if (Options.PublisherOptions.Compress && compressionProvider == null) + if (Options.PublisherOptions.Compress && compressionProvider is null) { Options.PublisherOptions.Compress = false; _logger.LogWarning("Compression disabled, compressionProvider provided was null."); @@ -280,7 +280,7 @@ private async Task ProcessMessagesAsync(ChannelReader channelReader) { while (channelReader.TryRead(out var message)) { - if (message == null) + if (message is null) { continue; } using var span = OpenTelemetryHelpers.StartActiveSpan( @@ -373,7 +373,7 @@ public async Task PublishAsync( var error = false; var channelHost = await _channelPool.GetChannelAsync().ConfigureAwait(false); - if (basicProperties == null) + if (basicProperties is null) { basicProperties = channelHost.Channel.CreateBasicProperties(); basicProperties.DeliveryMode = 2; @@ -481,7 +481,7 @@ public async Task PublishBatchAsync( var error = false; var channelHost = await _channelPool.GetChannelAsync().ConfigureAwait(false); - if (basicProperties == null) + if (basicProperties is null) { basicProperties = channelHost.Channel.CreateBasicProperties(); basicProperties.DeliveryMode = 2; @@ -917,7 +917,7 @@ public static void EnrichSpanWithTags( string routingKey, string messageId = null) { - if (span == null || !span.IsRecording) return; + if (span is null || !span.IsRecording) return; span.SetAttribute(Constants.MessagingSystemKey, Constants.MessagingSystemValue); diff --git a/src/HouseofCat.RabbitMQ/Services/MaintenanceService.cs b/src/HouseofCat.RabbitMQ/Services/MaintenanceService.cs index 8034bf57..a8cee17a 100644 --- a/src/HouseofCat.RabbitMQ/Services/MaintenanceService.cs +++ b/src/HouseofCat.RabbitMQ/Services/MaintenanceService.cs @@ -146,7 +146,7 @@ public async Task TransferAllMessagesAsync( while (true) { result = channelHost.Channel.BasicGet(originQueueName, true); - if (result == null) { break; } + if (result is null) { break; } if (result?.Body is not null) { @@ -187,7 +187,7 @@ public async Task TransferAllMessagesAsync( try { result = channelHost.Channel.BasicGet(originQueueName, true); - if (result == null) { break; } + if (result is null) { break; } } catch { error = true; } finally diff --git a/src/HouseofCat.Utilities/Extensions/ObjectExtensions.cs b/src/HouseofCat.Utilities/Extensions/ObjectExtensions.cs index 940f2cbb..2a290dbe 100644 --- a/src/HouseofCat.Utilities/Extensions/ObjectExtensions.cs +++ b/src/HouseofCat.Utilities/Extensions/ObjectExtensions.cs @@ -12,7 +12,7 @@ public static class ObjectExtensions public static long GetByteCount(this object input) { - if (input == null) return 0; + if (input is null) return 0; var type = input.GetType(); if (_primitiveTypeSizes.TryGetValue(type, out int sizeValue)) diff --git a/src/HouseofCat.Utilities/Helpers/AppHelpers.cs b/src/HouseofCat.Utilities/Helpers/AppHelpers.cs index 2dc789e2..ff675b91 100644 --- a/src/HouseofCat.Utilities/Helpers/AppHelpers.cs +++ b/src/HouseofCat.Utilities/Helpers/AppHelpers.cs @@ -22,7 +22,7 @@ public static bool IsDebug public static string GetFlexibleSemVersion(AssemblyName assemblyName) { - if (assemblyName == null) + if (assemblyName is null) { return null; } diff --git a/src/HouseofCat.Utilities/Helpers/LogHelpers.cs b/src/HouseofCat.Utilities/Helpers/LogHelpers.cs index d564174a..67fce033 100644 --- a/src/HouseofCat.Utilities/Helpers/LogHelpers.cs +++ b/src/HouseofCat.Utilities/Helpers/LogHelpers.cs @@ -12,7 +12,7 @@ public static ILoggerFactory LoggerFactory { get { - if (_factory == null) + if (_factory is null) { lock (_syncObj) { diff --git a/tests/RabbitMQ.ConsumerDataflowService/Program.cs b/tests/RabbitMQ.ConsumerDataflowService/Program.cs index bd784e0f..192fd8e3 100644 --- a/tests/RabbitMQ.ConsumerDataflowService/Program.cs +++ b/tests/RabbitMQ.ConsumerDataflowService/Program.cs @@ -11,6 +11,7 @@ var loggerFactory = LogHelpers.CreateConsoleLoggerFactory(LogLevel.Information); LogHelpers.LoggerFactory = loggerFactory; var logger = loggerFactory.CreateLogger(); +var logMessage = false; var builder = WebApplication.CreateBuilder(args); var configuration = new ConfigurationBuilder() @@ -34,7 +35,10 @@ { throw new Exception("Throwing an exception!"); } - logger.LogInformation(message); + + if (logMessage) + { logger.LogInformation(message); } + return state; }); @@ -45,7 +49,7 @@ var message = new Message { Exchange = "", - RoutingKey = "TestTargetQueue", + RoutingKey = "TestQueue", Body = Encoding.UTF8.GetBytes("Secret Message"), Metadata = new Metadata { @@ -60,18 +64,12 @@ return state; }); -dataflowService.AddStep( - "queue_new_message", - async (state) => - { - await rabbitService.Publisher.QueueMessageAsync(state.SendMessage); - return state; - }); - dataflowService.AddFinalization( (state) => { - logger.LogInformation("Finalization Step!"); + if (logMessage) + { logger.LogInformation("Finalization Step!"); } + state.ReceivedMessage.AckMessage(); }); @@ -83,7 +81,11 @@ await dataflowService.StartAsync(); -logger.LogInformation("Listening for Messages! Press CTRL+C to initiate graceful shutdown and stop consumer..."); +app.Lifetime.ApplicationStarted.Register( + () => + { + logger.LogInformation("Listening for Messages! Press CTRL+C to initiate graceful shutdown and stop consumer..."); + }); app.Lifetime.ApplicationStopping.Register( async () => diff --git a/tests/RabbitMQ.ConsumerDataflowService/RabbitMQ.ConsumerDataflows.json b/tests/RabbitMQ.ConsumerDataflowService/RabbitMQ.ConsumerDataflows.json index 0308dcc1..a2ceab1b 100644 --- a/tests/RabbitMQ.ConsumerDataflowService/RabbitMQ.ConsumerDataflows.json +++ b/tests/RabbitMQ.ConsumerDataflowService/RabbitMQ.ConsumerDataflows.json @@ -28,7 +28,7 @@ "TestConsumer": { "Enabled": true, "ConsumerName": "TestConsumer", - "BatchSize": 5, + "BatchSize": 50, "BehaviorWhenFull": 0, "UseTransientChannels": true, "AutoAck": false, @@ -45,9 +45,9 @@ "BuildQueueExclusive": false, "BuildQueueAutoDelete": false, "WorkflowName": "test_workflow", - "WorkflowMaxDegreesOfParallelism": 1, + "WorkflowMaxDegreesOfParallelism": 14, "WorkflowConsumerCount": 1, - "WorkflowBatchSize": 5, + "WorkflowBatchSize": 50, "WorkflowEnsureOrdered": false, "WorkflowWaitForCompletion": false, "WorkflowSendCompressed": false,