diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index a130bf488f..6d2b0ffc49 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -104,7 +104,7 @@ import org.springframework.kafka.listener.ContainerProperties.AckMode; import org.springframework.kafka.listener.ContainerProperties.AssignmentCommitOption; import org.springframework.kafka.listener.ContainerProperties.EOSMode; -import org.springframework.kafka.listener.adapter.HandlerMethodDetect; +import org.springframework.kafka.listener.adapter.AsyncRepliesAware; import org.springframework.kafka.support.Acknowledgment; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.kafka.support.KafkaUtils; @@ -854,7 +854,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume ListenerConsumer(GenericMessageListener listener, ListenerType listenerType, ObservationRegistry observationRegistry) { - this.asyncReplies = listener instanceof HandlerMethodDetect hmd && hmd.isAsyncReplies() + this.asyncReplies = listener instanceof AsyncRepliesAware hmd && hmd.isAsyncReplies() || this.containerProperties.isAsyncAcks(); AckMode ackMode = determineAckMode(); this.isManualAck = ackMode.equals(AckMode.MANUAL); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/HandlerMethodDetect.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/AsyncRepliesAware.java similarity index 90% rename from spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/HandlerMethodDetect.java rename to spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/AsyncRepliesAware.java index 0f90ffefdd..43ea7fda3c 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/HandlerMethodDetect.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/AsyncRepliesAware.java @@ -17,12 +17,12 @@ package org.springframework.kafka.listener.adapter; /** - * Auto-detect {@link HandlerAdapter} args and return type. + * Auto-detect {@link HandlerAdapter} return type. * * @author Wang zhiyang * @since 3.2 */ -public interface HandlerMethodDetect { +public interface AsyncRepliesAware { /** * Return true if this listener is request/reply and the replies are async. diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/ContinuationHandlerMethodArgumentResolver.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/ContinuationHandlerMethodArgumentResolver.java index 0d5378e317..d870ea5264 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/ContinuationHandlerMethodArgumentResolver.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/ContinuationHandlerMethodArgumentResolver.java @@ -16,6 +16,12 @@ package org.springframework.kafka.listener.adapter; +import org.springframework.core.MethodParameter; +import org.springframework.messaging.Message; +import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver; + +import reactor.core.publisher.Mono; + /** * No-op resolver for method arguments of type {@link kotlin.coroutines.Continuation}. *

@@ -24,17 +30,9 @@ * but for regular {@link HandlerMethodArgumentResolver} contract. * * @author Wang Zhiyang - * * @since 3.2 - * * @see org.springframework.messaging.handler.annotation.reactive.ContinuationHandlerMethodArgumentResolver */ -import org.springframework.core.MethodParameter; -import org.springframework.messaging.Message; -import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver; - -import reactor.core.publisher.Mono; - public class ContinuationHandlerMethodArgumentResolver implements HandlerMethodArgumentResolver { @Override diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java index 9b143ef277..f1d51c6771 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java @@ -91,7 +91,7 @@ * @author Nathan Xu * @author Wang ZhiYang */ -public abstract class MessagingMessageListenerAdapter implements ConsumerSeekAware, HandlerMethodDetect { +public abstract class MessagingMessageListenerAdapter implements ConsumerSeekAware, AsyncRepliesAware { private static final SpelExpressionParser PARSER = new SpelExpressionParser();