From 2e59574d939cb427f09a02cc10f33e2ca627d74f Mon Sep 17 00:00:00 2001 From: "Zhiyang.Wang1" Date: Sat, 16 Dec 2023 20:40:57 +0800 Subject: [PATCH 1/8] Refactor `MessagingMessageListenerAdapter` * move `BatchMessagingMessageListenerAdapter#invoke` and `RecordMessagingMessageListenerAdapter#invoke` to `MessagingMessageListenerAdapter` * move `KafkaListenerErrorHandler` to `MessagingMessageListenerAdapter` * add `@Nullable` to `KafkaListenerErrorHandler` --- .../listener/KafkaListenerErrorHandler.java | 3 +- .../BatchMessagingMessageListenerAdapter.java | 41 +-------------- .../MessagingMessageListenerAdapter.java | 52 +++++++++++++++++++ ...RecordMessagingMessageListenerAdapter.java | 35 +------------ 4 files changed, 58 insertions(+), 73 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaListenerErrorHandler.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaListenerErrorHandler.java index 91715b1710..36536a9fcb 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaListenerErrorHandler.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaListenerErrorHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2019 the original author or authors. + * Copyright 2017-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -72,6 +72,7 @@ default Object handleError(Message message, ListenerExecutionFailedException * @return the return value is ignored unless the annotated method has a * {@code @SendTo} annotation. */ + @Nullable default Object handleError(Message message, ListenerExecutionFailedException exception, Consumer consumer, @Nullable Acknowledgment ack) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapter.java index a2c1d87704..b7241c0fd3 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapter.java @@ -26,14 +26,12 @@ import org.springframework.kafka.listener.BatchAcknowledgingConsumerAwareMessageListener; import org.springframework.kafka.listener.KafkaListenerErrorHandler; -import org.springframework.kafka.listener.ListenerExecutionFailedException; import org.springframework.kafka.support.Acknowledgment; import org.springframework.kafka.support.converter.BatchMessageConverter; import org.springframework.kafka.support.converter.BatchMessagingMessageConverter; import org.springframework.kafka.support.converter.RecordMessageConverter; import org.springframework.lang.Nullable; import org.springframework.messaging.Message; -import org.springframework.messaging.support.GenericMessage; import org.springframework.messaging.support.MessageBuilder; import org.springframework.util.Assert; @@ -56,6 +54,7 @@ * @author Gary Russell * @author Artem Bilan * @author Venil Noronha + * @author Wang ZhiYang * @since 1.1 */ public class BatchMessagingMessageListenerAdapter extends MessagingMessageListenerAdapter @@ -63,8 +62,6 @@ public class BatchMessagingMessageListenerAdapter extends MessagingMessage private BatchMessageConverter batchMessageConverter = new BatchMessagingMessageConverter(); - private final KafkaListenerErrorHandler errorHandler; - private BatchToRecordAdapter batchToRecordAdapter; /** @@ -85,8 +82,7 @@ public BatchMessagingMessageListenerAdapter(Object bean, Method method) { public BatchMessagingMessageListenerAdapter(Object bean, Method method, @Nullable KafkaListenerErrorHandler errorHandler) { - super(bean, method); - this.errorHandler = errorHandler; + super(bean, method, errorHandler); } /** @@ -172,39 +168,6 @@ public void onMessage(List> records, @Nullable Acknowledgme invoke(records, acknowledgment, consumer, message); } - protected void invoke(Object records, @Nullable Acknowledgment acknowledgment, Consumer consumer, - final Message messageArg) { - - Message message = messageArg; - try { - Object result = invokeHandler(records, acknowledgment, message, consumer); - if (result != null) { - handleResult(result, records, message); - } - } - catch (ListenerExecutionFailedException e) { // NOSONAR ex flow control - if (this.errorHandler != null) { - try { - if (message.equals(NULL_MESSAGE)) { - message = new GenericMessage<>(records); - } - Object result = this.errorHandler.handleError(message, e, consumer, acknowledgment); - if (result != null) { - handleResult(result, records, message); - } - } - catch (Exception ex) { - throw new ListenerExecutionFailedException(createMessagingErrorMessage(// NOSONAR stack trace loss - "Listener error handler threw an exception for the incoming message", - message.getPayload()), ex); - } - } - else { - throw e; - } - } - } - @SuppressWarnings({ "unchecked", "rawtypes" }) protected Message toMessagingMessage(List records, @Nullable Acknowledgment acknowledgment, Consumer consumer) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java index 0b9b7097fc..4bcbda9016 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java @@ -45,6 +45,7 @@ import org.springframework.expression.spel.support.StandardTypeConverter; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.listener.ConsumerSeekAware; +import org.springframework.kafka.listener.KafkaListenerErrorHandler; import org.springframework.kafka.listener.ListenerExecutionFailedException; import org.springframework.kafka.support.Acknowledgment; import org.springframework.kafka.support.KafkaHeaders; @@ -102,6 +103,8 @@ public abstract class MessagingMessageListenerAdapter implements ConsumerS private final StandardEvaluationContext evaluationContext = new StandardEvaluationContext(); + private final KafkaListenerErrorHandler errorHandler; + private HandlerAdapter handlerMethod; private boolean isConsumerRecordList; @@ -143,8 +146,19 @@ public abstract class MessagingMessageListenerAdapter implements ConsumerS * @param method the method. */ protected MessagingMessageListenerAdapter(Object bean, Method method) { + this(bean, method, null); + } + + /** + * Create an instance with the provided bean, method and kafka listener error handler. + * @param bean the bean. + * @param method the method. + * @param errorHandler the kafka listener error handler. + */ + protected MessagingMessageListenerAdapter(Object bean, Method method, @Nullable KafkaListenerErrorHandler errorHandler) { this.bean = bean; this.inferredType = determineInferredType(method); // NOSONAR = intentionally not final + this.errorHandler = errorHandler; } /** @@ -348,6 +362,20 @@ protected Message toMessagingMessage(ConsumerRecord cRecord, @Nullable return getMessageConverter().toMessage(cRecord, acknowledgment, consumer, getType()); } + protected void invoke(Object records, @Nullable Acknowledgment acknowledgment, Consumer consumer, + final Message message) { + + try { + Object result = invokeHandler(records, acknowledgment, message, consumer); + if (result != null) { + handleResult(result, records, message); + } + } + catch (ListenerExecutionFailedException e) { // NOSONAR ex flow control + handleException(records, acknowledgment, consumer, message, e); + } + } + /** * Invoke the handler, wrapping any exception to a {@link ListenerExecutionFailedException} * with a dedicated error message. @@ -558,6 +586,30 @@ private void sendReplyForMessageSource(Object result, String topic, Message s this.replyTemplate.send(builder.build()); } + protected void handleException(Object records, @Nullable Acknowledgment acknowledgment, Consumer consumer, + Message message, ListenerExecutionFailedException e) { + + if (this.errorHandler != null) { + try { + if (NULL_MESSAGE.equals(message)) { + message = new GenericMessage<>(records); + } + Object result = this.errorHandler.handleError(message, e, consumer, acknowledgment); + if (result != null) { + handleResult(result, records, message); + } + } + catch (Exception ex) { + throw new ListenerExecutionFailedException(createMessagingErrorMessage(// NOSONAR stack trace loss + "Listener error handler threw an exception for the incoming message", + message.getPayload()), ex); + } + } + else { + throw e; + } + } + private void setCorrelation(MessageBuilder builder, Message source) { byte[] correlationBytes = getCorrelation(source); if (correlationBytes != null) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/RecordMessagingMessageListenerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/RecordMessagingMessageListenerAdapter.java index c0da30c278..ee9b791009 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/RecordMessagingMessageListenerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/RecordMessagingMessageListenerAdapter.java @@ -23,12 +23,10 @@ import org.springframework.kafka.listener.AcknowledgingConsumerAwareMessageListener; import org.springframework.kafka.listener.KafkaListenerErrorHandler; -import org.springframework.kafka.listener.ListenerExecutionFailedException; import org.springframework.kafka.support.Acknowledgment; import org.springframework.kafka.support.converter.ProjectingMessageConverter; import org.springframework.lang.Nullable; import org.springframework.messaging.Message; -import org.springframework.messaging.support.GenericMessage; /** @@ -53,8 +51,6 @@ public class RecordMessagingMessageListenerAdapter extends MessagingMessageListenerAdapter implements AcknowledgingConsumerAwareMessageListener { - private final KafkaListenerErrorHandler errorHandler; - public RecordMessagingMessageListenerAdapter(Object bean, Method method) { this(bean, method, null); } @@ -62,8 +58,7 @@ public RecordMessagingMessageListenerAdapter(Object bean, Method method) { public RecordMessagingMessageListenerAdapter(Object bean, Method method, @Nullable KafkaListenerErrorHandler errorHandler) { - super(bean, method); - this.errorHandler = errorHandler; + super(bean, method, errorHandler); } /** @@ -88,33 +83,7 @@ public void onMessage(ConsumerRecord record, @Nullable Acknowledgment ackn if (logger.isDebugEnabled() && !(getMessageConverter() instanceof ProjectingMessageConverter)) { this.logger.debug("Processing [" + message + "]"); } - try { - Object result = invokeHandler(record, acknowledgment, message, consumer); - if (result != null) { - handleResult(result, record, message); - } - } - catch (ListenerExecutionFailedException e) { // NOSONAR ex flow control - if (this.errorHandler != null) { - try { - if (message.equals(NULL_MESSAGE)) { - message = new GenericMessage<>(record); - } - Object result = this.errorHandler.handleError(message, e, consumer, acknowledgment); - if (result != null) { - handleResult(result, record, message); - } - } - catch (Exception ex) { - throw new ListenerExecutionFailedException(createMessagingErrorMessage(// NOSONAR stack trace loss - "Listener error handler threw an exception for the incoming message", - message.getPayload()), ex); - } - } - else { - throw e; - } - } + invoke(record, acknowledgment, consumer, message); } } From b44b4fb5a8edba8b2fb79e9113ea30dc8338e7e4 Mon Sep 17 00:00:00 2001 From: "Zhiyang.Wang1" Date: Sat, 16 Dec 2023 20:40:57 +0800 Subject: [PATCH 2/8] GH-1189: support `Mono` and `Future` * Support `Mono` and `Future` * Support auto ack at async return scenario when manual commit * Support `KafkaListenerErrorHandler` * Add warn log if the container is not configured for out-of-order manual commit * Add async return test in `BatchMessagingMessageListenerAdapterTests` and `MessagingMessageListenerAdapterTests` * Add unit test async listener with `@SendTo` in `AsyncListenerTests` * Add `async-returns.adoc` and `whats-new.adoc` --- .../receiving-messages/async-returns.adoc | 31 +++ .../antora/modules/ROOT/pages/whats-new.adoc | 8 +- .../config/MethodKafkaListenerEndpoint.java | 4 +- .../KafkaMessageListenerContainer.java | 10 + .../MessagingMessageListenerAdapter.java | 92 ++++++- .../kafka/support/Acknowledgment.java | 4 + .../kafka/annotation/AsyncListenerTests.java | 249 ++++++++++++++++++ ...hMessagingMessageListenerAdapterTests.java | 94 ++++++- .../MessagingMessageListenerAdapterTests.java | 52 +++- 9 files changed, 527 insertions(+), 17 deletions(-) create mode 100644 spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/async-returns.adoc create mode 100644 spring-kafka/src/test/java/org/springframework/kafka/annotation/AsyncListenerTests.java diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/async-returns.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/async-returns.adoc new file mode 100644 index 0000000000..658e24d497 --- /dev/null +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/async-returns.adoc @@ -0,0 +1,31 @@ +[[async-returns]] += Asynchronous `@KafkaListener` Return Types + +`@KafkaListener` (and `@KafkaHandler`) methods can be specified with asynchronous return types `CompletableFuture` and `Mono`, letting the reply be sent asynchronously. + +[source, java] +---- +@KafkaListener(id = "myListener", topics = "myTopic") +public CompletableFuture listen(String data) { + ... + CompletableFuture future = new CompletableFuture<>(); + future.complete("done"); + return future; +} +---- + +[source, java] +---- +@KafkaListener(id = "myListener", topics = "myTopic") +public Mono listen(String data) { + ... + return Mono.empty(); +} +---- + +IMPORTANT: The listener container factory must be configured with manual ack mode and async ack to enable out-of-order commits; instead, the asynchronous completion will ack or nack the message when the async operation completes. +When the async result is completed with an error, whether the message is recover or not depends on the container error handler. +If some exception occurs within the listener method that prevents creation of the async result object, you MUST catch that exception and return an appropriate return object that will cause the message to be ack or recover. + +If a `KafkaListenerErrorHandler` is configured on a listener with an async return type, the error handler is invoked after a failure. +See xref:kafka/annotation-error-handling.adoc[Handling Exceptions] for more information about this error handler and its purpose. diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc index 3601aca797..bb43911f48 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc @@ -12,4 +12,10 @@ For changes in earlier version, see xref:appendix/change-history.adoc[Change His A new `TransactionIdSuffixStrategy` interface was introduced to manage `transactional.id` suffix. The default implementation is `DefaultTransactionIdSuffixStrategy` when setting `maxCache` greater than zero can reuse `transactional.id` within a specific range, otherwise suffixes will be generated on the fly by incrementing a counter. -See xref:kafka/transactions.adoc#transaction-id-suffix-fixed[Fixed TransactionIdSuffix] for more information. \ No newline at end of file +See xref:kafka/transactions.adoc#transaction-id-suffix-fixed[Fixed TransactionIdSuffix] for more information. + +[[x32-async-return]] +=== Async @KafkaListener Return + +`@KafkaListener` (and `@KafkaHandler`) methods can now return asynchronous return types `CompletableFuture` and `Mono`. +See xref:kafka/receiving-messages/async-returns.adoc[Async Returns] for more information. \ No newline at end of file diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java b/spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java index 28e190674e..5e27af27cd 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java @@ -147,8 +147,8 @@ private String getReplyTopic() { } String topic = destinations.length == 1 ? destinations[0] : ""; BeanFactory beanFactory = getBeanFactory(); - if (beanFactory instanceof ConfigurableListableBeanFactory) { - topic = ((ConfigurableListableBeanFactory) beanFactory).resolveEmbeddedValue(topic); + if (beanFactory instanceof ConfigurableListableBeanFactory configurableListableBeanFactory) { + topic = configurableListableBeanFactory.resolveEmbeddedValue(topic); if (topic != null) { topic = resolve(topic); } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index bd0523c1c5..d34970fb23 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -3395,6 +3395,11 @@ public void nack(Duration sleep) { ListenerConsumer.this.nackSleepDurationMillis = sleep.toMillis(); } + @Override + public boolean isAsyncAcks() { + return !ListenerConsumer.this.containerProperties.isAsyncAcks(); + } + @Override public String toString() { return "Acknowledgment for " + KafkaUtils.format(this.cRecord); @@ -3493,6 +3498,11 @@ public void nack(int index, Duration sleep) { processAcks(new ConsumerRecords(newRecords)); } + @Override + public boolean isAsyncAcks() { + return !ListenerConsumer.this.containerProperties.isAsyncAcks(); + } + @Override public String toString() { return "Acknowledgment for " + this.records; diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java index 4bcbda9016..6b354eea7a 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java @@ -26,6 +26,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; import org.apache.commons.logging.LogFactory; @@ -47,6 +48,7 @@ import org.springframework.kafka.listener.ConsumerSeekAware; import org.springframework.kafka.listener.KafkaListenerErrorHandler; import org.springframework.kafka.listener.ListenerExecutionFailedException; +import org.springframework.kafka.listener.MessageListener; import org.springframework.kafka.support.Acknowledgment; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.kafka.support.KafkaNull; @@ -66,9 +68,12 @@ import org.springframework.messaging.support.GenericMessage; import org.springframework.messaging.support.MessageBuilder; import org.springframework.util.Assert; +import org.springframework.util.ClassUtils; import org.springframework.util.ObjectUtils; import org.springframework.util.StringUtils; +import reactor.core.publisher.Mono; + /** * An abstract {@link org.springframework.kafka.listener.MessageListener} adapter * providing the necessary infrastructure to extract the payload of a @@ -95,6 +100,9 @@ public abstract class MessagingMessageListenerAdapter implements ConsumerS */ protected static final Message NULL_MESSAGE = new GenericMessage<>(KafkaNull.INSTANCE); // NOSONAR + private static final boolean monoPresent = + ClassUtils.isPresent("reactor.core.publisher.Mono", MessageListener.class.getClassLoader()); + private final Object bean; protected final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass())); //NOSONAR @@ -368,7 +376,7 @@ protected void invoke(Object records, @Nullable Acknowledgment acknowledgment, C try { Object result = invokeHandler(records, acknowledgment, message, consumer); if (result != null) { - handleResult(result, records, message); + handleResult(result, records, acknowledgment, consumer, message); } } catch (ListenerExecutionFailedException e) { // NOSONAR ex flow control @@ -436,19 +444,58 @@ private RuntimeException checkAckArg(@Nullable Acknowledgment acknowledgment, Me * response message to the SendTo topic. * @param resultArg the result object to handle (never null) * @param request the original request message + * @param acknowledgment the acknowledgment to manual ack + * @param consumer the consumer to handler error * @param source the source data for the method invocation - e.g. * {@code o.s.messaging.Message}; may be null */ - protected void handleResult(Object resultArg, Object request, Object source) { + protected void handleResult(Object resultArg, Object request, @Nullable Acknowledgment acknowledgment, + Consumer consumer, @Nullable Message source) { + this.logger.debug(() -> "Listener method returned result [" + resultArg + "] - generating response message for it"); - boolean isInvocationResult = resultArg instanceof InvocationResult; - Object result = isInvocationResult ? ((InvocationResult) resultArg).getResult() : resultArg; String replyTopic = evaluateReplyTopic(request, source, resultArg); Assert.state(replyTopic == null || this.replyTemplate != null, "a KafkaTemplate is required to support replies"); - sendResponse(result, replyTopic, source, isInvocationResult - ? ((InvocationResult) resultArg).isMessageReturnType() : this.messageReturnType); + Object result; + boolean messageReturnType; + if (resultArg instanceof InvocationResult invocationResult) { + result = invocationResult.getResult(); + messageReturnType = invocationResult.isMessageReturnType(); + } + else { + result = resultArg; + messageReturnType = this.messageReturnType; + } + if (result instanceof CompletableFuture completable) { + if (acknowledgment == null || acknowledgment.isAsyncAcks()) { + this.logger.warn("Container 'Acknowledgment' must be async ack for Future return type; " + + "otherwise the container will ack the message immediately"); + } + completable.whenComplete((r, t) -> { + if (t == null) { + asyncSuccess(r, replyTopic, source, messageReturnType); + acknowledge(acknowledgment); + } + else { + asyncFailure(request, acknowledgment, consumer, t, source); + } + }); + } + else if (monoPresent && result instanceof Mono mono) { + if (acknowledgment == null || acknowledgment.isAsyncAcks()) { + this.logger.warn("Container 'Acknowledgment' must be async ack for Mono return type; " + + "otherwise the container will ack the message immediately"); + } + mono.subscribe( + r -> asyncSuccess(r, replyTopic, source, messageReturnType), + t -> asyncFailure(request, acknowledgment, consumer, t, source), + () -> acknowledge(acknowledgment) + ); + } + else { + sendResponse(result, replyTopic, source, messageReturnType); + } } @Nullable @@ -586,6 +633,37 @@ private void sendReplyForMessageSource(Object result, String topic, Message s this.replyTemplate.send(builder.build()); } + protected void asyncSuccess(@Nullable Object result, String replyTopic, Message source, boolean returnTypeMessage) { + if (result == null) { + if (this.logger.isDebugEnabled()) { + this.logger.debug("Async result is null, ignoring"); + } + } + else { + sendResponse(result, replyTopic, source, returnTypeMessage); + } + } + + protected void acknowledge(@Nullable Acknowledgment acknowledgment) { + if (acknowledgment != null) { + acknowledgment.acknowledge(); + } + } + + protected void asyncFailure(Object request, @Nullable Acknowledgment acknowledgment, Consumer consumer, + Throwable t, Message source) { + try { + handleException(request, acknowledgment, consumer, source, + new ListenerExecutionFailedException(createMessagingErrorMessage( + "Async Fail", source.getPayload()), t)); + return; + } + catch (Exception ex) { + } + this.logger.error(t, "Future, Mono, or suspend function was completed with an exception for " + source); + acknowledge(acknowledgment); + } + protected void handleException(Object records, @Nullable Acknowledgment acknowledgment, Consumer consumer, Message message, ListenerExecutionFailedException e) { @@ -596,7 +674,7 @@ protected void handleException(Object records, @Nullable Acknowledgment acknowle } Object result = this.errorHandler.handleError(message, e, consumer, acknowledgment); if (result != null) { - handleResult(result, records, message); + handleResult(result, records, acknowledgment, consumer, message); } } catch (Exception ex) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/Acknowledgment.java b/spring-kafka/src/main/java/org/springframework/kafka/support/Acknowledgment.java index 08c015fb4b..63278b4a70 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/Acknowledgment.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/Acknowledgment.java @@ -81,4 +81,8 @@ default void nack(int index, Duration sleep) { throw new UnsupportedOperationException("nack(index, sleep) is not supported by this Acknowledgment"); } + default boolean isAsyncAcks() { + return false; + } + } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/annotation/AsyncListenerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/annotation/AsyncListenerTests.java new file mode 100644 index 0000000000..e8e1b5a972 --- /dev/null +++ b/spring-kafka/src/test/java/org/springframework/kafka/annotation/AsyncListenerTests.java @@ -0,0 +1,249 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.annotation; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.junit.jupiter.api.Test; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.listener.KafkaListenerErrorHandler; +import org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.kafka.test.utils.KafkaTestUtils; +import org.springframework.messaging.handler.annotation.SendTo; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; + +import reactor.core.publisher.Mono; + +@SpringJUnitConfig +@DirtiesContext +@EmbeddedKafka(topics = { + AsyncListenerTests.FUTURE_TOPIC_1, AsyncListenerTests.FUTURE_TOPIC_BATCH_1, + AsyncListenerTests.MONO_TOPIC_1, AsyncListenerTests.MONO_TOPIC_BATCH_1, + AsyncListenerTests.SEND_TOPIC_1}, partitions = 1) +public class AsyncListenerTests { + + static final String FUTURE_TOPIC_1 = "future-topic-1"; + + static final String FUTURE_TOPIC_BATCH_1 = "future-topic-batch-1"; + + static final String MONO_TOPIC_1 = "mono-topic-1"; + + static final String MONO_TOPIC_BATCH_1 = "mono-topic-batch-1"; + + static final String SEND_TOPIC_1 = "send-topic-1"; + + @Autowired + private KafkaTemplate kafkaTemplate; + + @Autowired + private Config config; + + @Test + public void testAsyncListener() throws Exception { + + kafkaTemplate.send(FUTURE_TOPIC_1, "foo-1"); + ConsumerRecord cr1 = kafkaTemplate.receive(SEND_TOPIC_1, 0, 0); + assertThat(cr1.value()).isEqualTo("FOO-1"); + kafkaTemplate.send(FUTURE_TOPIC_1, "bar-1"); + cr1 = kafkaTemplate.receive(SEND_TOPIC_1, 0, 1); + assertThat(cr1.value()).isEqualTo("bar-1_eh"); + + kafkaTemplate.send(FUTURE_TOPIC_BATCH_1, "foo-2"); + cr1 = kafkaTemplate.receive(SEND_TOPIC_1, 0, 2); + assertThat(cr1.value()).isEqualTo("1"); + kafkaTemplate.send(FUTURE_TOPIC_BATCH_1, "bar-2"); + cr1 = kafkaTemplate.receive(SEND_TOPIC_1, 0, 3); + assertThat(cr1.value()).isEqualTo("bar-2_beh"); + + kafkaTemplate.send(MONO_TOPIC_1, "foo-3"); + cr1 = kafkaTemplate.receive(SEND_TOPIC_1, 0, 4); + assertThat(cr1.value()).isEqualTo("FOO-3"); + kafkaTemplate.send(MONO_TOPIC_1, "bar-3"); + assertThat(config.latch1.await(10, TimeUnit.SECONDS)).isEqualTo(true); + cr1 = kafkaTemplate.receive(SEND_TOPIC_1, 0, 5); + assertThat(cr1.value()).isEqualTo("bar-3_eh"); + + + kafkaTemplate.send(MONO_TOPIC_BATCH_1, "foo-4"); + cr1 = kafkaTemplate.receive(SEND_TOPIC_1, 0, 6); + assertThat(cr1.value()).isEqualTo("1"); + kafkaTemplate.send(MONO_TOPIC_BATCH_1, "bar-4"); + assertThat(config.latch2.await(10, TimeUnit.SECONDS)).isEqualTo(true); + cr1 = kafkaTemplate.receive(SEND_TOPIC_1, 0, 7); + assertThat(cr1.value()).isEqualTo("bar-4_beh"); + } + + public static class Listener { + + private final AtomicBoolean future1 = new AtomicBoolean(true); + + private final AtomicBoolean futureBatch1 = new AtomicBoolean(true); + + private final AtomicBoolean mono1 = new AtomicBoolean(true); + + private final AtomicBoolean monoBatch1 = new AtomicBoolean(true); + + @KafkaListener(id = "future1", topics = FUTURE_TOPIC_1, errorHandler = "errorHandler") + @SendTo(SEND_TOPIC_1) + public CompletableFuture listen1(String foo) { + CompletableFuture future = new CompletableFuture<>(); + if (future1.getAndSet(false)) { + future.complete(foo.toUpperCase()); + } + else { + future.completeExceptionally(new RuntimeException("Future.exception()")); + } + return future; + } + + @KafkaListener(id = "futureBatch1", topics = FUTURE_TOPIC_BATCH_1, errorHandler = "errorBatchHandler") + @SendTo(SEND_TOPIC_1) + public CompletableFuture listen2(List foo) { + CompletableFuture future = new CompletableFuture<>(); + if (futureBatch1.getAndSet(false)) { + future.complete(String.valueOf(foo.size())); + } + else { + future.completeExceptionally(new RuntimeException("Future.exception(batch)")); + } + return future; + } + + @KafkaListener(id = "mono1", topics = MONO_TOPIC_1, errorHandler = "errorHandler") + @SendTo(SEND_TOPIC_1) + public Mono listen3(String bar) { + if (mono1.getAndSet(false)) { + return Mono.just(bar.toUpperCase()); + } + else { + return Mono.error(new RuntimeException("Mono.error()")); + } + } + + @KafkaListener(id = "monoBatch1", topics = MONO_TOPIC_BATCH_1, errorHandler = "errorBatchHandler") + @SendTo(SEND_TOPIC_1) + public Mono listen4(List bar) { + if (monoBatch1.getAndSet(false)) { + return Mono.just(String.valueOf(bar.size())); + } + else { + return Mono.error(new RuntimeException("Mono.error(batch)")); + } + } + + } + + @Configuration + @EnableKafka + public static class Config { + + private final CountDownLatch latch1 = new CountDownLatch(2); + + private final CountDownLatch latch2 = new CountDownLatch(2); + + @Autowired + private KafkaTemplate kafkaTemplate; + + @Bean + public Listener listener() { + return new Listener(); + } + + @Bean + public KafkaTemplate template(EmbeddedKafkaBroker embeddedKafka) { + KafkaTemplate template = new KafkaTemplate<>(producerFactory(embeddedKafka)); + template.setConsumerFactory(consumerFactory(embeddedKafka)); + return template; + } + + @Bean + public ProducerFactory producerFactory(EmbeddedKafkaBroker embeddedKafka) { + return new DefaultKafkaProducerFactory<>(producerConfigs(embeddedKafka)); + } + + @Bean + public Map producerConfigs(EmbeddedKafkaBroker embeddedKafka) { + return KafkaTestUtils.producerProps(embeddedKafka); + } + + @Bean + public KafkaListenerErrorHandler errorHandler() { + return (message, exception) -> { + latch1.countDown(); + return message.getPayload() + "_eh"; + }; + } + + @Bean + public KafkaListenerErrorHandler errorBatchHandler() { + return (message, exception) -> { + latch2.countDown(); + return message.getPayload() + "_beh"; + }; + } + + @Bean + public DefaultKafkaConsumerFactory consumerFactory( + EmbeddedKafkaBroker embeddedKafka) { + return new DefaultKafkaConsumerFactory<>(consumerConfigs(embeddedKafka)); + } + + @Bean + public Map consumerConfigs(EmbeddedKafkaBroker embeddedKafka) { + return KafkaTestUtils.consumerProps("test", "false", embeddedKafka); + } + + @Bean + public ConcurrentKafkaListenerContainerFactory kafkaBatchListenerContainerFactory( + EmbeddedKafkaBroker embeddedKafka) { + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory(embeddedKafka)); + factory.setBatchListener(true); + factory.setReplyTemplate(kafkaTemplate); + return factory; + } + + @Bean + public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory( + EmbeddedKafkaBroker embeddedKafka) { + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory(embeddedKafka)); + factory.setReplyTemplate(kafkaTemplate); + return factory; + } + + } + +} diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapterTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapterTests.java index d19d1bca26..2ece2495b5 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapterTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapterTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 the original author or authors. + * Copyright 2019-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,10 +17,17 @@ package org.springframework.kafka.listener.adapter; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.Mockito.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.CompletableFuture; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.junit.jupiter.api.Test; @@ -39,6 +46,8 @@ import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; +import reactor.core.publisher.Mono; + /** * @author Gary Russell * @since 2.2.5 @@ -60,6 +69,41 @@ public void testKafkaNullInList(@Autowired KafkaListenerEndpointRegistry registr assertThat(foo.group).isEqualTo("test.group"); } + @SuppressWarnings("unchecked") + @Test + public void testFutureResult(@Autowired KafkaListenerEndpointRegistry registry, @Autowired Bar bar) { + + BatchMessagingMessageListenerAdapter adapter = + spy((BatchMessagingMessageListenerAdapter) registry + .getListenerContainer("bar").getContainerProperties().getMessageListener()); + KafkaUtils.setConsumerGroupId("test.group.future"); + List> list = new ArrayList<>(); + list.add(new ConsumerRecord<>("bar", 0, 0L, null, "future_1")); + list.add(new ConsumerRecord<>("bar", 0, 1L, null, "future_2")); + list.add(new ConsumerRecord<>("bar", 1, 0L, null, "future_3")); + adapter.onMessage(list, null, null); + assertThat(bar.group).isEqualTo("test.group.future"); + verify(adapter, times(1)).asyncSuccess(any(), any(), any(), anyBoolean()); + verify(adapter, times(1)).acknowledge(any()); + } + + @SuppressWarnings("unchecked") + @Test + public void testMonoResult(@Autowired KafkaListenerEndpointRegistry registry, @Autowired Baz baz) { + + BatchMessagingMessageListenerAdapter adapter = + spy((BatchMessagingMessageListenerAdapter) registry + .getListenerContainer("baz").getContainerProperties().getMessageListener()); + KafkaUtils.setConsumerGroupId("test.group.mono"); + List> list = new ArrayList<>(); + list.add(new ConsumerRecord<>("baz", 0, 0L, null, "mono_1")); + list.add(new ConsumerRecord<>("baz", 0, 1L, null, "mono_2")); + adapter.onMessage(list, null, null); + assertThat(baz.group).isEqualTo("test.group.mono"); + verify(adapter, times(1)).asyncSuccess(any(), any(), any(), anyBoolean()); + verify(adapter, times(1)).acknowledge(any()); + } + public static class Foo { public volatile String value = "someValue"; @@ -68,10 +112,38 @@ public static class Foo { @KafkaListener(id = "foo", topics = "foo", autoStartup = "false") public void listen(List list, @Header(KafkaHeaders.GROUP_ID) String groupId) { - list.forEach(s -> { - this.value = s; - }); + list.forEach(s -> this.value = s); + this.group = groupId; + } + + } + + public static class Bar { + + public volatile String group; + + @KafkaListener(id = "bar", topics = "bar", autoStartup = "false") + public CompletableFuture listen(List list, @Header(KafkaHeaders.GROUP_ID) String groupId) { + + this.group = groupId; + CompletableFuture future = new CompletableFuture<>(); + future.complete("processed: " + list.size()); + return future; + } + + } + + public static class Baz { + + public volatile String value = "someValue"; + + public volatile String group; + + @KafkaListener(id = "baz", topics = "baz", autoStartup = "false") + public Mono listen(List list, @Header(KafkaHeaders.GROUP_ID) String groupId) { + this.group = groupId; + return Mono.just(list.size()); } } @@ -85,11 +157,20 @@ public Foo foo() { return new Foo(); } + @Bean + public Bar bar() { + return new Bar(); + } + + @Bean + public Baz baz() { + return new Baz(); + } + @SuppressWarnings({ "rawtypes" }) @Bean public ConsumerFactory consumerFactory() { - ConsumerFactory consumerFactory = mock(ConsumerFactory.class); - return consumerFactory; + return mock(ConsumerFactory.class); } @SuppressWarnings({ "rawtypes", "unchecked" }) @@ -100,6 +181,7 @@ public ConcurrentKafkaListenerContainerFactory kafkaListenerCont factory.setBatchListener(true); return factory; } + } } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapterTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapterTests.java index a20468b7b1..d8ed544380 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapterTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapterTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2021 the original author or authors. + * Copyright 2016-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,11 +17,16 @@ package org.springframework.kafka.listener.adapter; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.BDDMockito.willReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import java.lang.reflect.Method; +import java.util.concurrent.CompletableFuture; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.junit.jupiter.api.Test; @@ -33,6 +38,8 @@ import org.springframework.kafka.support.converter.RecordMessageConverter; import org.springframework.messaging.support.GenericMessage; +import reactor.core.publisher.Mono; + /** * @author Gary Russell * @since 1.1.2 @@ -66,6 +73,37 @@ public void onMessage(ConsumerRecord data, Acknowledgment acknow verify(converter).toMessage(cr, ack, null, String.class); } + @Test + public void testCompletableFutureReturn() throws NoSuchMethodException { + + Method method = getClass().getDeclaredMethod("future", String.class, Acknowledgment.class); + testAsyncResult(method, "bar"); + } + + @Test + public void testMonoReturn() throws NoSuchMethodException { + + Method method = getClass().getDeclaredMethod("mono", String.class, Acknowledgment.class); + testAsyncResult(method, "baz"); + } + + private void testAsyncResult(Method method, String topic) { + + KafkaListenerAnnotationBeanPostProcessor bpp = new KafkaListenerAnnotationBeanPostProcessor<>(); + RecordMessagingMessageListenerAdapter adapter = + spy(new RecordMessagingMessageListenerAdapter<>(this, method)); + adapter.setHandlerMethod( + new HandlerAdapter(bpp.getMessageHandlerMethodFactory().createInvocableHandlerMethod(this, method))); + ConsumerRecord cr = new ConsumerRecord<>(topic, 0, 0L, null, "foo"); + Acknowledgment ack = mock(Acknowledgment.class); + RecordMessageConverter converter = mock(RecordMessageConverter.class); + willReturn(new GenericMessage<>("foo")).given(converter).toMessage(cr, ack, null, String.class); + adapter.setMessageConverter(converter); + adapter.onMessage(cr, ack, null); + verify(adapter, times(1)).asyncSuccess(any(), any(), any(), anyBoolean()); + verify(adapter, times(1)).acknowledge(any()); + } + @Test void testMissingAck() throws NoSuchMethodException, SecurityException { KafkaListenerAnnotationBeanPostProcessor bpp = new KafkaListenerAnnotationBeanPostProcessor<>(); @@ -84,4 +122,16 @@ public void test(Acknowledgment ack) { } + public CompletableFuture future(String data, Acknowledgment ack) { + + CompletableFuture future = new CompletableFuture<>(); + future.complete("processed" + data); + return future; + } + + public Mono mono(String data, Acknowledgment ack) { + + return Mono.just(data); + } + } From 7477def80a98a0eb03535408767c594514a2dfcd Mon Sep 17 00:00:00 2001 From: "Zhiyang.Wang1" Date: Tue, 19 Dec 2023 11:13:56 +0800 Subject: [PATCH 3/8] GH-1189: support kotlin suspend * Support kotlin suspend * Add kotlin suspend test, async request/reply with `@KafkaListener` `@KafkaHandler` and `@SendTo` * Fix async-returns.adoc and async warn log in `MessagingMessageListenerAdapter` * Add dependency `org.jetbrains.kotlinx:kotlinx-coroutines-reactor:1.7.3` --- build.gradle | 2 + .../receiving-messages/async-returns.adoc | 2 +- ...inuationHandlerMethodArgumentResolver.java | 50 ++++ ...kaListenerAnnotationBeanPostProcessor.java | 4 +- .../KafkaMessageHandlerMethodFactory.java | 78 ++++++ .../KotlinAwareInvocableHandlerMethod.java | 49 ++++ .../MessagingMessageListenerAdapter.java | 11 +- .../EnableKafkaKotlinCoroutinesTests.kt | 241 ++++++++++++++++++ 8 files changed, 429 insertions(+), 8 deletions(-) create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/annotation/ContinuationHandlerMethodArgumentResolver.java create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaMessageHandlerMethodFactory.java create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/annotation/KotlinAwareInvocableHandlerMethod.java create mode 100644 spring-kafka/src/test/kotlin/org/springframework/kafka/listener/EnableKafkaKotlinCoroutinesTests.kt diff --git a/build.gradle b/build.gradle index 3dea27d09d..842c53754c 100644 --- a/build.gradle +++ b/build.gradle @@ -60,6 +60,7 @@ ext { junit4Version = '4.13.2' junitJupiterVersion = '5.10.1' kafkaVersion = '3.6.1' + kotlinCoroutinesVersion = '1.7.3' log4jVersion = '2.22.1' micrometerDocsVersion = '1.0.2' micrometerVersion = '1.13.0-SNAPSHOT' @@ -278,6 +279,7 @@ project ('spring-kafka') { } api "org.apache.kafka:kafka-clients:$kafkaVersion" optionalApi "org.apache.kafka:kafka-streams:$kafkaVersion" + optionalApi "org.jetbrains.kotlinx:kotlinx-coroutines-reactor:$kotlinCoroutinesVersion" optionalApi 'com.fasterxml.jackson.core:jackson-core' optionalApi 'com.fasterxml.jackson.core:jackson-databind' optionalApi 'com.fasterxml.jackson.datatype:jackson-datatype-jdk8' diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/async-returns.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/async-returns.adoc index 658e24d497..3421e0eb0d 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/async-returns.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/async-returns.adoc @@ -27,5 +27,5 @@ IMPORTANT: The listener container factory must be configured with manual ack mod When the async result is completed with an error, whether the message is recover or not depends on the container error handler. If some exception occurs within the listener method that prevents creation of the async result object, you MUST catch that exception and return an appropriate return object that will cause the message to be ack or recover. -If a `KafkaListenerErrorHandler` is configured on a listener with an async return type, the error handler is invoked after a failure. +If a `KafkaListenerErrorHandler` is configured on a listener with an async return type (including Kotlin suspend functions), the error handler is invoked after a failure. See xref:kafka/annotation-error-handling.adoc[Handling Exceptions] for more information about this error handler and its purpose. diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/ContinuationHandlerMethodArgumentResolver.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/ContinuationHandlerMethodArgumentResolver.java new file mode 100644 index 0000000000..1622159a7d --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/ContinuationHandlerMethodArgumentResolver.java @@ -0,0 +1,50 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.annotation; + +/** + * No-op resolver for method arguments of type {@link kotlin.coroutines.Continuation}. + *

+ * This class is similar to + * {@link org.springframework.messaging.handler.annotation.reactive.ContinuationHandlerMethodArgumentResolver} + * but for regular {@link HandlerMethodArgumentResolver} contract. + * + * @author Wang Zhiyang + * + * @since 3.1 + * + * @see org.springframework.messaging.handler.annotation.reactive.ContinuationHandlerMethodArgumentResolver + */ +import org.springframework.core.MethodParameter; +import org.springframework.messaging.Message; +import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver; + +import reactor.core.publisher.Mono; + +public class ContinuationHandlerMethodArgumentResolver implements HandlerMethodArgumentResolver { + + @Override + public boolean supportsParameter(MethodParameter parameter) { + return "kotlin.coroutines.Continuation".equals(parameter.getParameterType().getName()); + } + + @Override + public Object resolveArgument(MethodParameter parameter, Message message) { + return Mono.empty(); + } + +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java index d7e68a7709..8862220862 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java @@ -1155,7 +1155,7 @@ private MessageHandlerMethodFactory getHandlerMethodFactory() { } private MessageHandlerMethodFactory createDefaultMessageHandlerMethodFactory() { - DefaultMessageHandlerMethodFactory defaultFactory = new DefaultMessageHandlerMethodFactory(); + DefaultMessageHandlerMethodFactory defaultFactory = new KafkaMessageHandlerMethodFactory(); Validator validator = KafkaListenerAnnotationBeanPostProcessor.this.registrar.getValidator(); if (validator != null) { defaultFactory.setValidator(validator); @@ -1170,8 +1170,6 @@ private MessageHandlerMethodFactory createDefaultMessageHandlerMethodFactory() { List customArgumentsResolver = new ArrayList<>(KafkaListenerAnnotationBeanPostProcessor.this.registrar.getCustomMethodArgumentResolvers()); - // Has to be at the end - look at PayloadMethodArgumentResolver documentation - customArgumentsResolver.add(new KafkaNullAwarePayloadArgumentResolver(messageConverter, validator)); defaultFactory.setCustomArgumentResolvers(customArgumentsResolver); defaultFactory.afterPropertiesSet(); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaMessageHandlerMethodFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaMessageHandlerMethodFactory.java new file mode 100644 index 0000000000..794e1f9705 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaMessageHandlerMethodFactory.java @@ -0,0 +1,78 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.annotation; + +import java.lang.reflect.Method; +import java.util.List; + +import org.springframework.core.KotlinDetector; +import org.springframework.messaging.converter.MessageConverter; +import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory; +import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver; +import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite; +import org.springframework.messaging.handler.invocation.InvocableHandlerMethod; +import org.springframework.validation.Validator; + +/** + * Extension of the {@link DefaultMessageHandlerMethodFactory} for Spring Kafka requirements. + * + * @author Wang Zhiyang + * + * @since 3.1 + */ +public class KafkaMessageHandlerMethodFactory extends DefaultMessageHandlerMethodFactory { + + private final HandlerMethodArgumentResolverComposite argumentResolvers = + new HandlerMethodArgumentResolverComposite(); + + private MessageConverter messageConverter; + + private Validator validator; + + @Override + public void setMessageConverter(MessageConverter messageConverter) { + super.setMessageConverter(messageConverter); + this.messageConverter = messageConverter; + } + + @Override + public void setValidator(Validator validator) { + super.setValidator(validator); + this.validator = validator; + } + + @Override + protected List initArgumentResolvers() { + List resolvers = super.initArgumentResolvers(); + if (KotlinDetector.isKotlinPresent()) { + // Insert before PayloadMethodArgumentResolver + resolvers.add(resolvers.size() - 1, new ContinuationHandlerMethodArgumentResolver()); + } + // Has to be at the end - look at PayloadMethodArgumentResolver documentation + resolvers.add(resolvers.size() - 1, new KafkaNullAwarePayloadArgumentResolver(this.messageConverter, this.validator)); + this.argumentResolvers.addResolvers(resolvers); + return resolvers; + } + + @Override + public InvocableHandlerMethod createInvocableHandlerMethod(Object bean, Method method) { + InvocableHandlerMethod handlerMethod = new KotlinAwareInvocableHandlerMethod(bean, method); + handlerMethod.setMessageMethodArgumentResolvers(this.argumentResolvers); + return handlerMethod; + } + +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KotlinAwareInvocableHandlerMethod.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KotlinAwareInvocableHandlerMethod.java new file mode 100644 index 0000000000..b5d26cd737 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KotlinAwareInvocableHandlerMethod.java @@ -0,0 +1,49 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.annotation; + +import java.lang.reflect.Method; + +import org.springframework.core.CoroutinesUtils; +import org.springframework.core.KotlinDetector; +import org.springframework.messaging.handler.invocation.InvocableHandlerMethod; + +/** + * An {@link InvocableHandlerMethod} extension for supporting Kotlin {@code suspend} function. + * + * @author Wang Zhiyang + * + * @since 3.1 + */ +public class KotlinAwareInvocableHandlerMethod extends InvocableHandlerMethod { + + public KotlinAwareInvocableHandlerMethod(Object bean, Method method) { + super(bean, method); + } + + @Override + protected Object doInvoke(Object... args) throws Exception { + Method method = getBridgedMethod(); + if (KotlinDetector.isSuspendingFunction(method)) { + return CoroutinesUtils.invokeSuspendingFunction(method, getBean(), args); + } + else { + return super.doInvoke(args); + } + } + +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java index 6b354eea7a..2c9bcdf383 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java @@ -36,6 +36,7 @@ import org.apache.kafka.common.TopicPartition; import org.springframework.context.expression.MapAccessor; +import org.springframework.core.KotlinDetector; import org.springframework.core.MethodParameter; import org.springframework.core.log.LogAccessor; import org.springframework.expression.BeanResolver; @@ -484,8 +485,8 @@ protected void handleResult(Object resultArg, Object request, @Nullable Acknowle } else if (monoPresent && result instanceof Mono mono) { if (acknowledgment == null || acknowledgment.isAsyncAcks()) { - this.logger.warn("Container 'Acknowledgment' must be async ack for Mono return type; " + - "otherwise the container will ack the message immediately"); + this.logger.warn("Container 'Acknowledgment' must be async ack for Mono return type " + + "(or Kotlin suspend function); otherwise the container will ack the message immediately"); } mono.subscribe( r -> asyncSuccess(r, replyTopic, source, messageReturnType), @@ -660,7 +661,7 @@ protected void asyncFailure(Object request, @Nullable Acknowledgment acknowledgm } catch (Exception ex) { } - this.logger.error(t, "Future, Mono, or suspend function was completed with an exception for " + source); + this.logger.error(t, () -> "Future, Mono, or suspend function was completed with an exception for " + source); acknowledge(acknowledgment); } @@ -751,6 +752,8 @@ protected Type determineInferredType(Method method) { // NOSONAR complexity isNotConvertible |= isAck; boolean isConsumer = parameterIsType(parameterType, Consumer.class); isNotConvertible |= isConsumer; + boolean isCoroutines = KotlinDetector.isKotlinType(methodParameter.getParameterType()); + isNotConvertible |= isCoroutines; boolean isMeta = parameterIsType(parameterType, ConsumerRecordMetadata.class); this.hasMetadataParameter |= isMeta; isNotConvertible |= isMeta; @@ -769,7 +772,7 @@ protected Type determineInferredType(Method method) { // NOSONAR complexity break; } } - else if (isAck || isConsumer || annotationHeaderIsGroupId(methodParameter)) { + else if (isAck || isCoroutines || isConsumer || annotationHeaderIsGroupId(methodParameter)) { allowedBatchParameters++; } } diff --git a/spring-kafka/src/test/kotlin/org/springframework/kafka/listener/EnableKafkaKotlinCoroutinesTests.kt b/spring-kafka/src/test/kotlin/org/springframework/kafka/listener/EnableKafkaKotlinCoroutinesTests.kt new file mode 100644 index 0000000000..a312d374cc --- /dev/null +++ b/spring-kafka/src/test/kotlin/org/springframework/kafka/listener/EnableKafkaKotlinCoroutinesTests.kt @@ -0,0 +1,241 @@ +/* + * Copyright 2016-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.clients.producer.ProducerConfig +import org.apache.kafka.common.serialization.StringDeserializer +import org.apache.kafka.common.serialization.StringSerializer +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.Test +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.beans.factory.annotation.Value +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import org.springframework.kafka.annotation.EnableKafka +import org.springframework.kafka.annotation.KafkaHandler +import org.springframework.kafka.annotation.KafkaListener +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory +import org.springframework.kafka.core.ConsumerFactory +import org.springframework.kafka.core.DefaultKafkaConsumerFactory +import org.springframework.kafka.core.DefaultKafkaProducerFactory +import org.springframework.kafka.core.KafkaTemplate +import org.springframework.kafka.core.ProducerFactory +import org.springframework.kafka.listener.KafkaListenerErrorHandler +import org.springframework.kafka.test.EmbeddedKafkaBroker +import org.springframework.kafka.test.context.EmbeddedKafka +import org.springframework.messaging.handler.annotation.SendTo +import org.springframework.test.annotation.DirtiesContext +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig +import java.lang.Exception +import java.time.Duration +import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit + + +/** + * Kotlin Annotated async return listener tests. + * + * @author Wang ZhiYang + * + * @since 3.1 + */ +@SpringJUnitConfig +@DirtiesContext +@EmbeddedKafka(topics = ["kotlinAsyncTestTopic1", "kotlinAsyncTestTopic2", + "kotlinAsyncBatchTestTopic1", "kotlinAsyncBatchTestTopic2"]) +class EnableKafkaKotlinCoroutinesTests { + + @Autowired + private lateinit var config: Config + + @Autowired + private lateinit var template: KafkaTemplate + + @Test + fun `test listener`() { + this.template.send("kotlinAsyncTestTopic1", "foo") + assertThat(this.config.latch1.await(10, TimeUnit.SECONDS)).isTrue() + assertThat(this.config.received).isEqualTo("foo") + } + + @Test + fun `test checkedEx`() { + this.template.send("kotlinAsyncTestTopic2", "fail") + assertThat(this.config.latch2.await(10, TimeUnit.SECONDS)).isTrue() + assertThat(this.config.error).isTrue() + } + + @Test + fun `test batch listener`() { + this.template.send("kotlinAsyncBatchTestTopic1", "foo") + assertThat(this.config.batchLatch1.await(10, TimeUnit.SECONDS)).isTrue() + assertThat(this.config.batchReceived).isEqualTo("foo") + } + + @Test + fun `test batch checkedEx`() { + this.template.send("kotlinAsyncBatchTestTopic2", "fail") + assertThat(this.config.batchLatch2.await(10, TimeUnit.SECONDS)).isTrue() + assertThat(this.config.batchError).isTrue() + } + + @Test + fun `test checkedKh reply`() { + this.template.send("kotlinAsyncTestTopic3", "foo") + val cr = this.template.receive("sendTopic1", 0, 0, Duration.ofSeconds(30)) + assertThat(cr.value()).isEqualTo("FOO") + } + + @KafkaListener(id = "sendTopic", topics = ["kotlinAsyncTestTopic3"], + containerFactory = "kafkaListenerContainerFactory") + class Listener { + + @KafkaHandler + @SendTo("sendTopic1") + suspend fun handler1(value: String) : String { + return value.uppercase() + } + + } + + @Configuration + @EnableKafka + class Config { + + @Volatile + lateinit var received: String + + @Volatile + lateinit var batchReceived: String + + @Volatile + var error: Boolean = false + + @Volatile + var batchError: Boolean = false + + val latch1 = CountDownLatch(1) + + val latch2 = CountDownLatch(1) + + val batchLatch1 = CountDownLatch(1) + + val batchLatch2 = CountDownLatch(1) + + @Value("\${" + EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS + "}") + private lateinit var brokerAddresses: String + + @Bean + fun listener() : Listener { + return Listener() + } + + @Bean + fun kpf(): ProducerFactory { + val configs = HashMap() + configs[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = this.brokerAddresses + configs[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java + configs[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java + return DefaultKafkaProducerFactory(configs) + } + + @Bean + fun kcf(): ConsumerFactory { + val configs = HashMap() + configs[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = this.brokerAddresses + configs[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java + configs[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java + configs[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = false + configs[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest" + return DefaultKafkaConsumerFactory(configs) + } + + @Bean + fun kt(): KafkaTemplate { + val kafkaTemplate = KafkaTemplate(kpf()) + kafkaTemplate.setConsumerFactory(kcf()) + return kafkaTemplate + } + + @Bean + fun errorHandler() : KafkaListenerErrorHandler { + return KafkaListenerErrorHandler { message, _ -> + error = true; + latch2.countDown() + message.payload; + } + } + + @Bean + fun errorHandlerBatch() : KafkaListenerErrorHandler { + return KafkaListenerErrorHandler { message, _ -> + batchError = true; + batchLatch2.countDown() + message.payload; + } + } + + @Bean + fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory { + val factory: ConcurrentKafkaListenerContainerFactory + = ConcurrentKafkaListenerContainerFactory() + factory.consumerFactory = kcf() + factory.setReplyTemplate(kt()) + return factory + } + + @Bean + fun kafkaBatchListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory { + val factory: ConcurrentKafkaListenerContainerFactory + = ConcurrentKafkaListenerContainerFactory() + factory.isBatchListener = true + factory.consumerFactory = kcf() + return factory + } + + @KafkaListener(id = "kotlin", topics = ["kotlinAsyncTestTopic1"], + containerFactory = "kafkaListenerContainerFactory") + suspend fun listen(value: String) { + this.received = value + this.latch1.countDown() + } + + @KafkaListener(id = "kotlin-ex", topics = ["kotlinAsyncTestTopic2"], + containerFactory = "kafkaListenerContainerFactory", errorHandler = "errorHandler") + suspend fun listenEx(value: String) { + if (value == "fail") { + throw Exception("checked") + } + } + + @KafkaListener(id = "kotlin-batch", topics = ["kotlinAsyncBatchTestTopic1"], containerFactory = "kafkaBatchListenerContainerFactory") + suspend fun batchListen(values: List>) { + this.batchReceived = values.first().value() + this.batchLatch1.countDown() + } + + @KafkaListener(id = "kotlin-batch-ex", topics = ["kotlinAsyncBatchTestTopic2"], + containerFactory = "kafkaBatchListenerContainerFactory", errorHandler = "errorHandlerBatch") + suspend fun batchListenEx(values: List>) { + if (values.first().value() == "fail") { + throw Exception("checked") + } + } + + } + +} From 1c1cf45a073dd494f1d831e93dc5ca39c72c48f2 Mon Sep 17 00:00:00 2001 From: "Zhiyang.Wang1" Date: Sun, 24 Dec 2023 21:41:57 +0800 Subject: [PATCH 4/8] GH-1189: Auto detect async reply * auto-detect async reply than coerce the out-of-order manual commit. * add new interface `HandlerMethodDetect` to detect handler args and return type. * add auto-detect async reply than coerce the out-of-order manual commit unit test at `@KafkaListener` `@KafkaHandler` scene. * modify async-returns.adoc --- .../receiving-messages/async-returns.adoc | 2 +- .../KafkaMessageListenerContainer.java | 49 ++++--- .../kafka/listener/adapter/AdapterUtils.java | 20 +++ .../adapter/DelegatingInvocableHandler.java | 21 +++ .../listener/adapter/HandlerAdapter.java | 13 ++ .../listener/adapter/HandlerMethodDetect.java | 36 +++++ .../MessagingMessageListenerAdapter.java | 10 +- .../kafka/support/Acknowledgment.java | 2 +- .../kafka/annotation/AsyncListenerTests.java | 138 +++++++++++++++++- 9 files changed, 256 insertions(+), 35 deletions(-) create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/HandlerMethodDetect.java diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/async-returns.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/async-returns.adoc index 3421e0eb0d..651b2eb4b7 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/async-returns.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/async-returns.adoc @@ -23,7 +23,7 @@ public Mono listen(String data) { } ---- -IMPORTANT: The listener container factory must be configured with manual ack mode and async ack to enable out-of-order commits; instead, the asynchronous completion will ack or nack the message when the async operation completes. +IMPORTANT: The `AckMode` will be automatically set the `MANUAL` and enable out-of-order commits when async return types are detected; instead, the asynchronous completion will ack when the async operation completes. When the async result is completed with an error, whether the message is recover or not depends on the container error handler. If some exception occurs within the listener method that prevents creation of the async result object, you MUST catch that exception and return an appropriate return object that will cause the message to be ack or recover. diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index d34970fb23..a130bf488f 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -104,6 +104,7 @@ import org.springframework.kafka.listener.ContainerProperties.AckMode; import org.springframework.kafka.listener.ContainerProperties.AssignmentCommitOption; import org.springframework.kafka.listener.ContainerProperties.EOSMode; +import org.springframework.kafka.listener.adapter.HandlerMethodDetect; import org.springframework.kafka.support.Acknowledgment; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.kafka.support.KafkaUtils; @@ -160,6 +161,7 @@ * @author Francois Rosiere * @author Daniel Gentes * @author Soby Chacko + * @author Wang Zhiyang */ public class KafkaMessageListenerContainer // NOSONAR line count extends AbstractMessageListenerContainer implements ConsumerPauseResumeEventPublisher { @@ -660,6 +662,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume private final boolean wantsFullRecords; + private final boolean asyncReplies; + private final boolean autoCommit; private final boolean isManualAck; @@ -850,6 +854,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume ListenerConsumer(GenericMessageListener listener, ListenerType listenerType, ObservationRegistry observationRegistry) { + this.asyncReplies = listener instanceof HandlerMethodDetect hmd && hmd.isAsyncReplies() + || this.containerProperties.isAsyncAcks(); AckMode ackMode = determineAckMode(); this.isManualAck = ackMode.equals(AckMode.MANUAL); this.isCountAck = ackMode.equals(AckMode.COUNT) @@ -860,12 +866,12 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume this.isAnyManualAck = this.isManualAck || this.isManualImmediateAck; this.isRecordAck = ackMode.equals(AckMode.RECORD); this.offsetsInThisBatch = - this.isAnyManualAck && this.containerProperties.isAsyncAcks() - ? new HashMap<>() + this.isAnyManualAck && this.asyncReplies + ? new ConcurrentHashMap<>() : null; this.deferredOffsets = - this.isAnyManualAck && this.containerProperties.isAsyncAcks() - ? new HashMap<>() + this.isAnyManualAck && this.asyncReplies + ? new ConcurrentHashMap<>() : null; this.observationRegistry = observationRegistry; @@ -904,8 +910,7 @@ else if (listener instanceof MessageListener) { else { throw new IllegalArgumentException("Listener must be one of 'MessageListener', " + "'BatchMessageListener', or the variants that are consumer aware and/or " - + "Acknowledging" - + " not " + listener.getClass().getName()); + + "Acknowledging not " + listener.getClass().getName()); } this.listenerType = listenerType; this.isConsumerAwareListener = listenerType.equals(ListenerType.ACKNOWLEDGING_CONSUMER_AWARE) @@ -928,18 +933,15 @@ else if (listener instanceof MessageListener) { this.logger.info(toString()); } ApplicationContext applicationContext = getApplicationContext(); + ClassLoader classLoader = applicationContext == null + ? getClass().getClassLoader() + : applicationContext.getClassLoader(); this.checkNullKeyForExceptions = this.containerProperties.isCheckDeserExWhenKeyNull() || ErrorHandlingUtils.checkDeserializer(KafkaMessageListenerContainer.this.consumerFactory, - consumerProperties, false, - applicationContext == null - ? getClass().getClassLoader() - : applicationContext.getClassLoader()); + consumerProperties, false, classLoader); this.checkNullValueForExceptions = this.containerProperties.isCheckDeserExWhenValueNull() || ErrorHandlingUtils.checkDeserializer(KafkaMessageListenerContainer.this.consumerFactory, - consumerProperties, true, - applicationContext == null - ? getClass().getClassLoader() - : applicationContext.getClassLoader()); + consumerProperties, true, classLoader); this.syncCommitTimeout = determineSyncCommitTimeout(); if (this.containerProperties.getSyncCommitTimeout() == null) { // update the property, so we can use it directly from code elsewhere @@ -965,6 +967,9 @@ private AckMode determineAckMode() { if (this.consumerGroupId == null && KafkaMessageListenerContainer.this.topicPartitions != null) { ackMode = AckMode.MANUAL; } + if (this.asyncReplies && !(AckMode.MANUAL_IMMEDIATE.equals(ackMode) || AckMode.MANUAL.equals(ackMode))) { + ackMode = AckMode.MANUAL; + } return ackMode; } @@ -3389,15 +3394,15 @@ public void acknowledge() { public void nack(Duration sleep) { Assert.state(Thread.currentThread().equals(ListenerConsumer.this.consumerThread), "nack() can only be called on the consumer thread"); - Assert.state(!ListenerConsumer.this.containerProperties.isAsyncAcks(), - "nack() is not supported with out-of-order commits (asyncAcks=true)"); + Assert.state(!ListenerConsumer.this.asyncReplies, + "nack() is not supported with out-of-order commits"); Assert.isTrue(!sleep.isNegative(), "sleep cannot be negative"); ListenerConsumer.this.nackSleepDurationMillis = sleep.toMillis(); } @Override - public boolean isAsyncAcks() { - return !ListenerConsumer.this.containerProperties.isAsyncAcks(); + public boolean isOutOfOrderCommit() { + return ListenerConsumer.this.asyncReplies; } @Override @@ -3474,8 +3479,8 @@ public void acknowledge(int index) { public void nack(int index, Duration sleep) { Assert.state(Thread.currentThread().equals(ListenerConsumer.this.consumerThread), "nack() can only be called on the consumer thread"); - Assert.state(!ListenerConsumer.this.containerProperties.isAsyncAcks(), - "nack() is not supported with out-of-order commits (asyncAcks=true)"); + Assert.state(!ListenerConsumer.this.asyncReplies, + "nack() is not supported with out-of-order commits"); Assert.isTrue(!sleep.isNegative(), "sleep cannot be negative"); Assert.isTrue(index >= 0 && index < this.records.count(), "index out of bounds"); ListenerConsumer.this.nackIndex = index; @@ -3499,8 +3504,8 @@ public void nack(int index, Duration sleep) { } @Override - public boolean isAsyncAcks() { - return !ListenerConsumer.this.containerProperties.isAsyncAcks(); + public boolean isOutOfOrderCommit() { + return ListenerConsumer.this.asyncReplies; } @Override diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/AdapterUtils.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/AdapterUtils.java index 010ae1a863..e0d7cf556b 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/AdapterUtils.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/AdapterUtils.java @@ -16,6 +16,8 @@ package org.springframework.kafka.listener.adapter; +import java.util.concurrent.CompletableFuture; + import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.TopicPartition; @@ -24,6 +26,9 @@ import org.springframework.expression.common.TemplateParserContext; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.lang.Nullable; +import org.springframework.util.ClassUtils; + +import reactor.core.publisher.Mono; /** * Utilities for listener adapters. @@ -40,6 +45,9 @@ public final class AdapterUtils { */ public static final ParserContext PARSER_CONTEXT = new TemplateParserContext("!{", "}"); + private static final boolean MONO_PRESENT = + ClassUtils.isPresent("reactor.core.publisher.Mono", AdapterUtils.class.getClassLoader()); + private AdapterUtils() { } @@ -85,4 +93,16 @@ public static String getDefaultReplyTopicExpression() { + KafkaHeaders.REPLY_TOPIC + "']" + PARSER_CONTEXT.getExpressionSuffix(); } + static boolean isAsyncReply(Class resultType) { + return isMono(resultType) || isCompletableFuture(resultType); + } + + static boolean isMono(Class resultType) { + return MONO_PRESENT && Mono.class.isAssignableFrom(resultType); + } + + static boolean isCompletableFuture(Class resultType) { + return CompletableFuture.class.isAssignableFrom(resultType); + } + } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DelegatingInvocableHandler.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DelegatingInvocableHandler.java index d89f83dae4..c826f2f201 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DelegatingInvocableHandler.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DelegatingInvocableHandler.java @@ -55,6 +55,7 @@ * unambiguous. * * @author Gary Russell + * @author Wang Zhiyang * */ public class DelegatingInvocableHandler { @@ -86,6 +87,8 @@ public class DelegatingInvocableHandler { private final PayloadValidator validator; + private final boolean asyncReplies; + /** * Construct an instance with the supplied handlers for the bean. * @param handlers the handlers. @@ -116,6 +119,15 @@ public DelegatingInvocableHandler(List handlers, ? configurableListableBeanFactory : null; this.validator = validator == null ? null : new PayloadValidator(validator); + boolean asyncReplies = defaultHandler != null && isAsyncReply(defaultHandler); + for (InvocableHandlerMethod handlerMethod : handlers) { + asyncReplies |= isAsyncReply(handlerMethod); + } + this.asyncReplies = asyncReplies; + } + + private boolean isAsyncReply(InvocableHandlerMethod method) { + return AdapterUtils.isAsyncReply(method.getMethod().getReturnType()); } private void checkSpecial(@Nullable InvocableHandlerMethod handler) { @@ -139,6 +151,15 @@ public Object getBean() { return this.bean; } + /** + * Return true if any handler method has an async reply type. + * @return the asyncReply. + * @since 3.2 + */ + public boolean isAsyncReplies() { + return this.asyncReplies; + } + /** * Invoke the method with the given message. * @param message the message. diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/HandlerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/HandlerAdapter.java index 3103b8b221..7809cb2cbe 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/HandlerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/HandlerAdapter.java @@ -33,6 +33,8 @@ public class HandlerAdapter { private final DelegatingInvocableHandler delegatingHandler; + private final boolean asyncReplies; + /** * Construct an instance with the provided method. * @param invokerHandlerMethod the method. @@ -40,6 +42,7 @@ public class HandlerAdapter { public HandlerAdapter(InvocableHandlerMethod invokerHandlerMethod) { this.invokerHandlerMethod = invokerHandlerMethod; this.delegatingHandler = null; + this.asyncReplies = AdapterUtils.isAsyncReply(invokerHandlerMethod.getMethod().getReturnType()); } /** @@ -49,6 +52,16 @@ public HandlerAdapter(InvocableHandlerMethod invokerHandlerMethod) { public HandlerAdapter(DelegatingInvocableHandler delegatingHandler) { this.invokerHandlerMethod = null; this.delegatingHandler = delegatingHandler; + this.asyncReplies = delegatingHandler.isAsyncReplies(); + } + + /** + * Return true if any handler method has an async reply type. + * @return the asyncReply. + * @since 3.2 + */ + public boolean isAsyncReplies() { + return this.asyncReplies; } public Object invoke(Message message, Object... providedArgs) throws Exception { //NOSONAR diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/HandlerMethodDetect.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/HandlerMethodDetect.java new file mode 100644 index 0000000000..0f90ffefdd --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/HandlerMethodDetect.java @@ -0,0 +1,36 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.listener.adapter; + +/** + * Auto-detect {@link HandlerAdapter} args and return type. + * + * @author Wang zhiyang + * @since 3.2 + */ +public interface HandlerMethodDetect { + + /** + * Return true if this listener is request/reply and the replies are async. + * @return true for async replies. + * @since 3.2 + */ + default boolean isAsyncReplies() { + return false; + } + +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java index 2c9bcdf383..552ba51251 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java @@ -90,7 +90,7 @@ * @author Nathan Xu * @author Wang ZhiYang */ -public abstract class MessagingMessageListenerAdapter implements ConsumerSeekAware { +public abstract class MessagingMessageListenerAdapter implements ConsumerSeekAware, HandlerMethodDetect { private static final SpelExpressionParser PARSER = new SpelExpressionParser(); @@ -243,6 +243,10 @@ public void setHandlerMethod(HandlerAdapter handlerMethod) { this.handlerMethod = handlerMethod; } + public boolean isAsyncReplies() { + return this.handlerMethod.isAsyncReplies(); + } + protected boolean isConsumerRecordList() { return this.isConsumerRecordList; } @@ -469,7 +473,7 @@ protected void handleResult(Object resultArg, Object request, @Nullable Acknowle messageReturnType = this.messageReturnType; } if (result instanceof CompletableFuture completable) { - if (acknowledgment == null || acknowledgment.isAsyncAcks()) { + if (acknowledgment == null || !acknowledgment.isOutOfOrderCommit()) { this.logger.warn("Container 'Acknowledgment' must be async ack for Future return type; " + "otherwise the container will ack the message immediately"); } @@ -484,7 +488,7 @@ protected void handleResult(Object resultArg, Object request, @Nullable Acknowle }); } else if (monoPresent && result instanceof Mono mono) { - if (acknowledgment == null || acknowledgment.isAsyncAcks()) { + if (acknowledgment == null || !acknowledgment.isOutOfOrderCommit()) { this.logger.warn("Container 'Acknowledgment' must be async ack for Mono return type " + "(or Kotlin suspend function); otherwise the container will ack the message immediately"); } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/Acknowledgment.java b/spring-kafka/src/main/java/org/springframework/kafka/support/Acknowledgment.java index 63278b4a70..47217d53f1 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/Acknowledgment.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/Acknowledgment.java @@ -81,7 +81,7 @@ default void nack(int index, Duration sleep) { throw new UnsupportedOperationException("nack(index, sleep) is not supported by this Acknowledgment"); } - default boolean isAsyncAcks() { + default boolean isOutOfOrderCommit() { return false; } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/annotation/AsyncListenerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/annotation/AsyncListenerTests.java index e8e1b5a972..24d668737e 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/annotation/AsyncListenerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/annotation/AsyncListenerTests.java @@ -37,6 +37,7 @@ import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import org.springframework.kafka.listener.KafkaListenerErrorHandler; +import org.springframework.kafka.support.Acknowledgment; import org.springframework.kafka.test.EmbeddedKafkaBroker; import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.kafka.test.utils.KafkaTestUtils; @@ -51,7 +52,9 @@ @EmbeddedKafka(topics = { AsyncListenerTests.FUTURE_TOPIC_1, AsyncListenerTests.FUTURE_TOPIC_BATCH_1, AsyncListenerTests.MONO_TOPIC_1, AsyncListenerTests.MONO_TOPIC_BATCH_1, - AsyncListenerTests.SEND_TOPIC_1}, partitions = 1) + AsyncListenerTests.AUTO_DETECT_ASYNC_FUTURE, AsyncListenerTests.AUTO_DETECT_ASYNC_BATCH_FUTURE, + AsyncListenerTests.AUTO_DETECT_ASYNC_MONO, AsyncListenerTests.AUTO_DETECT_ASYNC_BATCH_MONO, + AsyncListenerTests.AUTO_DETECT_ASYNC_KAFKA_HANDLER, AsyncListenerTests.SEND_TOPIC_1}, partitions = 1) public class AsyncListenerTests { static final String FUTURE_TOPIC_1 = "future-topic-1"; @@ -64,12 +67,28 @@ public class AsyncListenerTests { static final String SEND_TOPIC_1 = "send-topic-1"; + static final String AUTO_DETECT_ASYNC_FUTURE = "auto-detect-async-future"; + + static final String AUTO_DETECT_ASYNC_BATCH_FUTURE = "auto-detect-async-batch-future"; + + static final String AUTO_DETECT_ASYNC_MONO = "auto-detect-async-mono"; + + static final String AUTO_DETECT_ASYNC_BATCH_MONO = "auto-detect-async-batch-mono"; + + static final String AUTO_DETECT_ASYNC_KAFKA_HANDLER = "auto-detect-async-kafka-handler"; + @Autowired private KafkaTemplate kafkaTemplate; @Autowired private Config config; + @Autowired + private Listener listener; + + @Autowired + MultiMethodListener multiMethodListener; + @Test public void testAsyncListener() throws Exception { @@ -85,7 +104,7 @@ public void testAsyncListener() throws Exception { assertThat(cr1.value()).isEqualTo("1"); kafkaTemplate.send(FUTURE_TOPIC_BATCH_1, "bar-2"); cr1 = kafkaTemplate.receive(SEND_TOPIC_1, 0, 3); - assertThat(cr1.value()).isEqualTo("bar-2_beh"); + assertThat(cr1.value()).isEqualTo("[bar-2]_beh"); kafkaTemplate.send(MONO_TOPIC_1, "foo-3"); cr1 = kafkaTemplate.receive(SEND_TOPIC_1, 0, 4); @@ -95,14 +114,32 @@ public void testAsyncListener() throws Exception { cr1 = kafkaTemplate.receive(SEND_TOPIC_1, 0, 5); assertThat(cr1.value()).isEqualTo("bar-3_eh"); - kafkaTemplate.send(MONO_TOPIC_BATCH_1, "foo-4"); cr1 = kafkaTemplate.receive(SEND_TOPIC_1, 0, 6); assertThat(cr1.value()).isEqualTo("1"); kafkaTemplate.send(MONO_TOPIC_BATCH_1, "bar-4"); assertThat(config.latch2.await(10, TimeUnit.SECONDS)).isEqualTo(true); cr1 = kafkaTemplate.receive(SEND_TOPIC_1, 0, 7); - assertThat(cr1.value()).isEqualTo("bar-4_beh"); + assertThat(cr1.value()).isEqualTo("[bar-4]_beh"); + } + + @Test + public void testAsyncAcks() throws Exception { + + kafkaTemplate.send(AUTO_DETECT_ASYNC_FUTURE, "baz-future"); + assertThat(this.listener.autoDetectFuture.await(10, TimeUnit.SECONDS)).isTrue(); + + kafkaTemplate.send(AUTO_DETECT_ASYNC_BATCH_FUTURE, "baz-batch-future"); + assertThat(this.listener.autoDetectBatchFuture.await(10, TimeUnit.SECONDS)).isTrue(); + + kafkaTemplate.send(AUTO_DETECT_ASYNC_MONO, "baz-mono"); + assertThat(this.listener.autoDetectMono.await(10, TimeUnit.SECONDS)).isTrue(); + + kafkaTemplate.send(AUTO_DETECT_ASYNC_BATCH_MONO, "baz-batch-mono"); + assertThat(this.listener.autoDetectBatchMono.await(10, TimeUnit.SECONDS)).isTrue(); + + kafkaTemplate.send(AUTO_DETECT_ASYNC_KAFKA_HANDLER, "foo-multi-async"); + assertThat(this.multiMethodListener.handler1.await(10, TimeUnit.SECONDS)).isTrue(); } public static class Listener { @@ -115,7 +152,16 @@ public static class Listener { private final AtomicBoolean monoBatch1 = new AtomicBoolean(true); - @KafkaListener(id = "future1", topics = FUTURE_TOPIC_1, errorHandler = "errorHandler") + public final CountDownLatch autoDetectFuture = new CountDownLatch(1); + + public final CountDownLatch autoDetectMono = new CountDownLatch(1); + + public final CountDownLatch autoDetectBatchFuture = new CountDownLatch(1); + + public final CountDownLatch autoDetectBatchMono = new CountDownLatch(1); + + @KafkaListener(id = "future1", topics = FUTURE_TOPIC_1, errorHandler = "errorHandler", + containerFactory = "kafkaListenerContainerFactory") @SendTo(SEND_TOPIC_1) public CompletableFuture listen1(String foo) { CompletableFuture future = new CompletableFuture<>(); @@ -128,7 +174,8 @@ public CompletableFuture listen1(String foo) { return future; } - @KafkaListener(id = "futureBatch1", topics = FUTURE_TOPIC_BATCH_1, errorHandler = "errorBatchHandler") + @KafkaListener(id = "futureBatch1", topics = FUTURE_TOPIC_BATCH_1, errorHandler = "errorBatchHandler", + containerFactory = "kafkaBatchListenerContainerFactory") @SendTo(SEND_TOPIC_1) public CompletableFuture listen2(List foo) { CompletableFuture future = new CompletableFuture<>(); @@ -141,7 +188,8 @@ public CompletableFuture listen2(List foo) { return future; } - @KafkaListener(id = "mono1", topics = MONO_TOPIC_1, errorHandler = "errorHandler") + @KafkaListener(id = "mono1", topics = MONO_TOPIC_1, errorHandler = "errorHandler", + containerFactory = "kafkaListenerContainerFactory") @SendTo(SEND_TOPIC_1) public Mono listen3(String bar) { if (mono1.getAndSet(false)) { @@ -152,7 +200,8 @@ public Mono listen3(String bar) { } } - @KafkaListener(id = "monoBatch1", topics = MONO_TOPIC_BATCH_1, errorHandler = "errorBatchHandler") + @KafkaListener(id = "monoBatch1", topics = MONO_TOPIC_BATCH_1, errorHandler = "errorBatchHandler", + containerFactory = "kafkaBatchListenerContainerFactory") @SendTo(SEND_TOPIC_1) public Mono listen4(List bar) { if (monoBatch1.getAndSet(false)) { @@ -163,6 +212,74 @@ public Mono listen4(List bar) { } } + @KafkaListener(id = "autoDetectFuture", topics = AUTO_DETECT_ASYNC_FUTURE, + containerFactory = "kafkaListenerContainerFactory") + public CompletableFuture listen5(String baz, Acknowledgment acknowledgment) { + CompletableFuture future = new CompletableFuture<>(); + future.complete(baz.toUpperCase()); + if (acknowledgment.isOutOfOrderCommit()) { + autoDetectFuture.countDown(); + } + return future; + } + + @KafkaListener(id = "autoDetectBatchFuture", topics = AUTO_DETECT_ASYNC_FUTURE, + containerFactory = "kafkaBatchListenerContainerFactory") + public CompletableFuture listen6(List baz, Acknowledgment acknowledgment) { + CompletableFuture future = new CompletableFuture<>(); + future.complete(String.valueOf(baz.size())); + if (acknowledgment.isOutOfOrderCommit()) { + autoDetectBatchFuture.countDown(); + } + return future; + } + + @KafkaListener(id = "autoDetectMono", topics = AUTO_DETECT_ASYNC_MONO, + containerFactory = "kafkaListenerContainerFactory") + public Mono listen7(String qux, Acknowledgment acknowledgment) { + if (acknowledgment.isOutOfOrderCommit()) { + autoDetectMono.countDown(); + } + return Mono.just(qux.toUpperCase()); + } + + @KafkaListener(id = "autoDetectBatchMono", topics = AUTO_DETECT_ASYNC_BATCH_MONO, + containerFactory = "kafkaBatchListenerContainerFactory") + public Mono listen8(List qux, Acknowledgment acknowledgment) { + if (acknowledgment.isOutOfOrderCommit()) { + autoDetectBatchMono.countDown(); + } + return Mono.just(String.valueOf(qux.size())); + } + + } + + @KafkaListener(id = "autoDetectKafkaHandler", topics = AUTO_DETECT_ASYNC_KAFKA_HANDLER, + containerFactory = "kafkaListenerContainerFactory") + public static class MultiMethodListener { + + public final CountDownLatch handler1 = new CountDownLatch(1); + + @KafkaHandler + public Mono handler1(String foo, Acknowledgment acknowledgment) { + if (acknowledgment.isOutOfOrderCommit()) { + handler1.countDown(); + } + return Mono.just(foo); + } + + @KafkaHandler + public CompletableFuture handler2(Integer bar) { + CompletableFuture future = new CompletableFuture<>(); + future.complete(bar); + return future; + } + + @KafkaHandler(isDefault = true) + public void handler2(Short baz, Acknowledgment acknowledgment) { + acknowledgment.acknowledge(); + } + } @Configuration @@ -181,6 +298,11 @@ public Listener listener() { return new Listener(); } + @Bean + public MultiMethodListener multiMethodListener() { + return new MultiMethodListener(); + } + @Bean public KafkaTemplate template(EmbeddedKafkaBroker embeddedKafka) { KafkaTemplate template = new KafkaTemplate<>(producerFactory(embeddedKafka)); From c93e1a128a5acfd523c91cab0801b668635a8fd0 Mon Sep 17 00:00:00 2001 From: "Zhiyang.Wang1" Date: Sun, 14 Jan 2024 21:48:29 +0800 Subject: [PATCH 5/8] GH-1189: `@SendTo` for `@KafkaHandler` after error is handled Sending the result from a `KafkaListenerErrorHandler` was broken for `@KafkaHandler` because the send to expression was lost. --- .../adapter/DelegatingInvocableHandler.java | 13 ++++++++- .../listener/adapter/HandlerAdapter.java | 12 +++++++- .../MessagingMessageListenerAdapter.java | 29 ++++++++++--------- .../EnableKafkaIntegrationTests.java | 8 +++-- .../EnableKafkaKotlinCoroutinesTests.kt | 8 ++--- 5 files changed, 48 insertions(+), 22 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DelegatingInvocableHandler.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DelegatingInvocableHandler.java index c826f2f201..5d85dee5e1 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DelegatingInvocableHandler.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DelegatingInvocableHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2023 the original author or authors. + * Copyright 2016-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -337,6 +337,17 @@ private boolean assignPayload(MethodParameter methodParameter, Class payloadC && methodParameter.getParameterType().isAssignableFrom(payloadClass); } + @Nullable + public InvocationResult getInvocationResultFor(Object result, Object inboundPayload) { + + InvocableHandlerMethod handler = findHandlerForPayload(inboundPayload.getClass()); + if (handler != null) { + return new InvocationResult(result, this.handlerSendTo.get(handler), + this.handlerReturnsMessage.get(handler)); + } + return null; + } + private static final class PayloadValidator extends PayloadMethodArgumentResolver { PayloadValidator(Validator validator) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/HandlerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/HandlerAdapter.java index 7809cb2cbe..0aac47f788 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/HandlerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/HandlerAdapter.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2021 the original author or authors. + * Copyright 2015-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,7 @@ package org.springframework.kafka.listener.adapter; +import org.springframework.lang.Nullable; import org.springframework.messaging.Message; import org.springframework.messaging.handler.invocation.InvocableHandlerMethod; @@ -98,4 +99,13 @@ public Object getBean() { } } + @Nullable + public InvocationResult getInvocationResultFor(Object result, @Nullable Object inboundPayload) { + + if (this.delegatingHandler != null && inboundPayload != null) { + return this.delegatingHandler.getInvocationResultFor(result, inboundPayload); + } + return null; + } + } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java index 552ba51251..9b143ef277 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2023 the original author or authors. + * Copyright 2016-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,6 +26,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; @@ -462,16 +463,14 @@ protected void handleResult(Object resultArg, Object request, @Nullable Acknowle String replyTopic = evaluateReplyTopic(request, source, resultArg); Assert.state(replyTopic == null || this.replyTemplate != null, "a KafkaTemplate is required to support replies"); - Object result; - boolean messageReturnType; - if (resultArg instanceof InvocationResult invocationResult) { - result = invocationResult.getResult(); - messageReturnType = invocationResult.isMessageReturnType(); - } - else { - result = resultArg; - messageReturnType = this.messageReturnType; - } + + Object result = resultArg instanceof InvocationResult invocationResult ? + invocationResult.getResult() : + resultArg; + boolean messageReturnType = resultArg instanceof InvocationResult invocationResult ? + invocationResult.isMessageReturnType() : + this.messageReturnType; + if (result instanceof CompletableFuture completable) { if (acknowledgment == null || !acknowledgment.isOutOfOrderCommit()) { this.logger.warn("Container 'Acknowledgment' must be async ack for Future return type; " @@ -677,9 +676,11 @@ protected void handleException(Object records, @Nullable Acknowledgment acknowle if (NULL_MESSAGE.equals(message)) { message = new GenericMessage<>(records); } - Object result = this.errorHandler.handleError(message, e, consumer, acknowledgment); - if (result != null) { - handleResult(result, records, acknowledgment, consumer, message); + Object errorResult = this.errorHandler.handleError(message, e, consumer, acknowledgment); + if (errorResult != null && !(errorResult instanceof InvocationResult)) { + Object result = this.handlerMethod.getInvocationResultFor(errorResult, message.getPayload()); + handleResult(Objects.requireNonNullElse(result, errorResult), + records, acknowledgment, consumer, message); } } catch (Exception ex) { diff --git a/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java index dd757944f6..27808beea3 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java @@ -470,10 +470,12 @@ public void testMulti() throws Exception { assertThat(this.multiListener.latch2.await(60, TimeUnit.SECONDS)).isTrue(); ConsumerRecord reply = KafkaTestUtils.getSingleRecord(consumer, "annotated8reply"); assertThat(reply.value()).isEqualTo("OK"); - consumer.close(); template.send("annotated8", 0, 1, "junk"); assertThat(this.multiListener.errorLatch.await(60, TimeUnit.SECONDS)).isTrue(); + ConsumerRecord reply2 = KafkaTestUtils.getSingleRecord(consumer, "annotated8reply"); + consumer.close(); + assertThat(reply2.value()).isEqualTo("JUNK intentional"); assertThat(this.multiListener.meta).isNotNull(); } @@ -1754,7 +1756,8 @@ public Object resolveArgument(MethodParameter parameter, Message message) { public KafkaListenerErrorHandler consumeMultiMethodException(MultiListenerBean listener) { return (m, e) -> { listener.errorLatch.countDown(); - return null; + String payload = m.getPayload().toString().toUpperCase(); + return payload + " " + e.getCause().getMessage(); }; } @@ -2468,6 +2471,7 @@ static class MultiListenerBean { volatile ConsumerRecordMetadata meta; @KafkaHandler + @SendTo("annotated8reply") public void bar(@NonNull String bar, @Header(KafkaHeaders.RECORD_METADATA) ConsumerRecordMetadata meta) { if ("junk".equals(bar)) { throw new RuntimeException("intentional"); diff --git a/spring-kafka/src/test/kotlin/org/springframework/kafka/listener/EnableKafkaKotlinCoroutinesTests.kt b/spring-kafka/src/test/kotlin/org/springframework/kafka/listener/EnableKafkaKotlinCoroutinesTests.kt index a312d374cc..33bc106f44 100644 --- a/spring-kafka/src/test/kotlin/org/springframework/kafka/listener/EnableKafkaKotlinCoroutinesTests.kt +++ b/spring-kafka/src/test/kotlin/org/springframework/kafka/listener/EnableKafkaKotlinCoroutinesTests.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2023 the original author or authors. + * Copyright 2016-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -56,7 +56,7 @@ import java.util.concurrent.TimeUnit @SpringJUnitConfig @DirtiesContext @EmbeddedKafka(topics = ["kotlinAsyncTestTopic1", "kotlinAsyncTestTopic2", - "kotlinAsyncBatchTestTopic1", "kotlinAsyncBatchTestTopic2"]) + "kotlinAsyncBatchTestTopic1", "kotlinAsyncBatchTestTopic2", "sendTopicReply1"]) class EnableKafkaKotlinCoroutinesTests { @Autowired @@ -96,7 +96,7 @@ class EnableKafkaKotlinCoroutinesTests { @Test fun `test checkedKh reply`() { this.template.send("kotlinAsyncTestTopic3", "foo") - val cr = this.template.receive("sendTopic1", 0, 0, Duration.ofSeconds(30)) + val cr = this.template.receive("sendTopicReply1", 0, 0, Duration.ofSeconds(30)) assertThat(cr.value()).isEqualTo("FOO") } @@ -105,7 +105,7 @@ class EnableKafkaKotlinCoroutinesTests { class Listener { @KafkaHandler - @SendTo("sendTopic1") + @SendTo("sendTopicReply1") suspend fun handler1(value: String) : String { return value.uppercase() } From 6a9757fbccae4fcd6cc3361930ab49097a5a2c80 Mon Sep 17 00:00:00 2001 From: Wzy19930507 <1208931582@qq.com> Date: Fri, 19 Jan 2024 11:21:19 +0800 Subject: [PATCH 6/8] * add javadoc in `AdapterUtils` * move class from package `annotation` to package `adapter` * re name bar,baz in BatchMessagingMessageListenerAdapterTests * poblish unit test `MessagingMessageListenerAdapterTests` and `EnableKafkaKotlinCoroutinesTests` * poblish doc async-returns.adoc and nav.adoc --- .../src/main/antora/modules/ROOT/nav.adoc | 1 + .../receiving-messages/async-returns.adoc | 3 +- ...kaListenerAnnotationBeanPostProcessor.java | 3 +- .../kafka/listener/adapter/AdapterUtils.java | 27 ++++++++++-- ...inuationHandlerMethodArgumentResolver.java | 6 +-- .../KafkaMessageHandlerMethodFactory.java | 6 +-- ...KafkaNullAwarePayloadArgumentResolver.java | 4 +- .../KotlinAwareInvocableHandlerMethod.java | 6 +-- ...hMessagingMessageListenerAdapterTests.java | 42 ++++++++++--------- .../MessagingMessageListenerAdapterTests.java | 6 +-- .../EnableKafkaKotlinCoroutinesTests.kt | 8 ++-- 11 files changed, 67 insertions(+), 45 deletions(-) rename spring-kafka/src/main/java/org/springframework/kafka/{annotation => listener/adapter}/ContinuationHandlerMethodArgumentResolver.java (93%) rename spring-kafka/src/main/java/org/springframework/kafka/{annotation => listener/adapter}/KafkaMessageHandlerMethodFactory.java (95%) rename spring-kafka/src/main/java/org/springframework/kafka/{annotation => listener/adapter}/KafkaNullAwarePayloadArgumentResolver.java (95%) rename spring-kafka/src/main/java/org/springframework/kafka/{annotation => listener/adapter}/KotlinAwareInvocableHandlerMethod.java (91%) diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/nav.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/nav.adoc index b1d8125c21..16f41dab0b 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/nav.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/nav.adoc @@ -11,6 +11,7 @@ **** xref:kafka/receiving-messages/message-listeners.adoc[] **** xref:kafka/receiving-messages/message-listener-container.adoc[] **** xref:kafka/receiving-messages/ooo-commits.adoc[] +**** xref:kafka/receiving-messages/async-returns.adoc[] **** xref:kafka/receiving-messages/listener-annotation.adoc[] **** xref:kafka/receiving-messages/listener-group-id.adoc[] **** xref:kafka/receiving-messages/container-thread-naming.adoc[] diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/async-returns.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/async-returns.adoc index 651b2eb4b7..78f3818d4e 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/async-returns.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/async-returns.adoc @@ -1,7 +1,8 @@ [[async-returns]] = Asynchronous `@KafkaListener` Return Types -`@KafkaListener` (and `@KafkaHandler`) methods can be specified with asynchronous return types `CompletableFuture` and `Mono`, letting the reply be sent asynchronously. +`@KafkaListener` (and `@KafkaHandler`) methods can be specified with asynchronous return types, letting the reply be sent asynchronously. +return types include `CompletableFuture`, `Mono` and Kotlin `suspend` functions [source, java] ---- diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java index 8862220862..fcab2fb01f 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2023 the original author or authors. + * Copyright 2014-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -87,6 +87,7 @@ import org.springframework.kafka.listener.ContainerGroupSequencer; import org.springframework.kafka.listener.KafkaConsumerBackoffManager; import org.springframework.kafka.listener.KafkaListenerErrorHandler; +import org.springframework.kafka.listener.adapter.KafkaMessageHandlerMethodFactory; import org.springframework.kafka.listener.adapter.RecordFilterStrategy; import org.springframework.kafka.retrytopic.DestinationTopicResolver; import org.springframework.kafka.retrytopic.RetryTopicBeanNames; diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/AdapterUtils.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/AdapterUtils.java index e0d7cf556b..bd83b9664f 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/AdapterUtils.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/AdapterUtils.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2023 the original author or authors. + * Copyright 2020-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -34,6 +34,7 @@ * Utilities for listener adapters. * * @author Gary Russell + * @author Wang Zhiyang * @since 2.5 * */ @@ -93,15 +94,33 @@ public static String getDefaultReplyTopicExpression() { + KafkaHeaders.REPLY_TOPIC + "']" + PARSER_CONTEXT.getExpressionSuffix(); } - static boolean isAsyncReply(Class resultType) { + /** + * Return the true when return types are asynchronous. + * @param resultType {@code InvocableHandlerMethod} return type. + * @return type is {@code Mono} or {@code CompletableFuture}. + * @since 3.2 + */ + public static boolean isAsyncReply(Class resultType) { return isMono(resultType) || isCompletableFuture(resultType); } - static boolean isMono(Class resultType) { + /** + * Return the true when type is {@code Mono}. + * @param resultType {@code InvocableHandlerMethod} return type. + * @return type is {@code Mono}. + * @since 3.2 + */ + public static boolean isMono(Class resultType) { return MONO_PRESENT && Mono.class.isAssignableFrom(resultType); } - static boolean isCompletableFuture(Class resultType) { + /** + * Return the true when type is {@code CompletableFuture}. + * @param resultType {@code InvocableHandlerMethod} return type. + * @return type is {@code CompletableFuture}. + * @since 3.2 + */ + public static boolean isCompletableFuture(Class resultType) { return CompletableFuture.class.isAssignableFrom(resultType); } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/ContinuationHandlerMethodArgumentResolver.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/ContinuationHandlerMethodArgumentResolver.java similarity index 93% rename from spring-kafka/src/main/java/org/springframework/kafka/annotation/ContinuationHandlerMethodArgumentResolver.java rename to spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/ContinuationHandlerMethodArgumentResolver.java index 1622159a7d..0d5378e317 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/ContinuationHandlerMethodArgumentResolver.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/ContinuationHandlerMethodArgumentResolver.java @@ -1,5 +1,5 @@ /* - * Copyright 2023 the original author or authors. + * Copyright 2023-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.springframework.kafka.annotation; +package org.springframework.kafka.listener.adapter; /** * No-op resolver for method arguments of type {@link kotlin.coroutines.Continuation}. @@ -25,7 +25,7 @@ * * @author Wang Zhiyang * - * @since 3.1 + * @since 3.2 * * @see org.springframework.messaging.handler.annotation.reactive.ContinuationHandlerMethodArgumentResolver */ diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaMessageHandlerMethodFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/KafkaMessageHandlerMethodFactory.java similarity index 95% rename from spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaMessageHandlerMethodFactory.java rename to spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/KafkaMessageHandlerMethodFactory.java index 794e1f9705..9e0a49dcad 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaMessageHandlerMethodFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/KafkaMessageHandlerMethodFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2023 the original author or authors. + * Copyright 2023-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.springframework.kafka.annotation; +package org.springframework.kafka.listener.adapter; import java.lang.reflect.Method; import java.util.List; @@ -32,7 +32,7 @@ * * @author Wang Zhiyang * - * @since 3.1 + * @since 3.2 */ public class KafkaMessageHandlerMethodFactory extends DefaultMessageHandlerMethodFactory { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaNullAwarePayloadArgumentResolver.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/KafkaNullAwarePayloadArgumentResolver.java similarity index 95% rename from spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaNullAwarePayloadArgumentResolver.java rename to spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/KafkaNullAwarePayloadArgumentResolver.java index bfa45ea6a6..c269aa024e 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaNullAwarePayloadArgumentResolver.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/KafkaNullAwarePayloadArgumentResolver.java @@ -1,5 +1,5 @@ /* - * Copyright 2021-2023 the original author or authors. + * Copyright 2021-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.springframework.kafka.annotation; +package org.springframework.kafka.listener.adapter; import java.util.List; diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KotlinAwareInvocableHandlerMethod.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/KotlinAwareInvocableHandlerMethod.java similarity index 91% rename from spring-kafka/src/main/java/org/springframework/kafka/annotation/KotlinAwareInvocableHandlerMethod.java rename to spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/KotlinAwareInvocableHandlerMethod.java index b5d26cd737..88451b0fb3 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KotlinAwareInvocableHandlerMethod.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/KotlinAwareInvocableHandlerMethod.java @@ -1,5 +1,5 @@ /* - * Copyright 2023 the original author or authors. + * Copyright 2023-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.springframework.kafka.annotation; +package org.springframework.kafka.listener.adapter; import java.lang.reflect.Method; @@ -27,7 +27,7 @@ * * @author Wang Zhiyang * - * @since 3.1 + * @since 3.2 */ public class KotlinAwareInvocableHandlerMethod extends InvocableHandlerMethod { diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapterTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapterTests.java index 2ece2495b5..40fef84669 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapterTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapterTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2023 the original author or authors. + * Copyright 2019-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -50,6 +50,7 @@ /** * @author Gary Russell + * @author Wang Zhiyang * @since 2.2.5 * */ @@ -71,35 +72,36 @@ public void testKafkaNullInList(@Autowired KafkaListenerEndpointRegistry registr @SuppressWarnings("unchecked") @Test - public void testFutureResult(@Autowired KafkaListenerEndpointRegistry registry, @Autowired Bar bar) { + public void testFutureResult(@Autowired KafkaListenerEndpointRegistry registry, + @Autowired BatchFuture batchFuture) { BatchMessagingMessageListenerAdapter adapter = spy((BatchMessagingMessageListenerAdapter) registry - .getListenerContainer("bar").getContainerProperties().getMessageListener()); + .getListenerContainer("batchFuture").getContainerProperties().getMessageListener()); KafkaUtils.setConsumerGroupId("test.group.future"); List> list = new ArrayList<>(); - list.add(new ConsumerRecord<>("bar", 0, 0L, null, "future_1")); - list.add(new ConsumerRecord<>("bar", 0, 1L, null, "future_2")); - list.add(new ConsumerRecord<>("bar", 1, 0L, null, "future_3")); + list.add(new ConsumerRecord<>("batchFuture", 0, 0L, null, "future_1")); + list.add(new ConsumerRecord<>("batchFuture", 0, 1L, null, "future_2")); + list.add(new ConsumerRecord<>("batchFuture", 1, 0L, null, "future_3")); adapter.onMessage(list, null, null); - assertThat(bar.group).isEqualTo("test.group.future"); + assertThat(batchFuture.group).isEqualTo("test.group.future"); verify(adapter, times(1)).asyncSuccess(any(), any(), any(), anyBoolean()); verify(adapter, times(1)).acknowledge(any()); } @SuppressWarnings("unchecked") @Test - public void testMonoResult(@Autowired KafkaListenerEndpointRegistry registry, @Autowired Baz baz) { + public void testMonoResult(@Autowired KafkaListenerEndpointRegistry registry, @Autowired BatchMono batchMono) { BatchMessagingMessageListenerAdapter adapter = spy((BatchMessagingMessageListenerAdapter) registry - .getListenerContainer("baz").getContainerProperties().getMessageListener()); + .getListenerContainer("batchMono").getContainerProperties().getMessageListener()); KafkaUtils.setConsumerGroupId("test.group.mono"); List> list = new ArrayList<>(); - list.add(new ConsumerRecord<>("baz", 0, 0L, null, "mono_1")); - list.add(new ConsumerRecord<>("baz", 0, 1L, null, "mono_2")); + list.add(new ConsumerRecord<>("batchMono", 0, 0L, null, "mono_1")); + list.add(new ConsumerRecord<>("batchMono", 0, 1L, null, "mono_2")); adapter.onMessage(list, null, null); - assertThat(baz.group).isEqualTo("test.group.mono"); + assertThat(batchMono.group).isEqualTo("test.group.mono"); verify(adapter, times(1)).asyncSuccess(any(), any(), any(), anyBoolean()); verify(adapter, times(1)).acknowledge(any()); } @@ -118,11 +120,11 @@ public void listen(List list, @Header(KafkaHeaders.GROUP_ID) String grou } - public static class Bar { + public static class BatchFuture { public volatile String group; - @KafkaListener(id = "bar", topics = "bar", autoStartup = "false") + @KafkaListener(id = "batchFuture", topics = "batchFuture", autoStartup = "false") public CompletableFuture listen(List list, @Header(KafkaHeaders.GROUP_ID) String groupId) { this.group = groupId; @@ -133,13 +135,13 @@ public CompletableFuture listen(List list, @Header(KafkaHeaders. } - public static class Baz { + public static class BatchMono { public volatile String value = "someValue"; public volatile String group; - @KafkaListener(id = "baz", topics = "baz", autoStartup = "false") + @KafkaListener(id = "batchMono", topics = "batchMono", autoStartup = "false") public Mono listen(List list, @Header(KafkaHeaders.GROUP_ID) String groupId) { this.group = groupId; @@ -158,13 +160,13 @@ public Foo foo() { } @Bean - public Bar bar() { - return new Bar(); + public BatchFuture batchFuture() { + return new BatchFuture(); } @Bean - public Baz baz() { - return new Baz(); + public BatchMono batchMono() { + return new BatchMono(); } @SuppressWarnings({ "rawtypes" }) diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapterTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapterTests.java index d8ed544380..800c399cba 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapterTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapterTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2023 the original author or authors. + * Copyright 2016-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -124,9 +124,7 @@ public void test(Acknowledgment ack) { public CompletableFuture future(String data, Acknowledgment ack) { - CompletableFuture future = new CompletableFuture<>(); - future.complete("processed" + data); - return future; + return CompletableFuture.completedFuture("processed" + data); } public Mono mono(String data, Acknowledgment ack) { diff --git a/spring-kafka/src/test/kotlin/org/springframework/kafka/listener/EnableKafkaKotlinCoroutinesTests.kt b/spring-kafka/src/test/kotlin/org/springframework/kafka/listener/EnableKafkaKotlinCoroutinesTests.kt index 33bc106f44..573592c559 100644 --- a/spring-kafka/src/test/kotlin/org/springframework/kafka/listener/EnableKafkaKotlinCoroutinesTests.kt +++ b/spring-kafka/src/test/kotlin/org/springframework/kafka/listener/EnableKafkaKotlinCoroutinesTests.kt @@ -56,7 +56,7 @@ import java.util.concurrent.TimeUnit @SpringJUnitConfig @DirtiesContext @EmbeddedKafka(topics = ["kotlinAsyncTestTopic1", "kotlinAsyncTestTopic2", - "kotlinAsyncBatchTestTopic1", "kotlinAsyncBatchTestTopic2", "sendTopicReply1"]) + "kotlinAsyncBatchTestTopic1", "kotlinAsyncBatchTestTopic2", "kotlinReplyTopic1"]) class EnableKafkaKotlinCoroutinesTests { @Autowired @@ -96,8 +96,8 @@ class EnableKafkaKotlinCoroutinesTests { @Test fun `test checkedKh reply`() { this.template.send("kotlinAsyncTestTopic3", "foo") - val cr = this.template.receive("sendTopicReply1", 0, 0, Duration.ofSeconds(30)) - assertThat(cr.value()).isEqualTo("FOO") + val cr = this.template.receive("kotlinReplyTopic1", 0, 0, Duration.ofSeconds(30)) + assertThat(cr?.value() ?: "null").isEqualTo("FOO") } @KafkaListener(id = "sendTopic", topics = ["kotlinAsyncTestTopic3"], @@ -105,7 +105,7 @@ class EnableKafkaKotlinCoroutinesTests { class Listener { @KafkaHandler - @SendTo("sendTopicReply1") + @SendTo("kotlinReplyTopic1") suspend fun handler1(value: String) : String { return value.uppercase() } From bf8bb23aae8bd741c5b6852996b36911dc27d8cb Mon Sep 17 00:00:00 2001 From: Wzy19930507 <1208931582@qq.com> Date: Fri, 19 Jan 2024 23:56:50 +0800 Subject: [PATCH 7/8] review fix * rename `HandlerMethodDetect` to `AsyncRepliesAware` * fix javadoc in `ContinuationHandlerMethodArgumentResolver` --- .../listener/KafkaMessageListenerContainer.java | 4 ++-- ...lerMethodDetect.java => AsyncRepliesAware.java} | 4 ++-- .../ContinuationHandlerMethodArgumentResolver.java | 14 ++++++-------- .../adapter/MessagingMessageListenerAdapter.java | 2 +- 4 files changed, 11 insertions(+), 13 deletions(-) rename spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/{HandlerMethodDetect.java => AsyncRepliesAware.java} (90%) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index a130bf488f..6d2b0ffc49 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -104,7 +104,7 @@ import org.springframework.kafka.listener.ContainerProperties.AckMode; import org.springframework.kafka.listener.ContainerProperties.AssignmentCommitOption; import org.springframework.kafka.listener.ContainerProperties.EOSMode; -import org.springframework.kafka.listener.adapter.HandlerMethodDetect; +import org.springframework.kafka.listener.adapter.AsyncRepliesAware; import org.springframework.kafka.support.Acknowledgment; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.kafka.support.KafkaUtils; @@ -854,7 +854,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume ListenerConsumer(GenericMessageListener listener, ListenerType listenerType, ObservationRegistry observationRegistry) { - this.asyncReplies = listener instanceof HandlerMethodDetect hmd && hmd.isAsyncReplies() + this.asyncReplies = listener instanceof AsyncRepliesAware hmd && hmd.isAsyncReplies() || this.containerProperties.isAsyncAcks(); AckMode ackMode = determineAckMode(); this.isManualAck = ackMode.equals(AckMode.MANUAL); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/HandlerMethodDetect.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/AsyncRepliesAware.java similarity index 90% rename from spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/HandlerMethodDetect.java rename to spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/AsyncRepliesAware.java index 0f90ffefdd..43ea7fda3c 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/HandlerMethodDetect.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/AsyncRepliesAware.java @@ -17,12 +17,12 @@ package org.springframework.kafka.listener.adapter; /** - * Auto-detect {@link HandlerAdapter} args and return type. + * Auto-detect {@link HandlerAdapter} return type. * * @author Wang zhiyang * @since 3.2 */ -public interface HandlerMethodDetect { +public interface AsyncRepliesAware { /** * Return true if this listener is request/reply and the replies are async. diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/ContinuationHandlerMethodArgumentResolver.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/ContinuationHandlerMethodArgumentResolver.java index 0d5378e317..d870ea5264 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/ContinuationHandlerMethodArgumentResolver.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/ContinuationHandlerMethodArgumentResolver.java @@ -16,6 +16,12 @@ package org.springframework.kafka.listener.adapter; +import org.springframework.core.MethodParameter; +import org.springframework.messaging.Message; +import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver; + +import reactor.core.publisher.Mono; + /** * No-op resolver for method arguments of type {@link kotlin.coroutines.Continuation}. *

@@ -24,17 +30,9 @@ * but for regular {@link HandlerMethodArgumentResolver} contract. * * @author Wang Zhiyang - * * @since 3.2 - * * @see org.springframework.messaging.handler.annotation.reactive.ContinuationHandlerMethodArgumentResolver */ -import org.springframework.core.MethodParameter; -import org.springframework.messaging.Message; -import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver; - -import reactor.core.publisher.Mono; - public class ContinuationHandlerMethodArgumentResolver implements HandlerMethodArgumentResolver { @Override diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java index 9b143ef277..f1d51c6771 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java @@ -91,7 +91,7 @@ * @author Nathan Xu * @author Wang ZhiYang */ -public abstract class MessagingMessageListenerAdapter implements ConsumerSeekAware, HandlerMethodDetect { +public abstract class MessagingMessageListenerAdapter implements ConsumerSeekAware, AsyncRepliesAware { private static final SpelExpressionParser PARSER = new SpelExpressionParser(); From 0af8e9fba582d1dc9f2332baea02fafc9e8e4980 Mon Sep 17 00:00:00 2001 From: Wzy19930507 <1208931582@qq.com> Date: Sat, 20 Jan 2024 02:54:06 +0800 Subject: [PATCH 8/8] review fix After kafka client 2.4 producer uses sticky partition, its randomly chose partition and topic default partitions is 2, configure that @EmbeddedKafka to provide just one partition per topic. * javadoc in `AsyncRepliesAware` * fix test in EnableKafkaKotlinCoroutinesTests * polish adoc --- .../ROOT/pages/kafka/receiving-messages/async-returns.adoc | 2 +- .../src/main/antora/modules/ROOT/pages/whats-new.adoc | 2 +- .../kafka/listener/adapter/AsyncRepliesAware.java | 7 ++++--- .../adapter/ContinuationHandlerMethodArgumentResolver.java | 2 ++ .../kafka/listener/EnableKafkaKotlinCoroutinesTests.kt | 2 +- 5 files changed, 9 insertions(+), 6 deletions(-) diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/async-returns.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/async-returns.adoc index 78f3818d4e..451a93dadb 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/async-returns.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/async-returns.adoc @@ -2,7 +2,7 @@ = Asynchronous `@KafkaListener` Return Types `@KafkaListener` (and `@KafkaHandler`) methods can be specified with asynchronous return types, letting the reply be sent asynchronously. -return types include `CompletableFuture`, `Mono` and Kotlin `suspend` functions +return types include `CompletableFuture`, `Mono` and Kotlin `suspend` functions. [source, java] ---- diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc index bb43911f48..77198a4bc9 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc @@ -17,5 +17,5 @@ See xref:kafka/transactions.adoc#transaction-id-suffix-fixed[Fixed TransactionId [[x32-async-return]] === Async @KafkaListener Return -`@KafkaListener` (and `@KafkaHandler`) methods can now return asynchronous return types `CompletableFuture` and `Mono`. +`@KafkaListener` (and `@KafkaHandler`) methods can now return asynchronous return types include `CompletableFuture`, `Mono` and Kotlin `suspend` functions. See xref:kafka/receiving-messages/async-returns.adoc[Async Returns] for more information. \ No newline at end of file diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/AsyncRepliesAware.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/AsyncRepliesAware.java index 43ea7fda3c..a177c074fd 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/AsyncRepliesAware.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/AsyncRepliesAware.java @@ -1,5 +1,5 @@ /* - * Copyright 2023 the original author or authors. + * Copyright 2023-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,15 +17,16 @@ package org.springframework.kafka.listener.adapter; /** - * Auto-detect {@link HandlerAdapter} return type. + * Message handler adapter implementing this interface can detect {@link HandlerAdapter} async return types. * * @author Wang zhiyang + * * @since 3.2 */ public interface AsyncRepliesAware { /** - * Return true if this listener is request/reply and the replies are async. + * Return true if the {@link HandlerAdapter} return type is async. * @return true for async replies. * @since 3.2 */ diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/ContinuationHandlerMethodArgumentResolver.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/ContinuationHandlerMethodArgumentResolver.java index d870ea5264..f475f72a58 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/ContinuationHandlerMethodArgumentResolver.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/ContinuationHandlerMethodArgumentResolver.java @@ -30,7 +30,9 @@ * but for regular {@link HandlerMethodArgumentResolver} contract. * * @author Wang Zhiyang + * * @since 3.2 + * * @see org.springframework.messaging.handler.annotation.reactive.ContinuationHandlerMethodArgumentResolver */ public class ContinuationHandlerMethodArgumentResolver implements HandlerMethodArgumentResolver { diff --git a/spring-kafka/src/test/kotlin/org/springframework/kafka/listener/EnableKafkaKotlinCoroutinesTests.kt b/spring-kafka/src/test/kotlin/org/springframework/kafka/listener/EnableKafkaKotlinCoroutinesTests.kt index 573592c559..aaf1f079d3 100644 --- a/spring-kafka/src/test/kotlin/org/springframework/kafka/listener/EnableKafkaKotlinCoroutinesTests.kt +++ b/spring-kafka/src/test/kotlin/org/springframework/kafka/listener/EnableKafkaKotlinCoroutinesTests.kt @@ -56,7 +56,7 @@ import java.util.concurrent.TimeUnit @SpringJUnitConfig @DirtiesContext @EmbeddedKafka(topics = ["kotlinAsyncTestTopic1", "kotlinAsyncTestTopic2", - "kotlinAsyncBatchTestTopic1", "kotlinAsyncBatchTestTopic2", "kotlinReplyTopic1"]) + "kotlinAsyncBatchTestTopic1", "kotlinAsyncBatchTestTopic2", "kotlinReplyTopic1"], partitions = 1) class EnableKafkaKotlinCoroutinesTests { @Autowired