Skip to content

Commit

Permalink
GH-2607: Add SMLC.enforceImmediateAckForManual option
Browse files Browse the repository at this point in the history
Fixes: #2607

There are use-cases when `ImmediateAcknowledgeAmqpException` can be thrown outside
the listener method, therefore there is no way to reach `Channel.basicAck()`.
For example, for `AbstractMessageListenerContainer.afterReceivePostProcessors`

* Make force ack for `ImmediateAcknowledgeAmqpException` even if `AcknowledgeMode.MANUAL`.
This is controlled with newly introduced `enforceImmediateAckForManual` flag on the `SimpleMessageListenerContainer`.
Such an option might be as tentative solution to not break behavior for existing applications using the current point release.
We may consider to make this unconditional in future versions
  • Loading branch information
artembilan committed Feb 2, 2024
1 parent 5c3e739 commit c8c06ea
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

package org.springframework.amqp.rabbit.config;

import com.rabbitmq.client.Channel;

import org.springframework.amqp.ImmediateAcknowledgeAmqpException;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpoint;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.utils.JavaUtils;
Expand Down Expand Up @@ -59,6 +62,8 @@ public class SimpleRabbitListenerContainerFactory

private Boolean consumerBatchEnabled;

private Boolean enforceImmediateAckForManual;

/**
* @param batchSize the batch size.
* @since 2.2
Expand Down Expand Up @@ -153,6 +158,17 @@ public void setConsumerBatchEnabled(boolean consumerBatchEnabled) {
}
}

/**
* Set to {@code true} to enforce {@link Channel#basicAck(long, boolean)}
* for {@link org.springframework.amqp.core.AcknowledgeMode#MANUAL}
* when {@link ImmediateAcknowledgeAmqpException} is thrown.
* This might be a tentative solution to not break behavior for current minor version.
* @param enforceImmediateAckForManual the flag to ack message for MANUAL mode on ImmediateAcknowledgeAmqpException
* @since 3.1.2
*/
public void setEnforceImmediateAckForManual(Boolean enforceImmediateAckForManual) {
this.enforceImmediateAckForManual = enforceImmediateAckForManual;
}
@Override
protected SimpleMessageListenerContainer createContainerInstance() {
return new SimpleMessageListenerContainer();
Expand Down Expand Up @@ -180,7 +196,8 @@ protected void initializeContainer(SimpleMessageListenerContainer instance, Rabb
.acceptIfNotNull(this.consecutiveActiveTrigger, instance::setConsecutiveActiveTrigger)
.acceptIfNotNull(this.consecutiveIdleTrigger, instance::setConsecutiveIdleTrigger)
.acceptIfNotNull(this.receiveTimeout, instance::setReceiveTimeout)
.acceptIfNotNull(this.batchReceiveTimeout, instance::setBatchReceiveTimeout);
.acceptIfNotNull(this.batchReceiveTimeout, instance::setBatchReceiveTimeout)
.acceptIfNotNull(this.enforceImmediateAckForManual, instance::setEnforceImmediateAckForManual);
if (Boolean.TRUE.equals(this.consumerBatchEnabled)) {
instance.setConsumerBatchEnabled(true);
/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -875,10 +875,25 @@ public void rollbackOnExceptionIfNecessary(Throwable ex, long tag) {

/**
* Perform a commit or message acknowledgement, as appropriate.
* NOTE: This method was never been intended tobe public.
* @param localTx Whether the channel is locally transacted.
* @return true if at least one delivery tag exists.
* @deprecated in favor of {@link #commitIfNecessary(boolean, boolean)}
*/
@Deprecated(forRemoval = true, since = "3.1.2")
public boolean commitIfNecessary(boolean localTx) {
return commitIfNecessary(localTx, false);
}

/**
* Perform a commit or message acknowledgement, as appropriate.
* NOTE: This method was never been intended tobe public.
* @param localTx Whether the channel is locally transacted.
* @param forceAck perform {@link Channel#basicAck(long, boolean)} independently of {@link #acknowledgeMode}.
* @return true if at least one delivery tag exists.
* @since 3.1.2
*/
boolean commitIfNecessary(boolean localTx, boolean forceAck) {
if (this.deliveryTags.isEmpty()) {
return false;
}
Expand All @@ -890,11 +905,10 @@ public boolean commitIfNecessary(boolean localTx) {
|| (this.transactional
&& TransactionSynchronizationManager.getResource(this.connectionFactory) == null);
try {

boolean ackRequired = !this.acknowledgeMode.isAutoAck() && !this.acknowledgeMode.isManual();
boolean ackRequired = forceAck || (!this.acknowledgeMode.isAutoAck() && !this.acknowledgeMode.isManual());

if (ackRequired && (!this.transactional || isLocallyTransacted)) {
long deliveryTag = new ArrayList<Long>(this.deliveryTags).get(this.deliveryTags.size() - 1);
long deliveryTag = new ArrayList<>(this.deliveryTags).get(this.deliveryTags.size() - 1);
try {
this.channel.basicAck(deliveryTag, true);
notifyMessageAckListener(true, deliveryTag, null);
Expand All @@ -909,14 +923,12 @@ public boolean commitIfNecessary(boolean localTx) {
// For manual acks we still need to commit
RabbitUtils.commitIfNecessary(this.channel);
}

}
finally {
this.deliveryTags.clear();
}

return true;

}

/**
Expand All @@ -931,7 +943,7 @@ private void notifyMessageAckListener(boolean success, long deliveryTag, @Nullab
this.messageAckListener.onComplete(success, deliveryTag, cause);
}
catch (Exception e) {
logger.error("An exception occured in MessageAckListener.", e);
logger.error("An exception occurred in MessageAckListener.", e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.batch.BatchingStrategy;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils;
import org.springframework.amqp.rabbit.connection.ConsumerChannelRegistry;
Expand Down Expand Up @@ -134,6 +135,8 @@ public class SimpleMessageListenerContainer extends AbstractMessageListenerConta

private long consumerStartTimeout = DEFAULT_CONSUMER_START_TIMEOUT;

private boolean enforceImmediateAckForManual;

private volatile int concurrentConsumers = 1;

private volatile Integer maxConcurrentConsumers;
Expand Down Expand Up @@ -504,6 +507,18 @@ public void setConsumerStartTimeout(long consumerStartTimeout) {
this.consumerStartTimeout = consumerStartTimeout;
}

/**
* Set to {@code true} to enforce {@link Channel#basicAck(long, boolean)}
* for {@link org.springframework.amqp.core.AcknowledgeMode#MANUAL}
* when {@link ImmediateAcknowledgeAmqpException} is thrown.
* This might be a tentative solution to not break behavior for current minor version.
* @param enforceImmediateAckForManual the flag to ack message for MANUAL mode on ImmediateAcknowledgeAmqpException
* @since 3.1.2
*/
public void setEnforceImmediateAckForManual(boolean enforceImmediateAckForManual) {
this.enforceImmediateAckForManual = enforceImmediateAckForManual;
}

/**
* Avoid the possibility of not configuring the CachingConnectionFactory in sync with the number of concurrent
* consumers.
Expand Down Expand Up @@ -1012,6 +1027,7 @@ private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Excep

List<Message> messages = null;
long deliveryTag = 0;
boolean immediateAck = false;
boolean isBatchReceiveTimeoutEnabled = this.batchReceiveTimeout > 0;
long startTime = isBatchReceiveTimeoutEnabled ? System.currentTimeMillis() : 0;
for (int i = 0; i < this.batchSize; i++) {
Expand Down Expand Up @@ -1050,9 +1066,9 @@ private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Excep
if (messages == null) {
messages = new ArrayList<>(this.batchSize);
}
if (isDeBatchingEnabled() && getBatchingStrategy().canDebatch(message.getMessageProperties())) {
final List<Message> messageList = messages;
getBatchingStrategy().deBatch(message, fragment -> messageList.add(fragment));
BatchingStrategy batchingStrategy = getBatchingStrategy();
if (isDeBatchingEnabled() && batchingStrategy.canDebatch(message.getMessageProperties())) {
batchingStrategy.deBatch(message, messages::add);
}
else {
messages.add(message);
Expand All @@ -1073,6 +1089,7 @@ private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Excep
+ e.getMessage() + "': "
+ message.getMessageProperties().getDeliveryTag());
}
immediateAck = this.enforceImmediateAckForManual;
break;
}
catch (Exception ex) {
Expand All @@ -1081,6 +1098,7 @@ private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Excep
this.logger.debug("User requested ack for failed delivery: "
+ message.getMessageProperties().getDeliveryTag());
}
immediateAck = this.enforceImmediateAckForManual;
break;
}
long tagToRollback = isAsyncReplies()
Expand Down Expand Up @@ -1117,14 +1135,14 @@ private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Excep
}
}
if (messages != null) {
executeWithList(channel, messages, deliveryTag, consumer);
immediateAck = executeWithList(channel, messages, deliveryTag, consumer);
}

return consumer.commitIfNecessary(isChannelLocallyTransacted());
return consumer.commitIfNecessary(isChannelLocallyTransacted(), immediateAck);

}

private void executeWithList(Channel channel, List<Message> messages, long deliveryTag,
private boolean executeWithList(Channel channel, List<Message> messages, long deliveryTag,
BlockingQueueConsumer consumer) {

try {
Expand All @@ -1136,15 +1154,15 @@ private void executeWithList(Channel channel, List<Message> messages, long deliv
+ e.getMessage() + "' (last in batch): "
+ deliveryTag);
}
return;
return this.enforceImmediateAckForManual;
}
catch (Exception ex) {
if (causeChainHasImmediateAcknowledgeAmqpException(ex)) {
if (this.logger.isDebugEnabled()) {
this.logger.debug("User requested ack for failed delivery (last in batch): "
+ deliveryTag);
}
return;
return this.enforceImmediateAckForManual;
}
if (getTransactionManager() != null) {
if (getTransactionAttribute().rollbackOn(ex)) {
Expand Down Expand Up @@ -1173,6 +1191,7 @@ private void executeWithList(Channel channel, List<Message> messages, long deliv
throw ex;
}
}
return false;
}

protected void handleStartupFailure(BackOffExecution backOffExecution) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,6 +17,7 @@
package org.springframework.amqp.rabbit.listener;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
Expand All @@ -27,8 +28,10 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import org.springframework.amqp.ImmediateAcknowledgeAmqpException;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
Expand All @@ -45,6 +48,7 @@
* @author Dave Syer
* @author Gunnar Hillert
* @author Gary Russell
* @author Artem Bilan
*
* @since 1.0
*
Expand All @@ -56,7 +60,7 @@ public class MessageListenerManualAckIntegrationTests {

public static final String TEST_QUEUE = "test.queue.MessageListenerManualAckIntegrationTests";

private static Log logger = LogFactory.getLog(MessageListenerManualAckIntegrationTests.class);
private static final Log logger = LogFactory.getLog(MessageListenerManualAckIntegrationTests.class);

private final Queue queue = new Queue(TEST_QUEUE);

Expand Down Expand Up @@ -121,6 +125,26 @@ public void testListenerWithManualAckTransactional() throws Exception {
assertThat(template.receiveAndConvert(queue.getName())).isNull();
}

@Test
public void immediateIsAckedForManual() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
container = createContainer(new ImmediateTestListener(latch));
container.setEnforceImmediateAckForManual(true);

template.convertAndSend(queue.getName(), "test data");

assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();

container.stop();

Channel channel = template.getConnectionFactory().createConnection().createChannel(false);

await().untilAsserted(() -> assertThat(channel.consumerCount(queue.getName())).isEqualTo(0));
assertThat(channel.messageCount(queue.getName())).isEqualTo(0);

channel.close();
}

private SimpleMessageListenerContainer createContainer(Object listener) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(template.getConnectionFactory());
container.setMessageListener(new MessageListenerAdapter(listener));
Expand Down Expand Up @@ -159,4 +183,23 @@ public void onMessage(Message message, Channel channel) throws Exception {
}
}

static class ImmediateTestListener implements MessageListener {

private final CountDownLatch latch;

ImmediateTestListener(CountDownLatch latch) {
this.latch = latch;
}

@Override
public void onMessage(Message message) {
try {
throw new ImmediateAcknowledgeAmqpException("intentional");
}
finally {
this.latch.countDown();
}
}
}

}

0 comments on commit c8c06ea

Please sign in to comment.