From aee58bd232ab1bdca5cc1321e54109cbd2b63b02 Mon Sep 17 00:00:00 2001 From: "Zhiyang.Wang1" Date: Sun, 24 Dec 2023 21:41:57 +0800 Subject: [PATCH] GH-1189: Auto detect async reply * auto-detect async reply than coerce the out-of-order manual commit. * add new interface `HandlerMethodDetect` to detect handler args and return type. * add auto-detect async reply than coerce the out-of-order manual commit unit test at `@KafkaListener` `@KafkaHandler` scene. * modify async-returns.adoc --- .../receiving-messages/async-returns.adoc | 2 +- .../KafkaMessageListenerContainer.java | 49 ++++--- .../kafka/listener/adapter/AdapterUtils.java | 22 ++- .../adapter/DelegatingInvocableHandler.java | 21 +++ .../listener/adapter/HandlerAdapter.java | 13 ++ .../listener/adapter/HandlerMethodDetect.java | 36 +++++ .../MessagingMessageListenerAdapter.java | 10 +- .../kafka/support/Acknowledgment.java | 2 +- .../kafka/annotation/AsyncListenerTests.java | 138 +++++++++++++++++- .../KafkaMessageListenerContainerTests.java | 3 +- 10 files changed, 258 insertions(+), 38 deletions(-) create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/HandlerMethodDetect.java diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/async-returns.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/async-returns.adoc index 3421e0eb0d..651b2eb4b7 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/async-returns.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/async-returns.adoc @@ -23,7 +23,7 @@ public Mono listen(String data) { } ---- -IMPORTANT: The listener container factory must be configured with manual ack mode and async ack to enable out-of-order commits; instead, the asynchronous completion will ack or nack the message when the async operation completes. +IMPORTANT: The `AckMode` will be automatically set the `MANUAL` and enable out-of-order commits when async return types are detected; instead, the asynchronous completion will ack when the async operation completes. When the async result is completed with an error, whether the message is recover or not depends on the container error handler. If some exception occurs within the listener method that prevents creation of the async result object, you MUST catch that exception and return an appropriate return object that will cause the message to be ack or recover. diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index e236d1df63..62c9e5c1a8 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -104,6 +104,7 @@ import org.springframework.kafka.listener.ContainerProperties.AckMode; import org.springframework.kafka.listener.ContainerProperties.AssignmentCommitOption; import org.springframework.kafka.listener.ContainerProperties.EOSMode; +import org.springframework.kafka.listener.adapter.HandlerMethodDetect; import org.springframework.kafka.support.Acknowledgment; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.kafka.support.KafkaUtils; @@ -159,6 +160,7 @@ * @author Tomaz Fernandes * @author Francois Rosiere * @author Daniel Gentes + * @author Wang Zhiyang */ public class KafkaMessageListenerContainer // NOSONAR line count extends AbstractMessageListenerContainer implements ConsumerPauseResumeEventPublisher { @@ -659,6 +661,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume private final boolean wantsFullRecords; + private final boolean asyncReplies; + private final boolean autoCommit; private final boolean isManualAck; @@ -849,6 +853,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume ListenerConsumer(GenericMessageListener listener, ListenerType listenerType, ObservationRegistry observationRegistry) { + this.asyncReplies = listener instanceof HandlerMethodDetect hmd && hmd.isAsyncReplies() + || this.containerProperties.isAsyncAcks(); AckMode ackMode = determineAckMode(); this.isManualAck = ackMode.equals(AckMode.MANUAL); this.isCountAck = ackMode.equals(AckMode.COUNT) @@ -859,12 +865,12 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume this.isAnyManualAck = this.isManualAck || this.isManualImmediateAck; this.isRecordAck = ackMode.equals(AckMode.RECORD); this.offsetsInThisBatch = - this.isAnyManualAck && this.containerProperties.isAsyncAcks() - ? new HashMap<>() + this.isAnyManualAck && this.asyncReplies + ? new ConcurrentHashMap<>() : null; this.deferredOffsets = - this.isAnyManualAck && this.containerProperties.isAsyncAcks() - ? new HashMap<>() + this.isAnyManualAck && this.asyncReplies + ? new ConcurrentHashMap<>() : null; this.observationRegistry = observationRegistry; @@ -903,8 +909,7 @@ else if (listener instanceof MessageListener) { else { throw new IllegalArgumentException("Listener must be one of 'MessageListener', " + "'BatchMessageListener', or the variants that are consumer aware and/or " - + "Acknowledging" - + " not " + listener.getClass().getName()); + + "Acknowledging not " + listener.getClass().getName()); } this.listenerType = listenerType; this.isConsumerAwareListener = listenerType.equals(ListenerType.ACKNOWLEDGING_CONSUMER_AWARE) @@ -927,18 +932,15 @@ else if (listener instanceof MessageListener) { this.logger.info(toString()); } ApplicationContext applicationContext = getApplicationContext(); + ClassLoader classLoader = applicationContext == null + ? getClass().getClassLoader() + : applicationContext.getClassLoader(); this.checkNullKeyForExceptions = this.containerProperties.isCheckDeserExWhenKeyNull() || ErrorHandlingUtils.checkDeserializer(KafkaMessageListenerContainer.this.consumerFactory, - consumerProperties, false, - applicationContext == null - ? getClass().getClassLoader() - : applicationContext.getClassLoader()); + consumerProperties, false, classLoader); this.checkNullValueForExceptions = this.containerProperties.isCheckDeserExWhenValueNull() || ErrorHandlingUtils.checkDeserializer(KafkaMessageListenerContainer.this.consumerFactory, - consumerProperties, true, - applicationContext == null - ? getClass().getClassLoader() - : applicationContext.getClassLoader()); + consumerProperties, true, classLoader); this.syncCommitTimeout = determineSyncCommitTimeout(); if (this.containerProperties.getSyncCommitTimeout() == null) { // update the property, so we can use it directly from code elsewhere @@ -964,6 +966,9 @@ private AckMode determineAckMode() { if (this.consumerGroupId == null && KafkaMessageListenerContainer.this.topicPartitions != null) { ackMode = AckMode.MANUAL; } + if (this.asyncReplies && !(AckMode.MANUAL_IMMEDIATE.equals(ackMode) || AckMode.MANUAL.equals(ackMode))) { + ackMode = AckMode.MANUAL; + } return ackMode; } @@ -3388,15 +3393,15 @@ public void acknowledge() { public void nack(Duration sleep) { Assert.state(Thread.currentThread().equals(ListenerConsumer.this.consumerThread), "nack() can only be called on the consumer thread"); - Assert.state(!ListenerConsumer.this.containerProperties.isAsyncAcks(), - "nack() is not supported with out-of-order commits (asyncAcks=true)"); + Assert.state(!ListenerConsumer.this.asyncReplies, + "nack() is not supported with out-of-order commits"); Assert.isTrue(!sleep.isNegative(), "sleep cannot be negative"); ListenerConsumer.this.nackSleepDurationMillis = sleep.toMillis(); } @Override - public boolean isAsyncAcks() { - return !ListenerConsumer.this.containerProperties.isAsyncAcks(); + public boolean isOutOfOrderCommit() { + return ListenerConsumer.this.asyncReplies; } @Override @@ -3473,8 +3478,8 @@ public void acknowledge(int index) { public void nack(int index, Duration sleep) { Assert.state(Thread.currentThread().equals(ListenerConsumer.this.consumerThread), "nack() can only be called on the consumer thread"); - Assert.state(!ListenerConsumer.this.containerProperties.isAsyncAcks(), - "nack() is not supported with out-of-order commits (asyncAcks=true)"); + Assert.state(!ListenerConsumer.this.asyncReplies, + "nack() is not supported with out-of-order commits"); Assert.isTrue(!sleep.isNegative(), "sleep cannot be negative"); Assert.isTrue(index >= 0 && index < this.records.count(), "index out of bounds"); ListenerConsumer.this.nackIndex = index; @@ -3498,8 +3503,8 @@ public void nack(int index, Duration sleep) { } @Override - public boolean isAsyncAcks() { - return !ListenerConsumer.this.containerProperties.isAsyncAcks(); + public boolean isOutOfOrderCommit() { + return ListenerConsumer.this.asyncReplies; } @Override diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/AdapterUtils.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/AdapterUtils.java index b51909fa52..0be36782d8 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/AdapterUtils.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/AdapterUtils.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2021 the original author or authors. + * Copyright 2020-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,8 @@ package org.springframework.kafka.listener.adapter; +import java.util.concurrent.CompletableFuture; + import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.TopicPartition; @@ -24,6 +26,9 @@ import org.springframework.expression.common.TemplateParserContext; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.lang.Nullable; +import org.springframework.util.ClassUtils; + +import reactor.core.publisher.Mono; /** * Utilities for listener adapters. @@ -40,6 +45,9 @@ public final class AdapterUtils { */ public static final ParserContext PARSER_CONTEXT = new TemplateParserContext("!{", "}"); + private static final boolean MONO_PRESENT = + ClassUtils.isPresent("reactor.core.publisher.Mono", AdapterUtils.class.getClassLoader()); + private AdapterUtils() { } @@ -86,4 +94,16 @@ public static String getDefaultReplyTopicExpression() { + KafkaHeaders.REPLY_TOPIC + "']" + PARSER_CONTEXT.getExpressionSuffix(); } + static boolean isAsyncReply(Class resultType) { + return isMono(resultType) || isCompletableFuture(resultType); + } + + static boolean isMono(Class resultType) { + return MONO_PRESENT && Mono.class.isAssignableFrom(resultType); + } + + static boolean isCompletableFuture(Class resultType) { + return CompletableFuture.class.isAssignableFrom(resultType); + } + } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DelegatingInvocableHandler.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DelegatingInvocableHandler.java index 44f0784b51..cae5ca0dea 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DelegatingInvocableHandler.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DelegatingInvocableHandler.java @@ -55,6 +55,7 @@ * unambiguous. * * @author Gary Russell + * @author Wang Zhiyang * */ public class DelegatingInvocableHandler { @@ -86,6 +87,8 @@ public class DelegatingInvocableHandler { private final PayloadValidator validator; + private final boolean asyncReplies; + /** * Construct an instance with the supplied handlers for the bean. * @param handlers the handlers. @@ -116,6 +119,15 @@ public DelegatingInvocableHandler(List handlers, ? (ConfigurableListableBeanFactory) beanFactory : null; this.validator = validator == null ? null : new PayloadValidator(validator); + boolean asyncReplies = defaultHandler != null && isAsyncReply(defaultHandler); + for (InvocableHandlerMethod handlerMethod : handlers) { + asyncReplies |= isAsyncReply(handlerMethod); + } + this.asyncReplies = asyncReplies; + } + + private boolean isAsyncReply(InvocableHandlerMethod method) { + return AdapterUtils.isAsyncReply(method.getMethod().getReturnType()); } private void checkSpecial(@Nullable InvocableHandlerMethod handler) { @@ -139,6 +151,15 @@ public Object getBean() { return this.bean; } + /** + * Return true if any handler method has an async reply type. + * @return the asyncReply. + * @since 3.2 + */ + public boolean isAsyncReplies() { + return this.asyncReplies; + } + /** * Invoke the method with the given message. * @param message the message. diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/HandlerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/HandlerAdapter.java index 3103b8b221..7809cb2cbe 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/HandlerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/HandlerAdapter.java @@ -33,6 +33,8 @@ public class HandlerAdapter { private final DelegatingInvocableHandler delegatingHandler; + private final boolean asyncReplies; + /** * Construct an instance with the provided method. * @param invokerHandlerMethod the method. @@ -40,6 +42,7 @@ public class HandlerAdapter { public HandlerAdapter(InvocableHandlerMethod invokerHandlerMethod) { this.invokerHandlerMethod = invokerHandlerMethod; this.delegatingHandler = null; + this.asyncReplies = AdapterUtils.isAsyncReply(invokerHandlerMethod.getMethod().getReturnType()); } /** @@ -49,6 +52,16 @@ public HandlerAdapter(InvocableHandlerMethod invokerHandlerMethod) { public HandlerAdapter(DelegatingInvocableHandler delegatingHandler) { this.invokerHandlerMethod = null; this.delegatingHandler = delegatingHandler; + this.asyncReplies = delegatingHandler.isAsyncReplies(); + } + + /** + * Return true if any handler method has an async reply type. + * @return the asyncReply. + * @since 3.2 + */ + public boolean isAsyncReplies() { + return this.asyncReplies; } public Object invoke(Message message, Object... providedArgs) throws Exception { //NOSONAR diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/HandlerMethodDetect.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/HandlerMethodDetect.java new file mode 100644 index 0000000000..0f90ffefdd --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/HandlerMethodDetect.java @@ -0,0 +1,36 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.listener.adapter; + +/** + * Auto-detect {@link HandlerAdapter} args and return type. + * + * @author Wang zhiyang + * @since 3.2 + */ +public interface HandlerMethodDetect { + + /** + * Return true if this listener is request/reply and the replies are async. + * @return true for async replies. + * @since 3.2 + */ + default boolean isAsyncReplies() { + return false; + } + +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java index 2c9bcdf383..552ba51251 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java @@ -90,7 +90,7 @@ * @author Nathan Xu * @author Wang ZhiYang */ -public abstract class MessagingMessageListenerAdapter implements ConsumerSeekAware { +public abstract class MessagingMessageListenerAdapter implements ConsumerSeekAware, HandlerMethodDetect { private static final SpelExpressionParser PARSER = new SpelExpressionParser(); @@ -243,6 +243,10 @@ public void setHandlerMethod(HandlerAdapter handlerMethod) { this.handlerMethod = handlerMethod; } + public boolean isAsyncReplies() { + return this.handlerMethod.isAsyncReplies(); + } + protected boolean isConsumerRecordList() { return this.isConsumerRecordList; } @@ -469,7 +473,7 @@ protected void handleResult(Object resultArg, Object request, @Nullable Acknowle messageReturnType = this.messageReturnType; } if (result instanceof CompletableFuture completable) { - if (acknowledgment == null || acknowledgment.isAsyncAcks()) { + if (acknowledgment == null || !acknowledgment.isOutOfOrderCommit()) { this.logger.warn("Container 'Acknowledgment' must be async ack for Future return type; " + "otherwise the container will ack the message immediately"); } @@ -484,7 +488,7 @@ protected void handleResult(Object resultArg, Object request, @Nullable Acknowle }); } else if (monoPresent && result instanceof Mono mono) { - if (acknowledgment == null || acknowledgment.isAsyncAcks()) { + if (acknowledgment == null || !acknowledgment.isOutOfOrderCommit()) { this.logger.warn("Container 'Acknowledgment' must be async ack for Mono return type " + "(or Kotlin suspend function); otherwise the container will ack the message immediately"); } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/Acknowledgment.java b/spring-kafka/src/main/java/org/springframework/kafka/support/Acknowledgment.java index 63278b4a70..47217d53f1 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/Acknowledgment.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/Acknowledgment.java @@ -81,7 +81,7 @@ default void nack(int index, Duration sleep) { throw new UnsupportedOperationException("nack(index, sleep) is not supported by this Acknowledgment"); } - default boolean isAsyncAcks() { + default boolean isOutOfOrderCommit() { return false; } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/annotation/AsyncListenerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/annotation/AsyncListenerTests.java index e8e1b5a972..24d668737e 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/annotation/AsyncListenerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/annotation/AsyncListenerTests.java @@ -37,6 +37,7 @@ import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import org.springframework.kafka.listener.KafkaListenerErrorHandler; +import org.springframework.kafka.support.Acknowledgment; import org.springframework.kafka.test.EmbeddedKafkaBroker; import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.kafka.test.utils.KafkaTestUtils; @@ -51,7 +52,9 @@ @EmbeddedKafka(topics = { AsyncListenerTests.FUTURE_TOPIC_1, AsyncListenerTests.FUTURE_TOPIC_BATCH_1, AsyncListenerTests.MONO_TOPIC_1, AsyncListenerTests.MONO_TOPIC_BATCH_1, - AsyncListenerTests.SEND_TOPIC_1}, partitions = 1) + AsyncListenerTests.AUTO_DETECT_ASYNC_FUTURE, AsyncListenerTests.AUTO_DETECT_ASYNC_BATCH_FUTURE, + AsyncListenerTests.AUTO_DETECT_ASYNC_MONO, AsyncListenerTests.AUTO_DETECT_ASYNC_BATCH_MONO, + AsyncListenerTests.AUTO_DETECT_ASYNC_KAFKA_HANDLER, AsyncListenerTests.SEND_TOPIC_1}, partitions = 1) public class AsyncListenerTests { static final String FUTURE_TOPIC_1 = "future-topic-1"; @@ -64,12 +67,28 @@ public class AsyncListenerTests { static final String SEND_TOPIC_1 = "send-topic-1"; + static final String AUTO_DETECT_ASYNC_FUTURE = "auto-detect-async-future"; + + static final String AUTO_DETECT_ASYNC_BATCH_FUTURE = "auto-detect-async-batch-future"; + + static final String AUTO_DETECT_ASYNC_MONO = "auto-detect-async-mono"; + + static final String AUTO_DETECT_ASYNC_BATCH_MONO = "auto-detect-async-batch-mono"; + + static final String AUTO_DETECT_ASYNC_KAFKA_HANDLER = "auto-detect-async-kafka-handler"; + @Autowired private KafkaTemplate kafkaTemplate; @Autowired private Config config; + @Autowired + private Listener listener; + + @Autowired + MultiMethodListener multiMethodListener; + @Test public void testAsyncListener() throws Exception { @@ -85,7 +104,7 @@ public void testAsyncListener() throws Exception { assertThat(cr1.value()).isEqualTo("1"); kafkaTemplate.send(FUTURE_TOPIC_BATCH_1, "bar-2"); cr1 = kafkaTemplate.receive(SEND_TOPIC_1, 0, 3); - assertThat(cr1.value()).isEqualTo("bar-2_beh"); + assertThat(cr1.value()).isEqualTo("[bar-2]_beh"); kafkaTemplate.send(MONO_TOPIC_1, "foo-3"); cr1 = kafkaTemplate.receive(SEND_TOPIC_1, 0, 4); @@ -95,14 +114,32 @@ public void testAsyncListener() throws Exception { cr1 = kafkaTemplate.receive(SEND_TOPIC_1, 0, 5); assertThat(cr1.value()).isEqualTo("bar-3_eh"); - kafkaTemplate.send(MONO_TOPIC_BATCH_1, "foo-4"); cr1 = kafkaTemplate.receive(SEND_TOPIC_1, 0, 6); assertThat(cr1.value()).isEqualTo("1"); kafkaTemplate.send(MONO_TOPIC_BATCH_1, "bar-4"); assertThat(config.latch2.await(10, TimeUnit.SECONDS)).isEqualTo(true); cr1 = kafkaTemplate.receive(SEND_TOPIC_1, 0, 7); - assertThat(cr1.value()).isEqualTo("bar-4_beh"); + assertThat(cr1.value()).isEqualTo("[bar-4]_beh"); + } + + @Test + public void testAsyncAcks() throws Exception { + + kafkaTemplate.send(AUTO_DETECT_ASYNC_FUTURE, "baz-future"); + assertThat(this.listener.autoDetectFuture.await(10, TimeUnit.SECONDS)).isTrue(); + + kafkaTemplate.send(AUTO_DETECT_ASYNC_BATCH_FUTURE, "baz-batch-future"); + assertThat(this.listener.autoDetectBatchFuture.await(10, TimeUnit.SECONDS)).isTrue(); + + kafkaTemplate.send(AUTO_DETECT_ASYNC_MONO, "baz-mono"); + assertThat(this.listener.autoDetectMono.await(10, TimeUnit.SECONDS)).isTrue(); + + kafkaTemplate.send(AUTO_DETECT_ASYNC_BATCH_MONO, "baz-batch-mono"); + assertThat(this.listener.autoDetectBatchMono.await(10, TimeUnit.SECONDS)).isTrue(); + + kafkaTemplate.send(AUTO_DETECT_ASYNC_KAFKA_HANDLER, "foo-multi-async"); + assertThat(this.multiMethodListener.handler1.await(10, TimeUnit.SECONDS)).isTrue(); } public static class Listener { @@ -115,7 +152,16 @@ public static class Listener { private final AtomicBoolean monoBatch1 = new AtomicBoolean(true); - @KafkaListener(id = "future1", topics = FUTURE_TOPIC_1, errorHandler = "errorHandler") + public final CountDownLatch autoDetectFuture = new CountDownLatch(1); + + public final CountDownLatch autoDetectMono = new CountDownLatch(1); + + public final CountDownLatch autoDetectBatchFuture = new CountDownLatch(1); + + public final CountDownLatch autoDetectBatchMono = new CountDownLatch(1); + + @KafkaListener(id = "future1", topics = FUTURE_TOPIC_1, errorHandler = "errorHandler", + containerFactory = "kafkaListenerContainerFactory") @SendTo(SEND_TOPIC_1) public CompletableFuture listen1(String foo) { CompletableFuture future = new CompletableFuture<>(); @@ -128,7 +174,8 @@ public CompletableFuture listen1(String foo) { return future; } - @KafkaListener(id = "futureBatch1", topics = FUTURE_TOPIC_BATCH_1, errorHandler = "errorBatchHandler") + @KafkaListener(id = "futureBatch1", topics = FUTURE_TOPIC_BATCH_1, errorHandler = "errorBatchHandler", + containerFactory = "kafkaBatchListenerContainerFactory") @SendTo(SEND_TOPIC_1) public CompletableFuture listen2(List foo) { CompletableFuture future = new CompletableFuture<>(); @@ -141,7 +188,8 @@ public CompletableFuture listen2(List foo) { return future; } - @KafkaListener(id = "mono1", topics = MONO_TOPIC_1, errorHandler = "errorHandler") + @KafkaListener(id = "mono1", topics = MONO_TOPIC_1, errorHandler = "errorHandler", + containerFactory = "kafkaListenerContainerFactory") @SendTo(SEND_TOPIC_1) public Mono listen3(String bar) { if (mono1.getAndSet(false)) { @@ -152,7 +200,8 @@ public Mono listen3(String bar) { } } - @KafkaListener(id = "monoBatch1", topics = MONO_TOPIC_BATCH_1, errorHandler = "errorBatchHandler") + @KafkaListener(id = "monoBatch1", topics = MONO_TOPIC_BATCH_1, errorHandler = "errorBatchHandler", + containerFactory = "kafkaBatchListenerContainerFactory") @SendTo(SEND_TOPIC_1) public Mono listen4(List bar) { if (monoBatch1.getAndSet(false)) { @@ -163,6 +212,74 @@ public Mono listen4(List bar) { } } + @KafkaListener(id = "autoDetectFuture", topics = AUTO_DETECT_ASYNC_FUTURE, + containerFactory = "kafkaListenerContainerFactory") + public CompletableFuture listen5(String baz, Acknowledgment acknowledgment) { + CompletableFuture future = new CompletableFuture<>(); + future.complete(baz.toUpperCase()); + if (acknowledgment.isOutOfOrderCommit()) { + autoDetectFuture.countDown(); + } + return future; + } + + @KafkaListener(id = "autoDetectBatchFuture", topics = AUTO_DETECT_ASYNC_FUTURE, + containerFactory = "kafkaBatchListenerContainerFactory") + public CompletableFuture listen6(List baz, Acknowledgment acknowledgment) { + CompletableFuture future = new CompletableFuture<>(); + future.complete(String.valueOf(baz.size())); + if (acknowledgment.isOutOfOrderCommit()) { + autoDetectBatchFuture.countDown(); + } + return future; + } + + @KafkaListener(id = "autoDetectMono", topics = AUTO_DETECT_ASYNC_MONO, + containerFactory = "kafkaListenerContainerFactory") + public Mono listen7(String qux, Acknowledgment acknowledgment) { + if (acknowledgment.isOutOfOrderCommit()) { + autoDetectMono.countDown(); + } + return Mono.just(qux.toUpperCase()); + } + + @KafkaListener(id = "autoDetectBatchMono", topics = AUTO_DETECT_ASYNC_BATCH_MONO, + containerFactory = "kafkaBatchListenerContainerFactory") + public Mono listen8(List qux, Acknowledgment acknowledgment) { + if (acknowledgment.isOutOfOrderCommit()) { + autoDetectBatchMono.countDown(); + } + return Mono.just(String.valueOf(qux.size())); + } + + } + + @KafkaListener(id = "autoDetectKafkaHandler", topics = AUTO_DETECT_ASYNC_KAFKA_HANDLER, + containerFactory = "kafkaListenerContainerFactory") + public static class MultiMethodListener { + + public final CountDownLatch handler1 = new CountDownLatch(1); + + @KafkaHandler + public Mono handler1(String foo, Acknowledgment acknowledgment) { + if (acknowledgment.isOutOfOrderCommit()) { + handler1.countDown(); + } + return Mono.just(foo); + } + + @KafkaHandler + public CompletableFuture handler2(Integer bar) { + CompletableFuture future = new CompletableFuture<>(); + future.complete(bar); + return future; + } + + @KafkaHandler(isDefault = true) + public void handler2(Short baz, Acknowledgment acknowledgment) { + acknowledgment.acknowledge(); + } + } @Configuration @@ -181,6 +298,11 @@ public Listener listener() { return new Listener(); } + @Bean + public MultiMethodListener multiMethodListener() { + return new MultiMethodListener(); + } + @Bean public KafkaTemplate template(EmbeddedKafkaBroker embeddedKafka) { KafkaTemplate template = new KafkaTemplate<>(producerFactory(embeddedKafka)); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java index 88899ca37e..238599f6a0 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java @@ -235,8 +235,7 @@ public void testDelegateType() throws Exception { container.setBeanName("delegate"); AtomicReference> offsets = new AtomicReference<>(); container.setApplicationEventPublisher(e -> { - if (e instanceof ConsumerStoppingEvent) { - ConsumerStoppingEvent event = (ConsumerStoppingEvent) e; + if (e instanceof ConsumerStoppingEvent event) { offsets.set(event.getPartitions().stream() .map(p -> new TopicPartitionOffset(p.topic(), p.partition(), event.getConsumer().position(p, Duration.ofMillis(10_000))))