Skip to content

Commit

Permalink
spring-projectsGH-3695: Fix MessagingMessageListenerAdapter to not …
Browse files Browse the repository at this point in the history
…ack when sync

Fixes: spring-projects#3695

Even if th `@KafkaHandler` method is `void` the `DelegatingInvocableHandler` returns an empty `InvocationResult`.
That triggers a `MessagingMessageListenerAdapter.handleResult()` logic.
On the `completableFutureResult.whenComplete()` we call `acknowledge()` which is not expected for `void` POJO methods.

* Fix `MessagingMessageListenerAdapter` to check for `isAsyncReplies()` before calling `acknowledge()`

This is a regression after spring-projects#3528
  • Loading branch information
artembilan committed Dec 27, 2024
1 parent c9e7edc commit a78bda6
Showing 1 changed file with 7 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,9 @@
import org.springframework.util.TypeUtils;

/**
* An abstract {@link org.springframework.kafka.listener.MessageListener} adapter
* An abstract {@link MessageListener} adapter
* providing the necessary infrastructure to extract the payload of a
* {@link org.springframework.messaging.Message}.
* {@link Message}.
*
* @param <K> the key type.
* @param <V> the value type.
Expand Down Expand Up @@ -205,9 +205,9 @@ public void setMessageConverter(RecordMessageConverter messageConverter) {

/**
* Return the {@link MessagingMessageConverter} for this listener,
* being able to convert {@link org.springframework.messaging.Message}.
* being able to convert {@link Message}.
* @return the {@link MessagingMessageConverter} for this listener,
* being able to convert {@link org.springframework.messaging.Message}.
* being able to convert {@link Message}.
*/
protected final RecordMessageConverter getMessageConverter() {
return this.messageConverter;
Expand Down Expand Up @@ -550,7 +550,9 @@ else if (!(result instanceof CompletableFuture<?>)) {
try {
if (t == null) {
asyncSuccess(r, replyTopic, source, messageReturnType);
acknowledge(acknowledgment);
if (isAsyncReplies()) {
acknowledge(acknowledgment);
}
}
else {
Throwable cause = t instanceof CompletionException ? t.getCause() : t;
Expand Down

0 comments on commit a78bda6

Please sign in to comment.