diff --git a/src/HouseofCat.RabbitMQ/Consumer/Consumer.cs b/src/HouseofCat.RabbitMQ/Consumer/Consumer.cs index 3ec8db7..88b5972 100644 --- a/src/HouseofCat.RabbitMQ/Consumer/Consumer.cs +++ b/src/HouseofCat.RabbitMQ/Consumer/Consumer.cs @@ -31,7 +31,7 @@ public interface IConsumer Task StopConsumerAsync(bool immediate = false); Task> ReadUntilEmptyAsync(CancellationToken token = default); - Task> ReadUntilStopAsync(CancellationToken token = default); + ValueTask> ReadUntilStopAsync(CancellationToken token = default); } public class Consumer : IConsumer, IDisposable @@ -489,7 +489,7 @@ public async Task> ReadUntilEmptyAsync(Cancellatio return list; } - public async Task> ReadUntilStopAsync(CancellationToken token = default) + public async ValueTask> ReadUntilStopAsync(CancellationToken token = default) { if (!await _consumerChannel.Reader.WaitToReadAsync(token).ConfigureAwait(false)) throw new InvalidOperationException(ExceptionMessages.ChannelReadErrorMessage); diff --git a/src/HouseofCat.RabbitMQ/Options/PoolOptions.cs b/src/HouseofCat.RabbitMQ/Options/PoolOptions.cs index a8644ae..2704909 100644 --- a/src/HouseofCat.RabbitMQ/Options/PoolOptions.cs +++ b/src/HouseofCat.RabbitMQ/Options/PoolOptions.cs @@ -91,13 +91,13 @@ public sealed class PoolOptions /// Number of channels to keep in each of the channel pool. Used in round-robin to perform actions. /// Default value is 0. /// - public ushort Channels { get; set; } = 1; + public ushort Channels { get; set; } = 2; /// /// Number of ackable channels to keep in each of the channel pool. Used in round-robin to perform actions. /// Default value is 10. /// - public ushort AckableChannels { get; set; } = 1; + public ushort AckableChannels { get; set; } = 2; /// /// The time to sleep (in ms) when an error occurs on Channel or Connection creation. It's best not to be hyper aggressive with this value. diff --git a/src/HouseofCat.RabbitMQ/Services/RabbitService.cs b/src/HouseofCat.RabbitMQ/Services/RabbitService.cs index 8deef77..6eb0b79 100644 --- a/src/HouseofCat.RabbitMQ/Services/RabbitService.cs +++ b/src/HouseofCat.RabbitMQ/Services/RabbitService.cs @@ -229,93 +229,86 @@ public IConsumer GetConsumer(string consumerName) return value; } - public async Task DecomcryptAsync(IMessage message) + public async Task ComcryptAsync(IMessage message) { - if (message is null) return; - - var decrypted = Decrypt(message); + if (message is null || message.Body.Length == 0) return; - if (decrypted) - { - await DecompressAsync(message).ConfigureAwait(false); - } + await CompressAsync(message).ConfigureAwait(false); + Encrypt(message); } - public async Task ComcryptAsync(IMessage message) + public async Task DecomcryptAsync(IMessage message) { - if (message is null) return; + if (message is null || message.Body.Length == 0) return; - await CompressAsync(message).ConfigureAwait(false); - - Encrypt(message); + Decrypt(message); + await DecompressAsync(message).ConfigureAwait(false); } public bool Encrypt(IMessage message) { - if (!message.Metadata.Encrypted()) + if (EncryptionProvider is null || message.Metadata.Encrypted()) { - message.Body = EncryptionProvider.Encrypt(message.Body); - message.Metadata.Fields[Constants.HeaderForEncrypted] = true; - message.Metadata.Fields[Constants.HeaderForEncryption] = EncryptionProvider.Type; - message.Metadata.Fields[Constants.HeaderForEncryptDate] = TimeHelpers.GetDateTimeNow(TimeFormat); - - return true; + return false; } - return false; + message.Body = EncryptionProvider.Encrypt(message.Body); + message.Metadata.Fields[Constants.HeaderForEncrypted] = true; + message.Metadata.Fields[Constants.HeaderForEncryption] = EncryptionProvider.Type; + message.Metadata.Fields[Constants.HeaderForEncryptDate] = TimeHelpers.GetDateTimeNow(TimeFormat); + return true; } public bool Decrypt(IMessage message) { - if (message.Metadata.Encrypted()) + if (EncryptionProvider is null || !message.Metadata.Encrypted()) { - message.Body = EncryptionProvider.Decrypt(message.Body); - message.Metadata.Fields[Constants.HeaderForEncrypted] = false; - - message.Metadata.Fields.Remove(Constants.HeaderForEncryption); - message.Metadata.Fields.Remove(Constants.HeaderForEncryptDate); - - return true; + return false; } - return false; + message.Body = EncryptionProvider.Decrypt(message.Body); + message.Metadata.Fields[Constants.HeaderForEncrypted] = false; + message.Metadata.Fields.Remove(Constants.HeaderForEncryption); + message.Metadata.Fields.Remove(Constants.HeaderForEncryptDate); + return true; } public async Task CompressAsync(IMessage message) { - if (message.Metadata.Encrypted()) - { return false; } // Don't compress after encryption. + if (CompressionProvider is null + || message.Metadata.Encrypted() + || message.Metadata.Compressed()) + { + return false; + } - if (!message.Metadata.Compressed()) + try { - message.Body = (await CompressionProvider.CompressAsync(message.Body).ConfigureAwait(false)).ToArray(); + message.Body = await CompressionProvider.CompressAsync(message.Body).ConfigureAwait(false); message.Metadata.Fields[Constants.HeaderForCompressed] = true; message.Metadata.Fields[Constants.HeaderForCompression] = CompressionProvider.Type; - return true; } - - return true; + catch { return false; } } public async Task DecompressAsync(IMessage message) { - if (message.Metadata.Encrypted()) - { return false; } // Don't decompress before decryption. - - if (message.Metadata.Compressed()) + if (CompressionProvider is null + || message.Metadata.Encrypted() + || !message.Metadata.Compressed()) { - try - { - message.Body = (await CompressionProvider.DecompressAsync(message.Body).ConfigureAwait(false)).ToArray(); - message.Metadata.Fields[Constants.HeaderForCompressed] = false; - - message.Metadata.Fields.Remove(Constants.HeaderForCompression); - } - catch { return false; } + return false; } - return true; + try + { + message.Body = await CompressionProvider.DecompressAsync(message.Body).ConfigureAwait(false); + message.Metadata.Fields[Constants.HeaderForCompressed] = false; + message.Metadata.Fields.Remove(Constants.HeaderForCompression); + return true; + } + catch { return false; } } public async Task> GetAsync(string queueName) diff --git a/tests/RabbitMQ.Console.Tests/RabbitMQ.RabbitServiceTests.json b/tests/RabbitMQ.Console.Tests/RabbitMQ.RabbitServiceTests.json index 285ec1b..a9a1fd5 100644 --- a/tests/RabbitMQ.Console.Tests/RabbitMQ.RabbitServiceTests.json +++ b/tests/RabbitMQ.Console.Tests/RabbitMQ.RabbitServiceTests.json @@ -28,7 +28,7 @@ "TestConsumer": { "Enabled": true, "ConsumerName": "TestConsumer", - "BatchSize": 5, + "BatchSize": 10, "BehaviorWhenFull": 0, "UseTransientChannels": true, "AutoAck": false, @@ -45,9 +45,9 @@ "BuildQueueExclusive": false, "BuildQueueAutoDelete": false, "WorkflowName": "TestConsumerWorkflow", - "WorkflowMaxDegreesOfParallelism": 1, + "WorkflowMaxDegreesOfParallelism": 8, "WorkflowConsumerCount": 1, - "WorkflowBatchSize": 5, + "WorkflowBatchSize": 10, "WorkflowEnsureOrdered": false, "WorkflowWaitForCompletion": false } diff --git a/tests/UnitTests/RabbitMQ/RabbitServiceTests.cs b/tests/UnitTests/RabbitMQ/RabbitServiceTests.cs new file mode 100644 index 0000000..db3a235 --- /dev/null +++ b/tests/UnitTests/RabbitMQ/RabbitServiceTests.cs @@ -0,0 +1,60 @@ +using HouseofCat.Compression; +using HouseofCat.Encryption; +using HouseofCat.Hashing; +using HouseofCat.RabbitMQ; +using HouseofCat.RabbitMQ.Services; +using HouseofCat.Serialization; +using System.Text; + +namespace RabbitMQ; + +public class RabbitServiceTests +{ + private readonly IRabbitService _rabbitService; + + public RabbitServiceTests() + { + var options = new RabbitOptions(); + var hashingProvider = new ArgonHashingProvider(); + + var hashKey = hashingProvider.GetHashKey("Sega", "Nintendo", 32); + + _rabbitService = new RabbitService( + options, + new JsonProvider(), + new AesGcmEncryptionProvider(hashKey), + new GzipProvider()); + } + + [Fact] + public async Task ComcryptTestAsync() + { + var message = new Message("", "TestQueue", Encoding.UTF8.GetBytes("Hello World")); + + await _rabbitService.ComcryptAsync(message); + + Assert.True(message.Metadata.Encrypted()); + Assert.True(message.Metadata.Compressed()); + } + + [Fact] + public async Task ComcryptDecomcryptTestAsync() + { + var messageAsString = "Hello World"; + var message = new Message("", "TestQueue", Encoding.UTF8.GetBytes(messageAsString)); + + await _rabbitService.ComcryptAsync(message); + + Assert.True(message.Metadata.Encrypted()); + Assert.True(message.Metadata.Compressed()); + + await _rabbitService.DecomcryptAsync(message); + + Assert.False(message.Metadata.Encrypted()); + Assert.False(message.Metadata.Compressed()); + + var bodyAsString = Encoding.UTF8.GetString(message.Body.Span); + + Assert.Equal(messageAsString, bodyAsString); + } +} diff --git a/tests/UnitTests/UnitTests.csproj b/tests/UnitTests/UnitTests.csproj index 0615349..0f78f0e 100644 --- a/tests/UnitTests/UnitTests.csproj +++ b/tests/UnitTests/UnitTests.csproj @@ -34,6 +34,7 @@ +