Skip to content

Commit

Permalink
Fix for drain when prefetch is enabled (#41920)
Browse files Browse the repository at this point in the history
* Fix for drain when prefetch is enabled

* Fix to prefetch drain

* Add comment

* Update sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs

Co-authored-by: Jesse Squire <[email protected]>

---------

Co-authored-by: Jesse Squire <[email protected]>
  • Loading branch information
JoshLove-msft and jsquire authored Feb 14, 2024
1 parent a1030cb commit 86db5a7
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 12 deletions.
16 changes: 12 additions & 4 deletions sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -369,12 +369,20 @@ private async Task<IReadOnlyList<ServiceBusReceivedMessage>> ReceiveMessagesAsyn
messagesReceived as IReadOnlyCollection<AmqpMessage> ?? messagesReceived?.ToList() ?? s_emptyAmqpMessageList;

// If this is a session receiver and we didn't receive all requested messages, we need to drain the credits
// to ensure FIFO ordering within each session. We exclude session processor since those will always receive a single message
// at a time, and if there are no messages, the session will be closed. The session won't be closed in the case that
// MaxConcurrentCallsPerSession > 1, but in that case FIFO is not possible to guarantee.
if (_isSessionReceiver && !_isProcessor && messageList.Count < maxMessages)
// to ensure FIFO ordering within each session. We exclude session processors, since those will always receive a single message
// at a time. If there are no messages, the session will be closed unless the processor was configured to receive from specific sessions.
// The session won't be closed in the case that MaxConcurrentCallsPerSession > 1, but with concurrency, it is not possible to guarantee ordering.
if (_isSessionReceiver && (!_isProcessor || SessionId != null) && messageList.Count < maxMessages)
{
await link.DrainAsyc(cancellationToken).ConfigureAwait(false);

// These workarounds are necessary in order to resume prefetching after the link has been drained
// https://github.com/Azure/azure-amqp/issues/252#issuecomment-1942734342
if (_prefetchCount > 0)
{
link.Settings.TotalLinkCredit = 0;
link.SetTotalLinkCredit((uint)_prefetchCount, true, true);
}
}

List<ServiceBusReceivedMessage> receivedMessages = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2680,18 +2680,26 @@ async Task ProcessMessage(ProcessSessionMessageEventArgs args)
}

[Test]
public async Task SessionOrderingIsGuaranteedProcessor()
[TestCase(true, true)]
[TestCase(true, false)]
[TestCase(false, true)]
[TestCase(false, false)]
public async Task SessionOrderingIsGuaranteedProcessor(bool prefetch, bool useSpecificSession)
{
await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: true))
{
await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString);
long lastSequenceNumber = 0;
var options = new ServiceBusSessionProcessorOptions
{
MaxConcurrentCallsPerSession = 1, MaxConcurrentSessions = 1, PrefetchCount = prefetch ? 5 : 0
};
if (useSpecificSession)
{
options.SessionIds.Add("session");
}

await using var processor = client.CreateSessionProcessor(scope.QueueName,
new ServiceBusSessionProcessorOptions
{
MaxConcurrentCallsPerSession = 1, MaxConcurrentSessions = 1
});
await using var processor = client.CreateSessionProcessor(scope.QueueName, options);
processor.ProcessMessageAsync += ProcessMessage;
processor.ProcessErrorAsync += SessionErrorHandler;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1150,12 +1150,17 @@ public async Task CannotDeadLetterAfterLinkReconnect()
}

[Test]
public async Task SessionOrderingIsGuaranteed()
[TestCase(true)]
[TestCase(false)]
public async Task SessionOrderingIsGuaranteed(bool prefetch)
{
await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: true))
{
await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString);
var receiver = await client.AcceptSessionAsync(scope.QueueName, "session");
var receiver = await client.AcceptSessionAsync(scope.QueueName, "session", new ServiceBusSessionReceiverOptions
{
PrefetchCount = prefetch ? 5 : 0
});
var sender = client.CreateSender(scope.QueueName);

CancellationTokenSource cts = new CancellationTokenSource();
Expand Down

0 comments on commit 86db5a7

Please sign in to comment.