diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java index 59440ccda..e4cca9cd9 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java @@ -80,9 +80,9 @@ import org.springframework.util.TypeUtils; /** - * An abstract {@link org.springframework.kafka.listener.MessageListener} adapter + * An abstract {@link MessageListener} adapter * providing the necessary infrastructure to extract the payload of a - * {@link org.springframework.messaging.Message}. + * {@link Message}. * * @param the key type. * @param the value type. @@ -205,9 +205,9 @@ public void setMessageConverter(RecordMessageConverter messageConverter) { /** * Return the {@link MessagingMessageConverter} for this listener, - * being able to convert {@link org.springframework.messaging.Message}. + * being able to convert {@link Message}. * @return the {@link MessagingMessageConverter} for this listener, - * being able to convert {@link org.springframework.messaging.Message}. + * being able to convert {@link Message}. */ protected final RecordMessageConverter getMessageConverter() { return this.messageConverter; @@ -550,7 +550,9 @@ else if (!(result instanceof CompletableFuture)) { try { if (t == null) { asyncSuccess(r, replyTopic, source, messageReturnType); - acknowledge(acknowledgment); + if (isAsyncReplies()) { + acknowledge(acknowledgment); + } } else { Throwable cause = t instanceof CompletionException ? t.getCause() : t;