Skip to content

Commit

Permalink
Improve Consumer IAsyncEnumerable performance.
Browse files Browse the repository at this point in the history
  • Loading branch information
houseofcat committed Apr 23, 2024
1 parent faf683e commit d7c9790
Show file tree
Hide file tree
Showing 5 changed files with 9 additions and 13 deletions.
10 changes: 3 additions & 7 deletions src/HouseofCat.RabbitMQ/Consumer/Consumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public interface IConsumer<TFromQueue>
Task StopConsumerAsync(bool immediate = false);

Task<IEnumerable<TFromQueue>> ReadUntilEmptyAsync(CancellationToken token = default);
IAsyncEnumerable<IReceivedMessage> ReadUntilStopAsync(CancellationToken token = default);
Task<IAsyncEnumerable<IReceivedMessage>> ReadUntilStopAsync(CancellationToken token = default);
}

public class Consumer : IConsumer<IReceivedMessage>, IDisposable
Expand Down Expand Up @@ -489,15 +489,11 @@ public async Task<IEnumerable<IReceivedMessage>> ReadUntilEmptyAsync(Cancellatio
return list;
}

public async IAsyncEnumerable<IReceivedMessage> ReadUntilStopAsync(
[EnumeratorCancellation] CancellationToken token = default)
public async Task<IAsyncEnumerable<IReceivedMessage>> ReadUntilStopAsync(CancellationToken token = default)
{
if (!await _consumerChannel.Reader.WaitToReadAsync(token).ConfigureAwait(false)) throw new InvalidOperationException(ExceptionMessages.ChannelReadErrorMessage);

await foreach (var receivedMessage in _consumerChannel.Reader.ReadAllAsync(token))
{
yield return receivedMessage;
}
return _consumerChannel.Reader.ReadAllAsync(token);
}

protected virtual void Dispose(bool disposing)
Expand Down
2 changes: 1 addition & 1 deletion tests/RabbitMQ.Console.Tests/Tests/ConsumerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public static async Task RunConsumerTestAsync(ILogger logger, string configFileN
{
await consumer.StartConsumerAsync();

await foreach (var receivedMessage in consumer.ReadUntilStopAsync())
await foreach (var receivedMessage in await consumer.ReadUntilStopAsync())
{
logger.LogInformation("Received message: [{message}]", Encoding.UTF8.GetString(receivedMessage.Body.Span));
receivedMessage.AckMessage();
Expand Down
4 changes: 2 additions & 2 deletions tests/RabbitMQ.Console.Tests/Tests/PubSubTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ private static async Task StartConsumerAsync(ILogger logger, IChannelPool channe
{
await consumer.StartConsumerAsync();

await foreach (var receivedMessage in consumer.ReadUntilStopAsync())
await foreach (var receivedMessage in await consumer.ReadUntilStopAsync())
{
try
{
Expand Down Expand Up @@ -154,7 +154,7 @@ private static async Task VerifyNoDuplicatesInQueueAsync(ILogger logger, IChanne
{
await consumer.StartConsumerAsync();

await foreach (var receivedMessage in consumer.ReadUntilStopAsync())
await foreach (var receivedMessage in await consumer.ReadUntilStopAsync())
{
var message = JsonSerializer.Deserialize<Message>(receivedMessage.Body.Span);
var number = Encoding.UTF8.GetString(message.Body.Span);
Expand Down
4 changes: 2 additions & 2 deletions tests/RabbitMQ.Console.Tests/Tests/RabbitServiceTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public static async Task RunRabbitServicePingPongTestAsync(ILoggerFactory logger
preProcessSpan.End();

// Ping pong the same message.
await foreach (var receivedMessage in consumer.ReadUntilStopAsync())
await foreach (var receivedMessage in await consumer.ReadUntilStopAsync())
{
using var consumerSpan = OpenTelemetryHelpers.StartActiveSpan(
"messaging.rabbitmq.consumer process",
Expand Down Expand Up @@ -86,7 +86,7 @@ public static async Task RunRabbitServiceAltPingPongTestAsync(ILoggerFactory log
rabbitService.Publisher.QueueMessage(message);

// Ping pong the same message.
await foreach (var receivedMessage in consumer.ReadUntilStopAsync())
await foreach (var receivedMessage in await consumer.ReadUntilStopAsync())
{
if (receivedMessage?.Message is null)
{
Expand Down
2 changes: 1 addition & 1 deletion tests/UnitTests/UnitTests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.9.0" />
<PackageReference Include="ReportGenerator" Version="5.2.4" />
<PackageReference Include="ReportGenerator" Version="5.2.5" />
<PackageReference Include="xunit" Version="2.7.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.5.8">
<PrivateAssets>all</PrivateAssets>
Expand Down

0 comments on commit d7c9790

Please sign in to comment.