From 76a2e0b6075091bd463c805c734d3f9571d7e348 Mon Sep 17 00:00:00 2001 From: Wzy19930507 <1208931582@qq.com> Date: Tue, 30 Jan 2024 10:46:09 +0800 Subject: [PATCH] * polish `DelegatingInvocableHandler` and add javadoc * polish `HandlerAdapter` * change `InvocationResult` to record * Optimization `MessagingMessageListenerAdapter.asyncFailure` --- .../adapter/DelegatingInvocableHandler.java | 8 ++++- .../listener/adapter/HandlerAdapter.java | 1 - .../listener/adapter/InvocationResult.java | 33 +++---------------- .../MessagingMessageListenerAdapter.java | 14 ++++---- 4 files changed, 18 insertions(+), 38 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DelegatingInvocableHandler.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DelegatingInvocableHandler.java index 5d85dee5e1..9f94c33d54 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DelegatingInvocableHandler.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DelegatingInvocableHandler.java @@ -337,9 +337,15 @@ private boolean assignPayload(MethodParameter methodParameter, Class payloadC && methodParameter.getParameterType().isAssignableFrom(payloadClass); } + /** + * Return the result of a method invocation by providing a result and payload. + * @param result the result. + * @param inboundPayload the payload. + * @return the result of a method invocation. + * @since 3.2 + */ @Nullable public InvocationResult getInvocationResultFor(Object result, Object inboundPayload) { - InvocableHandlerMethod handler = findHandlerForPayload(inboundPayload.getClass()); if (handler != null) { return new InvocationResult(result, this.handlerSendTo.get(handler), diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/HandlerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/HandlerAdapter.java index 0aac47f788..82caa738fd 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/HandlerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/HandlerAdapter.java @@ -101,7 +101,6 @@ public Object getBean() { @Nullable public InvocationResult getInvocationResultFor(Object result, @Nullable Object inboundPayload) { - if (this.delegatingHandler != null && inboundPayload != null) { return this.delegatingHandler.getInvocationResultFor(result, inboundPayload); } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/InvocationResult.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/InvocationResult.java index 8c31633ea5..877315717a 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/InvocationResult.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/InvocationResult.java @@ -21,39 +21,14 @@ /** * The result of a method invocation. + * @param result the result. + * @param messageReturnType the message return type. + * @param sendTo the expression about sends topic. * * @author Gary Russell * @since 2.2 */ -public final class InvocationResult { - - @Nullable - private final Object result; - - @Nullable - private final Expression sendTo; - - private final boolean messageReturnType; - - public InvocationResult(@Nullable Object result, @Nullable Expression sendTo, boolean messageReturnType) { - this.result = result; - this.sendTo = sendTo; - this.messageReturnType = messageReturnType; - } - - @Nullable - public Object getResult() { - return this.result; - } - - @Nullable - public Expression getSendTo() { - return this.sendTo; - } - - public boolean isMessageReturnType() { - return this.messageReturnType; - } +public record InvocationResult(@Nullable Object result, @Nullable Expression sendTo, boolean messageReturnType) { @Override public String toString() { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java index f1d51c6771..370b8167a5 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java @@ -465,10 +465,10 @@ protected void handleResult(Object resultArg, Object request, @Nullable Acknowle "a KafkaTemplate is required to support replies"); Object result = resultArg instanceof InvocationResult invocationResult ? - invocationResult.getResult() : + invocationResult.result() : resultArg; boolean messageReturnType = resultArg instanceof InvocationResult invocationResult ? - invocationResult.isMessageReturnType() : + invocationResult.messageReturnType() : this.messageReturnType; if (result instanceof CompletableFuture completable) { @@ -506,7 +506,7 @@ else if (monoPresent && result instanceof Mono mono) { private String evaluateReplyTopic(Object request, Object source, Object result) { String replyTo = null; if (result instanceof InvocationResult invResult) { - replyTo = evaluateTopic(request, source, result, invResult.getSendTo()); + replyTo = evaluateTopic(request, source, result, invResult.sendTo()); } else if (this.replyTopicExpression != null) { replyTo = evaluateTopic(request, source, result, this.replyTopicExpression); @@ -656,16 +656,16 @@ protected void acknowledge(@Nullable Acknowledgment acknowledgment) { protected void asyncFailure(Object request, @Nullable Acknowledgment acknowledgment, Consumer consumer, Throwable t, Message source) { + try { handleException(request, acknowledgment, consumer, source, new ListenerExecutionFailedException(createMessagingErrorMessage( "Async Fail", source.getPayload()), t)); - return; } - catch (Exception ex) { + catch (Throwable ex) { + this.logger.error(t, () -> "Future, Mono, or suspend function was completed with an exception for " + source); + acknowledge(acknowledgment); } - this.logger.error(t, () -> "Future, Mono, or suspend function was completed with an exception for " + source); - acknowledge(acknowledgment); } protected void handleException(Object records, @Nullable Acknowledgment acknowledgment, Consumer consumer,