Skip to content

Commit

Permalink
GH-1189: @SendTo for @KafkaHandler after error is handled
Browse files Browse the repository at this point in the history
Sending the result from a `KafkaListenerErrorHandler` was broken
for `@KafkaHandler` because the send to expression
was lost.
  • Loading branch information
Zhiyang.Wang1 committed Jan 18, 2024
1 parent 89fc6d0 commit 9609ceb
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 22 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2023 the original author or authors.
* Copyright 2016-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -337,6 +337,17 @@ private boolean assignPayload(MethodParameter methodParameter, Class<?> payloadC
&& methodParameter.getParameterType().isAssignableFrom(payloadClass);
}

@Nullable
public InvocationResult getInvocationResultFor(Object result, Object inboundPayload) {

InvocableHandlerMethod handler = findHandlerForPayload(inboundPayload.getClass());
if (handler != null) {
return new InvocationResult(result, this.handlerSendTo.get(handler),
this.handlerReturnsMessage.get(handler));
}
return null;
}

private static final class PayloadValidator extends PayloadMethodArgumentResolver {

PayloadValidator(Validator validator) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2021 the original author or authors.
* Copyright 2015-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,6 +16,7 @@

package org.springframework.kafka.listener.adapter;

import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;

Expand Down Expand Up @@ -98,4 +99,13 @@ public Object getBean() {
}
}

@Nullable
public InvocationResult getInvocationResultFor(Object result, @Nullable Object inboundPayload) {

if (this.delegatingHandler != null && inboundPayload != null) {
return this.delegatingHandler.getInvocationResultFor(result, inboundPayload);
}
return null;
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2023 the original author or authors.
* Copyright 2016-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -26,6 +26,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -462,16 +463,14 @@ protected void handleResult(Object resultArg, Object request, @Nullable Acknowle
String replyTopic = evaluateReplyTopic(request, source, resultArg);
Assert.state(replyTopic == null || this.replyTemplate != null,
"a KafkaTemplate is required to support replies");
Object result;
boolean messageReturnType;
if (resultArg instanceof InvocationResult invocationResult) {
result = invocationResult.getResult();
messageReturnType = invocationResult.isMessageReturnType();
}
else {
result = resultArg;
messageReturnType = this.messageReturnType;
}

Object result = resultArg instanceof InvocationResult invocationResult ?
invocationResult.getResult() :
resultArg;
boolean messageReturnType = resultArg instanceof InvocationResult invocationResult ?
invocationResult.isMessageReturnType() :
this.messageReturnType;

if (result instanceof CompletableFuture<?> completable) {
if (acknowledgment == null || !acknowledgment.isOutOfOrderCommit()) {
this.logger.warn("Container 'Acknowledgment' must be async ack for Future<?> return type; "
Expand Down Expand Up @@ -677,9 +676,11 @@ protected void handleException(Object records, @Nullable Acknowledgment acknowle
if (NULL_MESSAGE.equals(message)) {
message = new GenericMessage<>(records);
}
Object result = this.errorHandler.handleError(message, e, consumer, acknowledgment);
if (result != null) {
handleResult(result, records, acknowledgment, consumer, message);
Object errorResult = this.errorHandler.handleError(message, e, consumer, acknowledgment);
if (errorResult != null && !(errorResult instanceof InvocationResult)) {
Object result = this.handlerMethod.getInvocationResultFor(errorResult, message.getPayload());
handleResult(Objects.requireNonNullElse(result, errorResult),
records, acknowledgment, consumer, message);
}
}
catch (Exception ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -470,10 +470,12 @@ public void testMulti() throws Exception {
assertThat(this.multiListener.latch2.await(60, TimeUnit.SECONDS)).isTrue();
ConsumerRecord<Integer, String> reply = KafkaTestUtils.getSingleRecord(consumer, "annotated8reply");
assertThat(reply.value()).isEqualTo("OK");
consumer.close();

template.send("annotated8", 0, 1, "junk");
assertThat(this.multiListener.errorLatch.await(60, TimeUnit.SECONDS)).isTrue();
ConsumerRecord<Integer, String> reply2 = KafkaTestUtils.getSingleRecord(consumer, "annotated8reply");
consumer.close();
assertThat(reply2.value()).isEqualTo("JUNK intentional");
assertThat(this.multiListener.meta).isNotNull();
}

Expand Down Expand Up @@ -1754,7 +1756,8 @@ public Object resolveArgument(MethodParameter parameter, Message<?> message) {
public KafkaListenerErrorHandler consumeMultiMethodException(MultiListenerBean listener) {
return (m, e) -> {
listener.errorLatch.countDown();
return null;
String payload = m.getPayload().toString().toUpperCase();
return payload + " " + e.getCause().getMessage();
};
}

Expand Down Expand Up @@ -2468,6 +2471,7 @@ static class MultiListenerBean {
volatile ConsumerRecordMetadata meta;

@KafkaHandler
@SendTo("annotated8reply")
public void bar(@NonNull String bar, @Header(KafkaHeaders.RECORD_METADATA) ConsumerRecordMetadata meta) {
if ("junk".equals(bar)) {
throw new RuntimeException("intentional");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2023 the original author or authors.
* Copyright 2016-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -56,7 +56,7 @@ import java.util.concurrent.TimeUnit
@SpringJUnitConfig
@DirtiesContext
@EmbeddedKafka(topics = ["kotlinAsyncTestTopic1", "kotlinAsyncTestTopic2",
"kotlinAsyncBatchTestTopic1", "kotlinAsyncBatchTestTopic2"])
"kotlinAsyncBatchTestTopic1", "kotlinAsyncBatchTestTopic2", "sendTopicReply1"])
class EnableKafkaKotlinCoroutinesTests {

@Autowired
Expand Down Expand Up @@ -96,7 +96,7 @@ class EnableKafkaKotlinCoroutinesTests {
@Test
fun `test checkedKh reply`() {
this.template.send("kotlinAsyncTestTopic3", "foo")
val cr = this.template.receive("sendTopic1", 0, 0, Duration.ofSeconds(30))
val cr = this.template.receive("sendTopicReply1", 0, 0, Duration.ofSeconds(30))
assertThat(cr.value()).isEqualTo("FOO")
}

Expand All @@ -105,7 +105,7 @@ class EnableKafkaKotlinCoroutinesTests {
class Listener {

@KafkaHandler
@SendTo("sendTopic1")
@SendTo("sendTopicReply1")
suspend fun handler1(value: String) : String {
return value.uppercase()
}
Expand Down

0 comments on commit 9609ceb

Please sign in to comment.