From c8c06eacc7b15dfcfc1ead5d175daa97cdc604f8 Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Fri, 2 Feb 2024 13:50:59 -0500 Subject: [PATCH] GH-2607: Add `SMLC.enforceImmediateAckForManual` option Fixes: #2607 There are use-cases when `ImmediateAcknowledgeAmqpException` can be thrown outside the listener method, therefore there is no way to reach `Channel.basicAck()`. For example, for `AbstractMessageListenerContainer.afterReceivePostProcessors` * Make force ack for `ImmediateAcknowledgeAmqpException` even if `AcknowledgeMode.MANUAL`. This is controlled with newly introduced `enforceImmediateAckForManual` flag on the `SimpleMessageListenerContainer`. Such an option might be as tentative solution to not break behavior for existing applications using the current point release. We may consider to make this unconditional in future versions --- .../SimpleRabbitListenerContainerFactory.java | 19 +++++++- .../listener/BlockingQueueConsumer.java | 24 +++++++--- .../SimpleMessageListenerContainer.java | 35 ++++++++++---- ...sageListenerManualAckIntegrationTests.java | 47 ++++++++++++++++++- 4 files changed, 108 insertions(+), 17 deletions(-) diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/SimpleRabbitListenerContainerFactory.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/SimpleRabbitListenerContainerFactory.java index baa4c975df..e0aebfffcc 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/SimpleRabbitListenerContainerFactory.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/SimpleRabbitListenerContainerFactory.java @@ -16,6 +16,9 @@ package org.springframework.amqp.rabbit.config; +import com.rabbitmq.client.Channel; + +import org.springframework.amqp.ImmediateAcknowledgeAmqpException; import org.springframework.amqp.rabbit.listener.RabbitListenerEndpoint; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.amqp.utils.JavaUtils; @@ -59,6 +62,8 @@ public class SimpleRabbitListenerContainerFactory private Boolean consumerBatchEnabled; + private Boolean enforceImmediateAckForManual; + /** * @param batchSize the batch size. * @since 2.2 @@ -153,6 +158,17 @@ public void setConsumerBatchEnabled(boolean consumerBatchEnabled) { } } + /** + * Set to {@code true} to enforce {@link Channel#basicAck(long, boolean)} + * for {@link org.springframework.amqp.core.AcknowledgeMode#MANUAL} + * when {@link ImmediateAcknowledgeAmqpException} is thrown. + * This might be a tentative solution to not break behavior for current minor version. + * @param enforceImmediateAckForManual the flag to ack message for MANUAL mode on ImmediateAcknowledgeAmqpException + * @since 3.1.2 + */ + public void setEnforceImmediateAckForManual(Boolean enforceImmediateAckForManual) { + this.enforceImmediateAckForManual = enforceImmediateAckForManual; + } @Override protected SimpleMessageListenerContainer createContainerInstance() { return new SimpleMessageListenerContainer(); @@ -180,7 +196,8 @@ protected void initializeContainer(SimpleMessageListenerContainer instance, Rabb .acceptIfNotNull(this.consecutiveActiveTrigger, instance::setConsecutiveActiveTrigger) .acceptIfNotNull(this.consecutiveIdleTrigger, instance::setConsecutiveIdleTrigger) .acceptIfNotNull(this.receiveTimeout, instance::setReceiveTimeout) - .acceptIfNotNull(this.batchReceiveTimeout, instance::setBatchReceiveTimeout); + .acceptIfNotNull(this.batchReceiveTimeout, instance::setBatchReceiveTimeout) + .acceptIfNotNull(this.enforceImmediateAckForManual, instance::setEnforceImmediateAckForManual); if (Boolean.TRUE.equals(this.consumerBatchEnabled)) { instance.setConsumerBatchEnabled(true); /* diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/BlockingQueueConsumer.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/BlockingQueueConsumer.java index de2d5da756..f78a3fe0e0 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/BlockingQueueConsumer.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/BlockingQueueConsumer.java @@ -875,10 +875,25 @@ public void rollbackOnExceptionIfNecessary(Throwable ex, long tag) { /** * Perform a commit or message acknowledgement, as appropriate. + * NOTE: This method was never been intended tobe public. * @param localTx Whether the channel is locally transacted. * @return true if at least one delivery tag exists. + * @deprecated in favor of {@link #commitIfNecessary(boolean, boolean)} */ + @Deprecated(forRemoval = true, since = "3.1.2") public boolean commitIfNecessary(boolean localTx) { + return commitIfNecessary(localTx, false); + } + + /** + * Perform a commit or message acknowledgement, as appropriate. + * NOTE: This method was never been intended tobe public. + * @param localTx Whether the channel is locally transacted. + * @param forceAck perform {@link Channel#basicAck(long, boolean)} independently of {@link #acknowledgeMode}. + * @return true if at least one delivery tag exists. + * @since 3.1.2 + */ + boolean commitIfNecessary(boolean localTx, boolean forceAck) { if (this.deliveryTags.isEmpty()) { return false; } @@ -890,11 +905,10 @@ public boolean commitIfNecessary(boolean localTx) { || (this.transactional && TransactionSynchronizationManager.getResource(this.connectionFactory) == null); try { - - boolean ackRequired = !this.acknowledgeMode.isAutoAck() && !this.acknowledgeMode.isManual(); + boolean ackRequired = forceAck || (!this.acknowledgeMode.isAutoAck() && !this.acknowledgeMode.isManual()); if (ackRequired && (!this.transactional || isLocallyTransacted)) { - long deliveryTag = new ArrayList(this.deliveryTags).get(this.deliveryTags.size() - 1); + long deliveryTag = new ArrayList<>(this.deliveryTags).get(this.deliveryTags.size() - 1); try { this.channel.basicAck(deliveryTag, true); notifyMessageAckListener(true, deliveryTag, null); @@ -909,14 +923,12 @@ public boolean commitIfNecessary(boolean localTx) { // For manual acks we still need to commit RabbitUtils.commitIfNecessary(this.channel); } - } finally { this.deliveryTags.clear(); } return true; - } /** @@ -931,7 +943,7 @@ private void notifyMessageAckListener(boolean success, long deliveryTag, @Nullab this.messageAckListener.onComplete(success, deliveryTag, cause); } catch (Exception e) { - logger.error("An exception occured in MessageAckListener.", e); + logger.error("An exception occurred in MessageAckListener.", e); } } diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java index 737ee14716..9a4ffb3705 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java @@ -41,6 +41,7 @@ import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.amqp.core.Queue; +import org.springframework.amqp.rabbit.batch.BatchingStrategy; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils; import org.springframework.amqp.rabbit.connection.ConsumerChannelRegistry; @@ -134,6 +135,8 @@ public class SimpleMessageListenerContainer extends AbstractMessageListenerConta private long consumerStartTimeout = DEFAULT_CONSUMER_START_TIMEOUT; + private boolean enforceImmediateAckForManual; + private volatile int concurrentConsumers = 1; private volatile Integer maxConcurrentConsumers; @@ -504,6 +507,18 @@ public void setConsumerStartTimeout(long consumerStartTimeout) { this.consumerStartTimeout = consumerStartTimeout; } + /** + * Set to {@code true} to enforce {@link Channel#basicAck(long, boolean)} + * for {@link org.springframework.amqp.core.AcknowledgeMode#MANUAL} + * when {@link ImmediateAcknowledgeAmqpException} is thrown. + * This might be a tentative solution to not break behavior for current minor version. + * @param enforceImmediateAckForManual the flag to ack message for MANUAL mode on ImmediateAcknowledgeAmqpException + * @since 3.1.2 + */ + public void setEnforceImmediateAckForManual(boolean enforceImmediateAckForManual) { + this.enforceImmediateAckForManual = enforceImmediateAckForManual; + } + /** * Avoid the possibility of not configuring the CachingConnectionFactory in sync with the number of concurrent * consumers. @@ -1012,6 +1027,7 @@ private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Excep List messages = null; long deliveryTag = 0; + boolean immediateAck = false; boolean isBatchReceiveTimeoutEnabled = this.batchReceiveTimeout > 0; long startTime = isBatchReceiveTimeoutEnabled ? System.currentTimeMillis() : 0; for (int i = 0; i < this.batchSize; i++) { @@ -1050,9 +1066,9 @@ private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Excep if (messages == null) { messages = new ArrayList<>(this.batchSize); } - if (isDeBatchingEnabled() && getBatchingStrategy().canDebatch(message.getMessageProperties())) { - final List messageList = messages; - getBatchingStrategy().deBatch(message, fragment -> messageList.add(fragment)); + BatchingStrategy batchingStrategy = getBatchingStrategy(); + if (isDeBatchingEnabled() && batchingStrategy.canDebatch(message.getMessageProperties())) { + batchingStrategy.deBatch(message, messages::add); } else { messages.add(message); @@ -1073,6 +1089,7 @@ private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Excep + e.getMessage() + "': " + message.getMessageProperties().getDeliveryTag()); } + immediateAck = this.enforceImmediateAckForManual; break; } catch (Exception ex) { @@ -1081,6 +1098,7 @@ private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Excep this.logger.debug("User requested ack for failed delivery: " + message.getMessageProperties().getDeliveryTag()); } + immediateAck = this.enforceImmediateAckForManual; break; } long tagToRollback = isAsyncReplies() @@ -1117,14 +1135,14 @@ private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Excep } } if (messages != null) { - executeWithList(channel, messages, deliveryTag, consumer); + immediateAck = executeWithList(channel, messages, deliveryTag, consumer); } - return consumer.commitIfNecessary(isChannelLocallyTransacted()); + return consumer.commitIfNecessary(isChannelLocallyTransacted(), immediateAck); } - private void executeWithList(Channel channel, List messages, long deliveryTag, + private boolean executeWithList(Channel channel, List messages, long deliveryTag, BlockingQueueConsumer consumer) { try { @@ -1136,7 +1154,7 @@ private void executeWithList(Channel channel, List messages, long deliv + e.getMessage() + "' (last in batch): " + deliveryTag); } - return; + return this.enforceImmediateAckForManual; } catch (Exception ex) { if (causeChainHasImmediateAcknowledgeAmqpException(ex)) { @@ -1144,7 +1162,7 @@ private void executeWithList(Channel channel, List messages, long deliv this.logger.debug("User requested ack for failed delivery (last in batch): " + deliveryTag); } - return; + return this.enforceImmediateAckForManual; } if (getTransactionManager() != null) { if (getTransactionAttribute().rollbackOn(ex)) { @@ -1173,6 +1191,7 @@ private void executeWithList(Channel channel, List messages, long deliv throw ex; } } + return false; } protected void handleStartupFailure(BackOffExecution backOffExecution) { diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerManualAckIntegrationTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerManualAckIntegrationTests.java index beb265e8d5..9eb3fe9902 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerManualAckIntegrationTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerManualAckIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,6 +17,7 @@ package org.springframework.amqp.rabbit.listener; import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -27,8 +28,10 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.springframework.amqp.ImmediateAcknowledgeAmqpException; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.Message; +import org.springframework.amqp.core.MessageListener; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; @@ -45,6 +48,7 @@ * @author Dave Syer * @author Gunnar Hillert * @author Gary Russell + * @author Artem Bilan * * @since 1.0 * @@ -56,7 +60,7 @@ public class MessageListenerManualAckIntegrationTests { public static final String TEST_QUEUE = "test.queue.MessageListenerManualAckIntegrationTests"; - private static Log logger = LogFactory.getLog(MessageListenerManualAckIntegrationTests.class); + private static final Log logger = LogFactory.getLog(MessageListenerManualAckIntegrationTests.class); private final Queue queue = new Queue(TEST_QUEUE); @@ -121,6 +125,26 @@ public void testListenerWithManualAckTransactional() throws Exception { assertThat(template.receiveAndConvert(queue.getName())).isNull(); } + @Test + public void immediateIsAckedForManual() throws Exception { + CountDownLatch latch = new CountDownLatch(1); + container = createContainer(new ImmediateTestListener(latch)); + container.setEnforceImmediateAckForManual(true); + + template.convertAndSend(queue.getName(), "test data"); + + assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); + + container.stop(); + + Channel channel = template.getConnectionFactory().createConnection().createChannel(false); + + await().untilAsserted(() -> assertThat(channel.consumerCount(queue.getName())).isEqualTo(0)); + assertThat(channel.messageCount(queue.getName())).isEqualTo(0); + + channel.close(); + } + private SimpleMessageListenerContainer createContainer(Object listener) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(template.getConnectionFactory()); container.setMessageListener(new MessageListenerAdapter(listener)); @@ -159,4 +183,23 @@ public void onMessage(Message message, Channel channel) throws Exception { } } + static class ImmediateTestListener implements MessageListener { + + private final CountDownLatch latch; + + ImmediateTestListener(CountDownLatch latch) { + this.latch = latch; + } + + @Override + public void onMessage(Message message) { + try { + throw new ImmediateAcknowledgeAmqpException("intentional"); + } + finally { + this.latch.countDown(); + } + } + } + }