Skip to content

Commit

Permalink
Amazon.SQS transport
Browse files Browse the repository at this point in the history
Signed-off-by: Tomasz Maruszak <[email protected]>
  • Loading branch information
zarusz committed Nov 24, 2024
1 parent e925b85 commit a8b4b34
Show file tree
Hide file tree
Showing 8 changed files with 226 additions and 85 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@

public static class SqsConsumerBuilderExtensions
{
public static ConsumerBuilder<T> Queue<T>(this ConsumerBuilder<T> consumerBuilder, string queue)
public static TConsumerBuilder Queue<TConsumerBuilder>(this TConsumerBuilder consumerBuilder, string queue)
where TConsumerBuilder : AbstractConsumerBuilder
{
if (consumerBuilder is null) throw new ArgumentNullException(nameof(consumerBuilder));
if (queue is null) throw new ArgumentNullException(nameof(queue));

consumerBuilder.ConsumerSettings.PathKind = PathKind.Queue;
return consumerBuilder.Path(queue);
consumerBuilder.ConsumerSettings.Path = queue;
return consumerBuilder;
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,4 +127,4 @@ public static ProducerBuilder<T> Policy<T>(this ProducerBuilder<T> producerBuild

return producerBuilder;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
namespace SlimMessageBus.Host.AmazonSQS;

public static class SqsRequestResponseBuilderExtensions
{
public static RequestResponseBuilder ReplyToQueue(this RequestResponseBuilder builder, string queue, Action<RequestResponseBuilder> builderConfig = null)
{
if (builder is null) throw new ArgumentNullException(nameof(builder));
if (queue is null) throw new ArgumentNullException(nameof(queue));

builder.Settings.Path = queue;
builder.Settings.PathKind = PathKind.Queue;

builderConfig?.Invoke(builder);

return builder;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public class DefaultSqsHeaderSerializer(bool detectStringType = true) : ISqsHead
public object Deserialize(string key, MessageAttributeValue value) => value?.DataType switch
{
"Number" when long.TryParse(value.StringValue, out var longValue) => longValue,
"String" when detectStringType && Guid.TryParse(value.StringValue, out var guid) => guid,
"String" when detectStringType && key != ReqRespMessageHeaders.RequestId && Guid.TryParse(value.StringValue, out var guid) => guid,
"String" when detectStringType && bool.TryParse(value.StringValue, out var b) => b,
"String" when detectStringType && DateTime.TryParse(value.StringValue, out var dt) => dt,
"String" => value.StringValue,
Expand Down
15 changes: 10 additions & 5 deletions src/SlimMessageBus.Host.AmazonSQS/SqsMessageBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -127,17 +127,22 @@ public override async Task ProvisionTopology()

private async Task PopulatePathToUrlMappings()
{
foreach (var producer in Settings.Producers.Where(x => x.PathKind == PathKind.Queue))
var queuePaths = Settings.Producers.Where(x => x.PathKind == PathKind.Queue).Select(x => x.DefaultPath)
.Concat(Settings.Consumers.Where(x => x.PathKind == PathKind.Queue).Select(x => x.Path))
.Concat(Settings.RequestResponse?.PathKind == PathKind.Queue ? [Settings.RequestResponse.Path] : [])
.ToHashSet();

foreach (var queuePath in queuePaths)
{
try
{
_logger.LogDebug("Populating URL for queue {QueueName}", producer.DefaultPath);
var queueResponse = await _clientProvider.Client.GetQueueUrlAsync(producer.DefaultPath, CancellationToken);
_queueUrlByPath[producer.DefaultPath] = queueResponse.QueueUrl;
_logger.LogDebug("Populating URL for queue {QueueName}", queuePath);
var queueResponse = await _clientProvider.Client.GetQueueUrlAsync(queuePath, CancellationToken);
_queueUrlByPath[queuePath] = queueResponse.QueueUrl;
}
catch (QueueDoesNotExistException ex)
{
_logger.LogError(ex, "Queue {QueueName} does not exist, ensure that it either exists or topology provisioning is enabled", producer.DefaultPath);
_logger.LogError(ex, "Queue {QueueName} does not exist, ensure that it either exists or topology provisioning is enabled", queuePath);
}
}
}
Expand Down
186 changes: 113 additions & 73 deletions src/SlimMessageBus.Host.AmazonSQS/SqsTopologyService.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
namespace SlimMessageBus.Host.AmazonSQS;

public class SqsTopologyService
{
private readonly ILogger _logger;
Expand All @@ -21,89 +20,130 @@ public SqsTopologyService(

public Task ProvisionTopology() => _providerSettings.TopologyProvisioning.OnProvisionTopology(_clientProvider.Client, DoProvisionTopology);

private async Task CreateQueue(string path,
bool fifo,
int? visibilityTimeout,
string policy,
Dictionary<string, string> attributes,
Dictionary<string, string> tags)
{
try
{
try
{
var queueUrl = await _clientProvider.Client.GetQueueUrlAsync(path);
if (queueUrl != null)
{
return;
}
}
catch (QueueDoesNotExistException)
{
// proceed to create the queue
}

var createQueueRequest = new CreateQueueRequest
{
QueueName = path,
Attributes = []
};

if (fifo)
{
createQueueRequest.Attributes.Add(QueueAttributeName.FifoQueue, "true");
}

if (visibilityTimeout != null)
{
createQueueRequest.Attributes.Add(QueueAttributeName.VisibilityTimeout, visibilityTimeout.ToString());
}

if (policy != null)
{
createQueueRequest.Attributes.Add(QueueAttributeName.Policy, policy);
}

if (attributes.Count > 0)
{
createQueueRequest.Attributes = attributes;
}

if (tags.Count > 0)
{
createQueueRequest.Tags = tags;
}

_providerSettings.TopologyProvisioning.CreateQueueOptions?.Invoke(createQueueRequest);

try
{
var createQueueResponse = await _clientProvider.Client.CreateQueueAsync(createQueueRequest);
_logger.LogInformation("Created queue {QueueName} with URL {QueueUrl}", path, createQueueResponse.QueueUrl);
}
catch (QueueNameExistsException)
{
_logger.LogInformation("Queue {QueueName} already exists", path);
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Error creating queue {QueueName}", path);
}
}

private async Task DoProvisionTopology()
{
try
{
_logger.LogInformation("Topology provisioning started...");

if (_providerSettings.TopologyProvisioning.CanProducerCreateQueue)
if (_providerSettings.TopologyProvisioning.CanConsumerCreateQueue)
{
foreach (var producer in _settings.Producers)
var consumersSettingsByPath = _settings.Consumers
.OfType<AbstractConsumerSettings>()
.Concat([_settings.RequestResponse])
.Where(x => x != null)
.GroupBy(x => (x.Path, x.PathKind))
.ToDictionary(x => x.Key, x => x.ToList());

foreach (var ((path, pathKind), consumerSettingsList) in consumersSettingsByPath)
{
try
if (pathKind == PathKind.Queue)
{
try
{
var queueUrl = await _clientProvider.Client.GetQueueUrlAsync(producer.DefaultPath);
if (queueUrl != null)
{
continue;
}
}
catch (QueueDoesNotExistException)
{
// proceed to create the queue
}

var createQueueRequest = new CreateQueueRequest
{
QueueName = producer.DefaultPath,
Attributes = []
};

if (producer.GetOrDefault(SqsProperties.EnableFifo, _settings, false))
{
createQueueRequest.Attributes.Add(QueueAttributeName.FifoQueue, "true");
}

var visibilityTimeout = producer.GetOrDefault(SqsProperties.VisibilityTimeout, _settings, null);
if (visibilityTimeout != null)
{
createQueueRequest.Attributes.Add(QueueAttributeName.VisibilityTimeout, visibilityTimeout.ToString());
}

var policy = producer.GetOrDefault(SqsProperties.Policy, _settings, null);
if (policy != null)
{
createQueueRequest.Attributes.Add(QueueAttributeName.Policy, policy);
}

var attributes = producer.GetOrDefault(SqsProperties.Attributes, [])
.Concat(_settings.GetOrDefault(SqsProperties.Attributes, []))
.GroupBy(x => x.Key, x => x.Value)
.ToDictionary(x => x.Key, x => x.First());

if (attributes.Count > 0)
{
createQueueRequest.Attributes = attributes;
}

var tags = producer.GetOrDefault(SqsProperties.Tags, [])
.Concat(_settings.GetOrDefault(SqsProperties.Tags, []))
.GroupBy(x => x.Key, x => x.Value)
.ToDictionary(x => x.Key, x => x.First());

if (tags.Count > 0)
{
createQueueRequest.Tags = tags;
}

_providerSettings.TopologyProvisioning.CreateQueueOptions?.Invoke(createQueueRequest);

try
{
var createQueueResponse = await _clientProvider.Client.CreateQueueAsync(createQueueRequest);
_logger.LogInformation("Created queue {QueueName} with URL {QueueUrl}", producer.DefaultPath, createQueueResponse.QueueUrl);
}
catch (QueueNameExistsException)
{
_logger.LogInformation("Queue {QueueName} already exists", producer.DefaultPath);
}
await CreateQueue(
path: path,
fifo: consumerSettingsList.Any(cs => cs.GetOrDefault(SqsProperties.EnableFifo, _settings, false)),
visibilityTimeout: consumerSettingsList.Select(cs => cs.GetOrDefault(SqsProperties.VisibilityTimeout, _settings, null)).Where(x => x != null).FirstOrDefault(),
policy: consumerSettingsList.Select(cs => cs.GetOrDefault(SqsProperties.Policy, _settings, null)).Where(x => x != null).FirstOrDefault(),
attributes: [],
tags: []);
}
catch (Exception ex)
}
}

if (_providerSettings.TopologyProvisioning.CanProducerCreateQueue)
{
foreach (var producer in _settings.Producers)
{
var attributes = producer.GetOrDefault(SqsProperties.Attributes, [])
.Concat(_settings.GetOrDefault(SqsProperties.Attributes, []))
.GroupBy(x => x.Key, x => x.Value)
.ToDictionary(x => x.Key, x => x.First());

var tags = producer.GetOrDefault(SqsProperties.Tags, [])
.Concat(_settings.GetOrDefault(SqsProperties.Tags, []))
.GroupBy(x => x.Key, x => x.Value)
.ToDictionary(x => x.Key, x => x.First());

if (producer.PathKind == PathKind.Queue)
{
_logger.LogError(ex, "Error creating queue {QueueName}", producer.DefaultPath);
await CreateQueue(
path: producer.DefaultPath,
fifo: producer.GetOrDefault(SqsProperties.EnableFifo, _settings, false),
visibilityTimeout: producer.GetOrDefault(SqsProperties.VisibilityTimeout, _settings, null),
policy: producer.GetOrDefault(SqsProperties.Policy, _settings, null),
attributes: attributes,
tags: tags);
}
}
}
Expand Down
79 changes: 79 additions & 0 deletions src/Tests/SlimMessageBus.Host.AmazonSQS.Test/SqsMessageBusIt.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
namespace SlimMessageBus.Host.AmazonSQS.Test;

using System.Collections.Concurrent;
using System.Diagnostics;
using System.Runtime.CompilerServices;

using Amazon.SQS.Model;

using Microsoft.Extensions.Logging;

using SlimMessageBus.Host.Serialization.SystemTextJson;

/// <summary>
Expand Down Expand Up @@ -183,6 +186,63 @@ private async Task BasicProducerConsumer(int expectedMessageCopies, Action<TestD
additionalAssertion?.Invoke(new TestData { ProducedMessages = producedMessages, ConsumedMessages = consumedMessages.Snapshot() });
}

[Fact]
public async Task BasicReqRespOnQueue()
{
var queue = QueueName();
var responseQueue = $"{queue}-resp";

AddBusConfiguration(mbb =>
{
mbb.Produce<EchoRequest>(x =>
{
x.DefaultQueue(queue);
})
.Handle<EchoRequest, EchoResponse>(x => x.Queue(queue)
.WithHandler<EchoRequestHandler>()
.Instances(20))
.ExpectRequestResponses(x =>
{
x.ReplyToQueue(responseQueue);
x.DefaultTimeout(TimeSpan.FromSeconds(60));
});
});
await BasicReqResp();
}

private async Task BasicReqResp()
{
// arrange
var messageBus = MessageBus;

// act

// publish
var stopwatch = Stopwatch.StartNew();

var requests = Enumerable
.Range(0, NumberOfMessages)
.Select(i => new EchoRequest { Index = i, Message = $"Echo {i}" })
.ToList();

var responses = new ConcurrentBag<Tuple<EchoRequest, EchoResponse>>();
var responseTasks = requests.Select(async req =>
{
var resp = await messageBus.Send(req).ConfigureAwait(false);
responses.Add(Tuple.Create(req, resp));
});
await Task.WhenAll(responseTasks).ConfigureAwait(false);

stopwatch.Stop();
Logger.LogInformation("Published and received {Count} messages in {Elapsed}", responses.Count, stopwatch.Elapsed);

// assert

// all messages got back
responses.Count.Should().Be(NumberOfMessages);
responses.All(x => x.Item1.Message == x.Item2.Message).Should().BeTrue();
}

private static string QueueName([CallerMemberName] string testName = null)
=> $"{QueueNamePrefix}_{DateTimeOffset.UtcNow.Ticks}_{testName}";
}
Expand Down Expand Up @@ -245,4 +305,23 @@ public Task OnHandle(PingDerivedMessage message, CancellationToken cancellationT
_logger.LogInformation("Got message {Counter:000} on path {Path} message id {MessageId}.", message.Counter, Context.Path, transportMessage.MessageId);
return Task.CompletedTask;
}
}

public record EchoRequest : IRequest<EchoResponse>
{
public int Index { get; set; }
public string Message { get; set; }
}

public record EchoResponse(string Message);

public class EchoRequestHandler : IRequestHandler<EchoRequest, EchoResponse>
{
public EchoRequestHandler(TestMetric testMetric)
{
testMetric.OnCreatedConsumer();
}

public Task<EchoResponse> OnHandle(EchoRequest request, CancellationToken cancellationToken)
=> Task.FromResult(new EchoResponse(request.Message));
}
Loading

0 comments on commit a8b4b34

Please sign in to comment.