diff --git a/build.gradle b/build.gradle
index 8a72fafdd2..64f9c196d0 100644
--- a/build.gradle
+++ b/build.gradle
@@ -60,6 +60,7 @@ ext {
 junit4Version = '4.13.2'
 junitJupiterVersion = '5.10.1'
 kafkaVersion = '3.6.1'
+kotlinCoroutinesVersion = '1.7.3'
 log4jVersion = '2.22.1'
 micrometerDocsVersion = '1.0.2'
 micrometerVersion = '1.13.0-SNAPSHOT'
@@ -278,6 +279,7 @@ project ('spring-kafka') {
 }
 api "org.apache.kafka:kafka-clients:$kafkaVersion"
 optionalApi "org.apache.kafka:kafka-streams:$kafkaVersion"
+optionalApi "org.jetbrains.kotlinx:kotlinx-coroutines-reactor:$kotlinCoroutinesVersion"
 optionalApi 'com.fasterxml.jackson.core:jackson-core'
 optionalApi 'com.fasterxml.jackson.core:jackson-databind'
 optionalApi 'com.fasterxml.jackson.datatype:jackson-datatype-jdk8'
diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/nav.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/nav.adoc
index b1d8125c21..16f41dab0b 100644
--- a/spring-kafka-docs/src/main/antora/modules/ROOT/nav.adoc
+++ b/spring-kafka-docs/src/main/antora/modules/ROOT/nav.adoc
@@ -11,6 +11,7 @@
 **** xref:kafka/receiving-messages/message-listeners.adoc[]
 **** xref:kafka/receiving-messages/message-listener-container.adoc[]
 **** xref:kafka/receiving-messages/ooo-commits.adoc[]
+**** xref:kafka/receiving-messages/async-returns.adoc[]
 **** xref:kafka/receiving-messages/listener-annotation.adoc[]
 **** xref:kafka/receiving-messages/listener-group-id.adoc[]
 **** xref:kafka/receiving-messages/container-thread-naming.adoc[]
diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/async-returns.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/async-returns.adoc
new file mode 100644
index 0000000000..c54f5920a1
--- /dev/null
+++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/async-returns.adoc
@@ -0,0 +1,32 @@
+[[async-returns]]
+= Asynchronous `@KafkaListener` Return Types
+
+Starting with version 3.2, `@KafkaListener` (and `@KafkaHandler`) methods can be specified with asynchronous return types, letting the reply be sent asynchronously.
+Supported return types include `CompletableFuture`, `Mono`, and Kotlin `suspend` functions.
+
+[source, java]
+----
+@KafkaListener(id = "myListener", topics = "myTopic")
+public CompletableFuture<String> listen(String data) {
+    ...
+    CompletableFuture<String> future = new CompletableFuture<>();
+    future.complete("done");
+    return future;
+}
+----
+
+[source, java]
+----
+@KafkaListener(id = "myListener", topics = "myTopic")
+public Mono<Void> listen(String data) {
+    ...
+    return Mono.empty();
+}
+----
+
+IMPORTANT: When an async return type is detected, the `AckMode` is automatically set to `MANUAL` and out-of-order commits are enabled; the message is acknowledged when the async operation completes.
+When the async result completes with an error, whether the message is recovered or not depends on the container error handler.
+If an exception thrown within the listener method prevents creation of the async result object, you MUST catch that exception and return an appropriate object that will cause the message to be acknowledged or recovered.
+
+If a `KafkaListenerErrorHandler` is configured on a listener with an async return type (including Kotlin `suspend` functions), the error handler is invoked after a failure.
+See xref:kafka/annotation-error-handling.adoc[Handling Exceptions] for more information about this error handler and its purpose.
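As a rough sketch of the error-handler arrangement described above (the bean name, topic names, and fallback value here are illustrative assumptions, not taken from this change), a listener whose `CompletableFuture` completes exceptionally might be paired with a `KafkaListenerErrorHandler` along these lines, in the same fragment style as the examples on this page:

[source, java]
----
@Bean
public KafkaListenerErrorHandler asyncErrorHandler() {
    // Invoked after the CompletableFuture (or Mono) completes exceptionally;
    // the value returned here is sent to the @SendTo destination as the reply.
    return (message, exception) -> message.getPayload() + ".failed";
}

@KafkaListener(id = "asyncWithErrorHandler", topics = "myTopic", errorHandler = "asyncErrorHandler")
@SendTo("myReplies")
public CompletableFuture<String> listenAsync(String data) {
    CompletableFuture<String> future = new CompletableFuture<>();
    future.completeExceptionally(new IllegalStateException("simulated failure"));
    return future;
}
----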
diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc
index 3601aca797..77198a4bc9 100644
--- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc
+++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc
@@ -12,4 +12,10 @@ For changes in earlier version, see xref:appendix/change-history.adoc[Change His
 
 A new `TransactionIdSuffixStrategy` interface was introduced to manage `transactional.id` suffix.
 The default implementation is `DefaultTransactionIdSuffixStrategy` when setting `maxCache` greater than zero can reuse `transactional.id` within a specific range, otherwise suffixes will be generated on the fly by incrementing a counter.
-See xref:kafka/transactions.adoc#transaction-id-suffix-fixed[Fixed TransactionIdSuffix] for more information.
\ No newline at end of file
+See xref:kafka/transactions.adoc#transaction-id-suffix-fixed[Fixed TransactionIdSuffix] for more information.
+
+[[x32-async-return]]
+=== Async @KafkaListener Return
+
+`@KafkaListener` (and `@KafkaHandler`) methods can now have asynchronous return types, including `CompletableFuture`, `Mono`, and Kotlin `suspend` functions.
+See xref:kafka/receiving-messages/async-returns.adoc[Async Returns] for more information.
\ No newline at end of file
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java
index d7e68a7709..fcab2fb01f 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2014-2023 the original author or authors.
+ * Copyright 2014-2024 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -87,6 +87,7 @@ import org.springframework.kafka.listener.ContainerGroupSequencer; import org.springframework.kafka.listener.KafkaConsumerBackoffManager; import org.springframework.kafka.listener.KafkaListenerErrorHandler; +import org.springframework.kafka.listener.adapter.KafkaMessageHandlerMethodFactory; import org.springframework.kafka.listener.adapter.RecordFilterStrategy; import org.springframework.kafka.retrytopic.DestinationTopicResolver; import org.springframework.kafka.retrytopic.RetryTopicBeanNames; @@ -1155,7 +1156,7 @@ private MessageHandlerMethodFactory getHandlerMethodFactory() { } private MessageHandlerMethodFactory createDefaultMessageHandlerMethodFactory() { - DefaultMessageHandlerMethodFactory defaultFactory = new DefaultMessageHandlerMethodFactory(); + DefaultMessageHandlerMethodFactory defaultFactory = new KafkaMessageHandlerMethodFactory(); Validator validator = KafkaListenerAnnotationBeanPostProcessor.this.registrar.getValidator(); if (validator != null) { defaultFactory.setValidator(validator); @@ -1170,8 +1171,6 @@ private MessageHandlerMethodFactory createDefaultMessageHandlerMethodFactory() { List customArgumentsResolver = new ArrayList<>(KafkaListenerAnnotationBeanPostProcessor.this.registrar.getCustomMethodArgumentResolvers()); - // Has to be at the end - look at PayloadMethodArgumentResolver documentation - customArgumentsResolver.add(new KafkaNullAwarePayloadArgumentResolver(messageConverter, validator)); defaultFactory.setCustomArgumentResolvers(customArgumentsResolver); defaultFactory.afterPropertiesSet(); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java b/spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java index 28e190674e..5e27af27cd 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java @@ -147,8 +147,8 @@ private String getReplyTopic() { } String topic = destinations.length == 1 ? destinations[0] : ""; BeanFactory beanFactory = getBeanFactory(); - if (beanFactory instanceof ConfigurableListableBeanFactory) { - topic = ((ConfigurableListableBeanFactory) beanFactory).resolveEmbeddedValue(topic); + if (beanFactory instanceof ConfigurableListableBeanFactory configurableListableBeanFactory) { + topic = configurableListableBeanFactory.resolveEmbeddedValue(topic); if (topic != null) { topic = resolve(topic); } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaListenerErrorHandler.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaListenerErrorHandler.java index 91715b1710..36536a9fcb 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaListenerErrorHandler.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaListenerErrorHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2019 the original author or authors. + * Copyright 2017-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -72,6 +72,7 @@ default Object handleError(Message message, ListenerExecutionFailedException * @return the return value is ignored unless the annotated method has a * {@code @SendTo} annotation. 
*/ + @Nullable default Object handleError(Message message, ListenerExecutionFailedException exception, Consumer consumer, @Nullable Acknowledgment ack) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index bd0523c1c5..6d2b0ffc49 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -104,6 +104,7 @@ import org.springframework.kafka.listener.ContainerProperties.AckMode; import org.springframework.kafka.listener.ContainerProperties.AssignmentCommitOption; import org.springframework.kafka.listener.ContainerProperties.EOSMode; +import org.springframework.kafka.listener.adapter.AsyncRepliesAware; import org.springframework.kafka.support.Acknowledgment; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.kafka.support.KafkaUtils; @@ -160,6 +161,7 @@ * @author Francois Rosiere * @author Daniel Gentes * @author Soby Chacko + * @author Wang Zhiyang */ public class KafkaMessageListenerContainer // NOSONAR line count extends AbstractMessageListenerContainer implements ConsumerPauseResumeEventPublisher { @@ -660,6 +662,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume private final boolean wantsFullRecords; + private final boolean asyncReplies; + private final boolean autoCommit; private final boolean isManualAck; @@ -850,6 +854,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume ListenerConsumer(GenericMessageListener listener, ListenerType listenerType, ObservationRegistry observationRegistry) { + this.asyncReplies = listener instanceof AsyncRepliesAware hmd && hmd.isAsyncReplies() + || this.containerProperties.isAsyncAcks(); AckMode ackMode = determineAckMode(); this.isManualAck = ackMode.equals(AckMode.MANUAL); this.isCountAck = ackMode.equals(AckMode.COUNT) @@ -860,12 +866,12 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume this.isAnyManualAck = this.isManualAck || this.isManualImmediateAck; this.isRecordAck = ackMode.equals(AckMode.RECORD); this.offsetsInThisBatch = - this.isAnyManualAck && this.containerProperties.isAsyncAcks() - ? new HashMap<>() + this.isAnyManualAck && this.asyncReplies + ? new ConcurrentHashMap<>() : null; this.deferredOffsets = - this.isAnyManualAck && this.containerProperties.isAsyncAcks() - ? new HashMap<>() + this.isAnyManualAck && this.asyncReplies + ? new ConcurrentHashMap<>() : null; this.observationRegistry = observationRegistry; @@ -904,8 +910,7 @@ else if (listener instanceof MessageListener) { else { throw new IllegalArgumentException("Listener must be one of 'MessageListener', " + "'BatchMessageListener', or the variants that are consumer aware and/or " - + "Acknowledging" - + " not " + listener.getClass().getName()); + + "Acknowledging not " + listener.getClass().getName()); } this.listenerType = listenerType; this.isConsumerAwareListener = listenerType.equals(ListenerType.ACKNOWLEDGING_CONSUMER_AWARE) @@ -928,18 +933,15 @@ else if (listener instanceof MessageListener) { this.logger.info(toString()); } ApplicationContext applicationContext = getApplicationContext(); + ClassLoader classLoader = applicationContext == null + ? 
getClass().getClassLoader() + : applicationContext.getClassLoader(); this.checkNullKeyForExceptions = this.containerProperties.isCheckDeserExWhenKeyNull() || ErrorHandlingUtils.checkDeserializer(KafkaMessageListenerContainer.this.consumerFactory, - consumerProperties, false, - applicationContext == null - ? getClass().getClassLoader() - : applicationContext.getClassLoader()); + consumerProperties, false, classLoader); this.checkNullValueForExceptions = this.containerProperties.isCheckDeserExWhenValueNull() || ErrorHandlingUtils.checkDeserializer(KafkaMessageListenerContainer.this.consumerFactory, - consumerProperties, true, - applicationContext == null - ? getClass().getClassLoader() - : applicationContext.getClassLoader()); + consumerProperties, true, classLoader); this.syncCommitTimeout = determineSyncCommitTimeout(); if (this.containerProperties.getSyncCommitTimeout() == null) { // update the property, so we can use it directly from code elsewhere @@ -965,6 +967,9 @@ private AckMode determineAckMode() { if (this.consumerGroupId == null && KafkaMessageListenerContainer.this.topicPartitions != null) { ackMode = AckMode.MANUAL; } + if (this.asyncReplies && !(AckMode.MANUAL_IMMEDIATE.equals(ackMode) || AckMode.MANUAL.equals(ackMode))) { + ackMode = AckMode.MANUAL; + } return ackMode; } @@ -3389,12 +3394,17 @@ public void acknowledge() { public void nack(Duration sleep) { Assert.state(Thread.currentThread().equals(ListenerConsumer.this.consumerThread), "nack() can only be called on the consumer thread"); - Assert.state(!ListenerConsumer.this.containerProperties.isAsyncAcks(), - "nack() is not supported with out-of-order commits (asyncAcks=true)"); + Assert.state(!ListenerConsumer.this.asyncReplies, + "nack() is not supported with out-of-order commits"); Assert.isTrue(!sleep.isNegative(), "sleep cannot be negative"); ListenerConsumer.this.nackSleepDurationMillis = sleep.toMillis(); } + @Override + public boolean isOutOfOrderCommit() { + return ListenerConsumer.this.asyncReplies; + } + @Override public String toString() { return "Acknowledgment for " + KafkaUtils.format(this.cRecord); @@ -3469,8 +3479,8 @@ public void acknowledge(int index) { public void nack(int index, Duration sleep) { Assert.state(Thread.currentThread().equals(ListenerConsumer.this.consumerThread), "nack() can only be called on the consumer thread"); - Assert.state(!ListenerConsumer.this.containerProperties.isAsyncAcks(), - "nack() is not supported with out-of-order commits (asyncAcks=true)"); + Assert.state(!ListenerConsumer.this.asyncReplies, + "nack() is not supported with out-of-order commits"); Assert.isTrue(!sleep.isNegative(), "sleep cannot be negative"); Assert.isTrue(index >= 0 && index < this.records.count(), "index out of bounds"); ListenerConsumer.this.nackIndex = index; @@ -3493,6 +3503,11 @@ public void nack(int index, Duration sleep) { processAcks(new ConsumerRecords(newRecords)); } + @Override + public boolean isOutOfOrderCommit() { + return ListenerConsumer.this.asyncReplies; + } + @Override public String toString() { return "Acknowledgment for " + this.records; diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/AdapterUtils.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/AdapterUtils.java index 010ae1a863..bd83b9664f 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/AdapterUtils.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/AdapterUtils.java @@ -1,5 +1,5 @@ /* - * Copyright 
2020-2023 the original author or authors. + * Copyright 2020-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,8 @@ package org.springframework.kafka.listener.adapter; +import java.util.concurrent.CompletableFuture; + import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.TopicPartition; @@ -24,11 +26,15 @@ import org.springframework.expression.common.TemplateParserContext; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.lang.Nullable; +import org.springframework.util.ClassUtils; + +import reactor.core.publisher.Mono; /** * Utilities for listener adapters. * * @author Gary Russell + * @author Wang Zhiyang * @since 2.5 * */ @@ -40,6 +46,9 @@ public final class AdapterUtils { */ public static final ParserContext PARSER_CONTEXT = new TemplateParserContext("!{", "}"); + private static final boolean MONO_PRESENT = + ClassUtils.isPresent("reactor.core.publisher.Mono", AdapterUtils.class.getClassLoader()); + private AdapterUtils() { } @@ -85,4 +94,34 @@ public static String getDefaultReplyTopicExpression() { + KafkaHeaders.REPLY_TOPIC + "']" + PARSER_CONTEXT.getExpressionSuffix(); } + /** + * Return the true when return types are asynchronous. + * @param resultType {@code InvocableHandlerMethod} return type. + * @return type is {@code Mono} or {@code CompletableFuture}. + * @since 3.2 + */ + public static boolean isAsyncReply(Class resultType) { + return isMono(resultType) || isCompletableFuture(resultType); + } + + /** + * Return the true when type is {@code Mono}. + * @param resultType {@code InvocableHandlerMethod} return type. + * @return type is {@code Mono}. + * @since 3.2 + */ + public static boolean isMono(Class resultType) { + return MONO_PRESENT && Mono.class.isAssignableFrom(resultType); + } + + /** + * Return the true when type is {@code CompletableFuture}. + * @param resultType {@code InvocableHandlerMethod} return type. + * @return type is {@code CompletableFuture}. + * @since 3.2 + */ + public static boolean isCompletableFuture(Class resultType) { + return CompletableFuture.class.isAssignableFrom(resultType); + } + } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/AsyncRepliesAware.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/AsyncRepliesAware.java new file mode 100644 index 0000000000..a177c074fd --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/AsyncRepliesAware.java @@ -0,0 +1,37 @@ +/* + * Copyright 2023-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.listener.adapter; + +/** + * Message handler adapter implementing this interface can detect {@link HandlerAdapter} async return types. 
+ * + * @author Wang zhiyang + * + * @since 3.2 + */ +public interface AsyncRepliesAware { + + /** + * Return true if the {@link HandlerAdapter} return type is async. + * @return true for async replies. + * @since 3.2 + */ + default boolean isAsyncReplies() { + return false; + } + +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapter.java index a2c1d87704..b7241c0fd3 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapter.java @@ -26,14 +26,12 @@ import org.springframework.kafka.listener.BatchAcknowledgingConsumerAwareMessageListener; import org.springframework.kafka.listener.KafkaListenerErrorHandler; -import org.springframework.kafka.listener.ListenerExecutionFailedException; import org.springframework.kafka.support.Acknowledgment; import org.springframework.kafka.support.converter.BatchMessageConverter; import org.springframework.kafka.support.converter.BatchMessagingMessageConverter; import org.springframework.kafka.support.converter.RecordMessageConverter; import org.springframework.lang.Nullable; import org.springframework.messaging.Message; -import org.springframework.messaging.support.GenericMessage; import org.springframework.messaging.support.MessageBuilder; import org.springframework.util.Assert; @@ -56,6 +54,7 @@ * @author Gary Russell * @author Artem Bilan * @author Venil Noronha + * @author Wang ZhiYang * @since 1.1 */ public class BatchMessagingMessageListenerAdapter extends MessagingMessageListenerAdapter @@ -63,8 +62,6 @@ public class BatchMessagingMessageListenerAdapter extends MessagingMessage private BatchMessageConverter batchMessageConverter = new BatchMessagingMessageConverter(); - private final KafkaListenerErrorHandler errorHandler; - private BatchToRecordAdapter batchToRecordAdapter; /** @@ -85,8 +82,7 @@ public BatchMessagingMessageListenerAdapter(Object bean, Method method) { public BatchMessagingMessageListenerAdapter(Object bean, Method method, @Nullable KafkaListenerErrorHandler errorHandler) { - super(bean, method); - this.errorHandler = errorHandler; + super(bean, method, errorHandler); } /** @@ -172,39 +168,6 @@ public void onMessage(List> records, @Nullable Acknowledgme invoke(records, acknowledgment, consumer, message); } - protected void invoke(Object records, @Nullable Acknowledgment acknowledgment, Consumer consumer, - final Message messageArg) { - - Message message = messageArg; - try { - Object result = invokeHandler(records, acknowledgment, message, consumer); - if (result != null) { - handleResult(result, records, message); - } - } - catch (ListenerExecutionFailedException e) { // NOSONAR ex flow control - if (this.errorHandler != null) { - try { - if (message.equals(NULL_MESSAGE)) { - message = new GenericMessage<>(records); - } - Object result = this.errorHandler.handleError(message, e, consumer, acknowledgment); - if (result != null) { - handleResult(result, records, message); - } - } - catch (Exception ex) { - throw new ListenerExecutionFailedException(createMessagingErrorMessage(// NOSONAR stack trace loss - "Listener error handler threw an exception for the incoming message", - message.getPayload()), ex); - } - } - else { - throw e; - } - } - } - @SuppressWarnings({ "unchecked", 
"rawtypes" }) protected Message toMessagingMessage(List records, @Nullable Acknowledgment acknowledgment, Consumer consumer) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/ContinuationHandlerMethodArgumentResolver.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/ContinuationHandlerMethodArgumentResolver.java new file mode 100644 index 0000000000..f475f72a58 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/ContinuationHandlerMethodArgumentResolver.java @@ -0,0 +1,50 @@ +/* + * Copyright 2023-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.listener.adapter; + +import org.springframework.core.MethodParameter; +import org.springframework.messaging.Message; +import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver; + +import reactor.core.publisher.Mono; + +/** + * No-op resolver for method arguments of type {@link kotlin.coroutines.Continuation}. + *

+ * This class is similar to + * {@link org.springframework.messaging.handler.annotation.reactive.ContinuationHandlerMethodArgumentResolver} + * but for regular {@link HandlerMethodArgumentResolver} contract. + * + * @author Wang Zhiyang + * + * @since 3.2 + * + * @see org.springframework.messaging.handler.annotation.reactive.ContinuationHandlerMethodArgumentResolver + */ +public class ContinuationHandlerMethodArgumentResolver implements HandlerMethodArgumentResolver { + + @Override + public boolean supportsParameter(MethodParameter parameter) { + return "kotlin.coroutines.Continuation".equals(parameter.getParameterType().getName()); + } + + @Override + public Object resolveArgument(MethodParameter parameter, Message message) { + return Mono.empty(); + } + +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DelegatingInvocableHandler.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DelegatingInvocableHandler.java index d89f83dae4..9f94c33d54 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DelegatingInvocableHandler.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DelegatingInvocableHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2023 the original author or authors. + * Copyright 2016-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -55,6 +55,7 @@ * unambiguous. * * @author Gary Russell + * @author Wang Zhiyang * */ public class DelegatingInvocableHandler { @@ -86,6 +87,8 @@ public class DelegatingInvocableHandler { private final PayloadValidator validator; + private final boolean asyncReplies; + /** * Construct an instance with the supplied handlers for the bean. * @param handlers the handlers. @@ -116,6 +119,15 @@ public DelegatingInvocableHandler(List handlers, ? configurableListableBeanFactory : null; this.validator = validator == null ? null : new PayloadValidator(validator); + boolean asyncReplies = defaultHandler != null && isAsyncReply(defaultHandler); + for (InvocableHandlerMethod handlerMethod : handlers) { + asyncReplies |= isAsyncReply(handlerMethod); + } + this.asyncReplies = asyncReplies; + } + + private boolean isAsyncReply(InvocableHandlerMethod method) { + return AdapterUtils.isAsyncReply(method.getMethod().getReturnType()); } private void checkSpecial(@Nullable InvocableHandlerMethod handler) { @@ -139,6 +151,15 @@ public Object getBean() { return this.bean; } + /** + * Return true if any handler method has an async reply type. + * @return the asyncReply. + * @since 3.2 + */ + public boolean isAsyncReplies() { + return this.asyncReplies; + } + /** * Invoke the method with the given message. * @param message the message. @@ -316,6 +337,23 @@ private boolean assignPayload(MethodParameter methodParameter, Class payloadC && methodParameter.getParameterType().isAssignableFrom(payloadClass); } + /** + * Return the result of a method invocation by providing a result and payload. + * @param result the result. + * @param inboundPayload the payload. + * @return the result of a method invocation. 
+ * @since 3.2 + */ + @Nullable + public InvocationResult getInvocationResultFor(Object result, Object inboundPayload) { + InvocableHandlerMethod handler = findHandlerForPayload(inboundPayload.getClass()); + if (handler != null) { + return new InvocationResult(result, this.handlerSendTo.get(handler), + this.handlerReturnsMessage.get(handler)); + } + return null; + } + private static final class PayloadValidator extends PayloadMethodArgumentResolver { PayloadValidator(Validator validator) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/HandlerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/HandlerAdapter.java index 3103b8b221..82caa738fd 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/HandlerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/HandlerAdapter.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2021 the original author or authors. + * Copyright 2015-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,7 @@ package org.springframework.kafka.listener.adapter; +import org.springframework.lang.Nullable; import org.springframework.messaging.Message; import org.springframework.messaging.handler.invocation.InvocableHandlerMethod; @@ -33,6 +34,8 @@ public class HandlerAdapter { private final DelegatingInvocableHandler delegatingHandler; + private final boolean asyncReplies; + /** * Construct an instance with the provided method. * @param invokerHandlerMethod the method. @@ -40,6 +43,7 @@ public class HandlerAdapter { public HandlerAdapter(InvocableHandlerMethod invokerHandlerMethod) { this.invokerHandlerMethod = invokerHandlerMethod; this.delegatingHandler = null; + this.asyncReplies = AdapterUtils.isAsyncReply(invokerHandlerMethod.getMethod().getReturnType()); } /** @@ -49,6 +53,16 @@ public HandlerAdapter(InvocableHandlerMethod invokerHandlerMethod) { public HandlerAdapter(DelegatingInvocableHandler delegatingHandler) { this.invokerHandlerMethod = null; this.delegatingHandler = delegatingHandler; + this.asyncReplies = delegatingHandler.isAsyncReplies(); + } + + /** + * Return true if any handler method has an async reply type. + * @return the asyncReply. + * @since 3.2 + */ + public boolean isAsyncReplies() { + return this.asyncReplies; } public Object invoke(Message message, Object... providedArgs) throws Exception { //NOSONAR @@ -85,4 +99,12 @@ public Object getBean() { } } + @Nullable + public InvocationResult getInvocationResultFor(Object result, @Nullable Object inboundPayload) { + if (this.delegatingHandler != null && inboundPayload != null) { + return this.delegatingHandler.getInvocationResultFor(result, inboundPayload); + } + return null; + } + } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/InvocationResult.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/InvocationResult.java index 8c31633ea5..877315717a 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/InvocationResult.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/InvocationResult.java @@ -21,39 +21,14 @@ /** * The result of a method invocation. + * @param result the result. + * @param messageReturnType the message return type. + * @param sendTo the expression about sends topic. 
* * @author Gary Russell * @since 2.2 */ -public final class InvocationResult { - - @Nullable - private final Object result; - - @Nullable - private final Expression sendTo; - - private final boolean messageReturnType; - - public InvocationResult(@Nullable Object result, @Nullable Expression sendTo, boolean messageReturnType) { - this.result = result; - this.sendTo = sendTo; - this.messageReturnType = messageReturnType; - } - - @Nullable - public Object getResult() { - return this.result; - } - - @Nullable - public Expression getSendTo() { - return this.sendTo; - } - - public boolean isMessageReturnType() { - return this.messageReturnType; - } +public record InvocationResult(@Nullable Object result, @Nullable Expression sendTo, boolean messageReturnType) { @Override public String toString() { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/KafkaMessageHandlerMethodFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/KafkaMessageHandlerMethodFactory.java new file mode 100644 index 0000000000..9e0a49dcad --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/KafkaMessageHandlerMethodFactory.java @@ -0,0 +1,78 @@ +/* + * Copyright 2023-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.listener.adapter; + +import java.lang.reflect.Method; +import java.util.List; + +import org.springframework.core.KotlinDetector; +import org.springframework.messaging.converter.MessageConverter; +import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory; +import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver; +import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite; +import org.springframework.messaging.handler.invocation.InvocableHandlerMethod; +import org.springframework.validation.Validator; + +/** + * Extension of the {@link DefaultMessageHandlerMethodFactory} for Spring Kafka requirements. 
+ * + * @author Wang Zhiyang + * + * @since 3.2 + */ +public class KafkaMessageHandlerMethodFactory extends DefaultMessageHandlerMethodFactory { + + private final HandlerMethodArgumentResolverComposite argumentResolvers = + new HandlerMethodArgumentResolverComposite(); + + private MessageConverter messageConverter; + + private Validator validator; + + @Override + public void setMessageConverter(MessageConverter messageConverter) { + super.setMessageConverter(messageConverter); + this.messageConverter = messageConverter; + } + + @Override + public void setValidator(Validator validator) { + super.setValidator(validator); + this.validator = validator; + } + + @Override + protected List initArgumentResolvers() { + List resolvers = super.initArgumentResolvers(); + if (KotlinDetector.isKotlinPresent()) { + // Insert before PayloadMethodArgumentResolver + resolvers.add(resolvers.size() - 1, new ContinuationHandlerMethodArgumentResolver()); + } + // Has to be at the end - look at PayloadMethodArgumentResolver documentation + resolvers.add(resolvers.size() - 1, new KafkaNullAwarePayloadArgumentResolver(this.messageConverter, this.validator)); + this.argumentResolvers.addResolvers(resolvers); + return resolvers; + } + + @Override + public InvocableHandlerMethod createInvocableHandlerMethod(Object bean, Method method) { + InvocableHandlerMethod handlerMethod = new KotlinAwareInvocableHandlerMethod(bean, method); + handlerMethod.setMessageMethodArgumentResolvers(this.argumentResolvers); + return handlerMethod; + } + +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaNullAwarePayloadArgumentResolver.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/KafkaNullAwarePayloadArgumentResolver.java similarity index 95% rename from spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaNullAwarePayloadArgumentResolver.java rename to spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/KafkaNullAwarePayloadArgumentResolver.java index bfa45ea6a6..c269aa024e 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaNullAwarePayloadArgumentResolver.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/KafkaNullAwarePayloadArgumentResolver.java @@ -1,5 +1,5 @@ /* - * Copyright 2021-2023 the original author or authors. + * Copyright 2021-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.springframework.kafka.annotation; +package org.springframework.kafka.listener.adapter; import java.util.List; diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/KotlinAwareInvocableHandlerMethod.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/KotlinAwareInvocableHandlerMethod.java new file mode 100644 index 0000000000..88451b0fb3 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/KotlinAwareInvocableHandlerMethod.java @@ -0,0 +1,49 @@ +/* + * Copyright 2023-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.listener.adapter; + +import java.lang.reflect.Method; + +import org.springframework.core.CoroutinesUtils; +import org.springframework.core.KotlinDetector; +import org.springframework.messaging.handler.invocation.InvocableHandlerMethod; + +/** + * An {@link InvocableHandlerMethod} extension for supporting Kotlin {@code suspend} function. + * + * @author Wang Zhiyang + * + * @since 3.2 + */ +public class KotlinAwareInvocableHandlerMethod extends InvocableHandlerMethod { + + public KotlinAwareInvocableHandlerMethod(Object bean, Method method) { + super(bean, method); + } + + @Override + protected Object doInvoke(Object... args) throws Exception { + Method method = getBridgedMethod(); + if (KotlinDetector.isSuspendingFunction(method)) { + return CoroutinesUtils.invokeSuspendingFunction(method, getBean(), args); + } + else { + return super.doInvoke(args); + } + } + +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java index 0b9b7097fc..370b8167a5 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2023 the original author or authors. + * Copyright 2016-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -26,6 +26,8 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; import org.apache.commons.logging.LogFactory; @@ -35,6 +37,7 @@ import org.apache.kafka.common.TopicPartition; import org.springframework.context.expression.MapAccessor; +import org.springframework.core.KotlinDetector; import org.springframework.core.MethodParameter; import org.springframework.core.log.LogAccessor; import org.springframework.expression.BeanResolver; @@ -45,7 +48,9 @@ import org.springframework.expression.spel.support.StandardTypeConverter; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.listener.ConsumerSeekAware; +import org.springframework.kafka.listener.KafkaListenerErrorHandler; import org.springframework.kafka.listener.ListenerExecutionFailedException; +import org.springframework.kafka.listener.MessageListener; import org.springframework.kafka.support.Acknowledgment; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.kafka.support.KafkaNull; @@ -65,9 +70,12 @@ import org.springframework.messaging.support.GenericMessage; import org.springframework.messaging.support.MessageBuilder; import org.springframework.util.Assert; +import org.springframework.util.ClassUtils; import org.springframework.util.ObjectUtils; import org.springframework.util.StringUtils; +import reactor.core.publisher.Mono; + /** * An abstract {@link org.springframework.kafka.listener.MessageListener} adapter * providing the necessary infrastructure to extract the payload of a @@ -83,7 +91,7 @@ * @author Nathan Xu * @author Wang ZhiYang */ -public abstract class MessagingMessageListenerAdapter implements ConsumerSeekAware { +public abstract class MessagingMessageListenerAdapter implements ConsumerSeekAware, AsyncRepliesAware { private static final SpelExpressionParser PARSER = new SpelExpressionParser(); @@ -94,6 +102,9 @@ public abstract class MessagingMessageListenerAdapter implements ConsumerS */ protected static final Message NULL_MESSAGE = new GenericMessage<>(KafkaNull.INSTANCE); // NOSONAR + private static final boolean monoPresent = + ClassUtils.isPresent("reactor.core.publisher.Mono", MessageListener.class.getClassLoader()); + private final Object bean; protected final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass())); //NOSONAR @@ -102,6 +113,8 @@ public abstract class MessagingMessageListenerAdapter implements ConsumerS private final StandardEvaluationContext evaluationContext = new StandardEvaluationContext(); + private final KafkaListenerErrorHandler errorHandler; + private HandlerAdapter handlerMethod; private boolean isConsumerRecordList; @@ -143,8 +156,19 @@ public abstract class MessagingMessageListenerAdapter implements ConsumerS * @param method the method. */ protected MessagingMessageListenerAdapter(Object bean, Method method) { + this(bean, method, null); + } + + /** + * Create an instance with the provided bean, method and kafka listener error handler. + * @param bean the bean. + * @param method the method. + * @param errorHandler the kafka listener error handler. 
+ */ + protected MessagingMessageListenerAdapter(Object bean, Method method, @Nullable KafkaListenerErrorHandler errorHandler) { this.bean = bean; this.inferredType = determineInferredType(method); // NOSONAR = intentionally not final + this.errorHandler = errorHandler; } /** @@ -220,6 +244,10 @@ public void setHandlerMethod(HandlerAdapter handlerMethod) { this.handlerMethod = handlerMethod; } + public boolean isAsyncReplies() { + return this.handlerMethod.isAsyncReplies(); + } + protected boolean isConsumerRecordList() { return this.isConsumerRecordList; } @@ -348,6 +376,20 @@ protected Message toMessagingMessage(ConsumerRecord cRecord, @Nullable return getMessageConverter().toMessage(cRecord, acknowledgment, consumer, getType()); } + protected void invoke(Object records, @Nullable Acknowledgment acknowledgment, Consumer consumer, + final Message message) { + + try { + Object result = invokeHandler(records, acknowledgment, message, consumer); + if (result != null) { + handleResult(result, records, acknowledgment, consumer, message); + } + } + catch (ListenerExecutionFailedException e) { // NOSONAR ex flow control + handleException(records, acknowledgment, consumer, message, e); + } + } + /** * Invoke the handler, wrapping any exception to a {@link ListenerExecutionFailedException} * with a dedicated error message. @@ -408,26 +450,63 @@ private RuntimeException checkAckArg(@Nullable Acknowledgment acknowledgment, Me * response message to the SendTo topic. * @param resultArg the result object to handle (never null) * @param request the original request message + * @param acknowledgment the acknowledgment to manual ack + * @param consumer the consumer to handler error * @param source the source data for the method invocation - e.g. * {@code o.s.messaging.Message}; may be null */ - protected void handleResult(Object resultArg, Object request, Object source) { + protected void handleResult(Object resultArg, Object request, @Nullable Acknowledgment acknowledgment, + Consumer consumer, @Nullable Message source) { + this.logger.debug(() -> "Listener method returned result [" + resultArg + "] - generating response message for it"); - boolean isInvocationResult = resultArg instanceof InvocationResult; - Object result = isInvocationResult ? ((InvocationResult) resultArg).getResult() : resultArg; String replyTopic = evaluateReplyTopic(request, source, resultArg); Assert.state(replyTopic == null || this.replyTemplate != null, "a KafkaTemplate is required to support replies"); - sendResponse(result, replyTopic, source, isInvocationResult - ? ((InvocationResult) resultArg).isMessageReturnType() : this.messageReturnType); + + Object result = resultArg instanceof InvocationResult invocationResult ? + invocationResult.result() : + resultArg; + boolean messageReturnType = resultArg instanceof InvocationResult invocationResult ? 
+ invocationResult.messageReturnType() : + this.messageReturnType; + + if (result instanceof CompletableFuture completable) { + if (acknowledgment == null || !acknowledgment.isOutOfOrderCommit()) { + this.logger.warn("Container 'Acknowledgment' must be async ack for Future return type; " + + "otherwise the container will ack the message immediately"); + } + completable.whenComplete((r, t) -> { + if (t == null) { + asyncSuccess(r, replyTopic, source, messageReturnType); + acknowledge(acknowledgment); + } + else { + asyncFailure(request, acknowledgment, consumer, t, source); + } + }); + } + else if (monoPresent && result instanceof Mono mono) { + if (acknowledgment == null || !acknowledgment.isOutOfOrderCommit()) { + this.logger.warn("Container 'Acknowledgment' must be async ack for Mono return type " + + "(or Kotlin suspend function); otherwise the container will ack the message immediately"); + } + mono.subscribe( + r -> asyncSuccess(r, replyTopic, source, messageReturnType), + t -> asyncFailure(request, acknowledgment, consumer, t, source), + () -> acknowledge(acknowledgment) + ); + } + else { + sendResponse(result, replyTopic, source, messageReturnType); + } } @Nullable private String evaluateReplyTopic(Object request, Object source, Object result) { String replyTo = null; if (result instanceof InvocationResult invResult) { - replyTo = evaluateTopic(request, source, result, invResult.getSendTo()); + replyTo = evaluateTopic(request, source, result, invResult.sendTo()); } else if (this.replyTopicExpression != null) { replyTo = evaluateTopic(request, source, result, this.replyTopicExpression); @@ -558,6 +637,63 @@ private void sendReplyForMessageSource(Object result, String topic, Message s this.replyTemplate.send(builder.build()); } + protected void asyncSuccess(@Nullable Object result, String replyTopic, Message source, boolean returnTypeMessage) { + if (result == null) { + if (this.logger.isDebugEnabled()) { + this.logger.debug("Async result is null, ignoring"); + } + } + else { + sendResponse(result, replyTopic, source, returnTypeMessage); + } + } + + protected void acknowledge(@Nullable Acknowledgment acknowledgment) { + if (acknowledgment != null) { + acknowledgment.acknowledge(); + } + } + + protected void asyncFailure(Object request, @Nullable Acknowledgment acknowledgment, Consumer consumer, + Throwable t, Message source) { + + try { + handleException(request, acknowledgment, consumer, source, + new ListenerExecutionFailedException(createMessagingErrorMessage( + "Async Fail", source.getPayload()), t)); + } + catch (Throwable ex) { + this.logger.error(t, () -> "Future, Mono, or suspend function was completed with an exception for " + source); + acknowledge(acknowledgment); + } + } + + protected void handleException(Object records, @Nullable Acknowledgment acknowledgment, Consumer consumer, + Message message, ListenerExecutionFailedException e) { + + if (this.errorHandler != null) { + try { + if (NULL_MESSAGE.equals(message)) { + message = new GenericMessage<>(records); + } + Object errorResult = this.errorHandler.handleError(message, e, consumer, acknowledgment); + if (errorResult != null && !(errorResult instanceof InvocationResult)) { + Object result = this.handlerMethod.getInvocationResultFor(errorResult, message.getPayload()); + handleResult(Objects.requireNonNullElse(result, errorResult), + records, acknowledgment, consumer, message); + } + } + catch (Exception ex) { + throw new ListenerExecutionFailedException(createMessagingErrorMessage(// NOSONAR stack trace loss + "Listener 
error handler threw an exception for the incoming message", + message.getPayload()), ex); + } + } + else { + throw e; + } + } + private void setCorrelation(MessageBuilder builder, Message source) { byte[] correlationBytes = getCorrelation(source); if (correlationBytes != null) { @@ -621,6 +757,8 @@ protected Type determineInferredType(Method method) { // NOSONAR complexity isNotConvertible |= isAck; boolean isConsumer = parameterIsType(parameterType, Consumer.class); isNotConvertible |= isConsumer; + boolean isCoroutines = KotlinDetector.isKotlinType(methodParameter.getParameterType()); + isNotConvertible |= isCoroutines; boolean isMeta = parameterIsType(parameterType, ConsumerRecordMetadata.class); this.hasMetadataParameter |= isMeta; isNotConvertible |= isMeta; @@ -639,7 +777,7 @@ protected Type determineInferredType(Method method) { // NOSONAR complexity break; } } - else if (isAck || isConsumer || annotationHeaderIsGroupId(methodParameter)) { + else if (isAck || isCoroutines || isConsumer || annotationHeaderIsGroupId(methodParameter)) { allowedBatchParameters++; } } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/RecordMessagingMessageListenerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/RecordMessagingMessageListenerAdapter.java index c0da30c278..ee9b791009 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/RecordMessagingMessageListenerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/RecordMessagingMessageListenerAdapter.java @@ -23,12 +23,10 @@ import org.springframework.kafka.listener.AcknowledgingConsumerAwareMessageListener; import org.springframework.kafka.listener.KafkaListenerErrorHandler; -import org.springframework.kafka.listener.ListenerExecutionFailedException; import org.springframework.kafka.support.Acknowledgment; import org.springframework.kafka.support.converter.ProjectingMessageConverter; import org.springframework.lang.Nullable; import org.springframework.messaging.Message; -import org.springframework.messaging.support.GenericMessage; /** @@ -53,8 +51,6 @@ public class RecordMessagingMessageListenerAdapter extends MessagingMessageListenerAdapter implements AcknowledgingConsumerAwareMessageListener { - private final KafkaListenerErrorHandler errorHandler; - public RecordMessagingMessageListenerAdapter(Object bean, Method method) { this(bean, method, null); } @@ -62,8 +58,7 @@ public RecordMessagingMessageListenerAdapter(Object bean, Method method) { public RecordMessagingMessageListenerAdapter(Object bean, Method method, @Nullable KafkaListenerErrorHandler errorHandler) { - super(bean, method); - this.errorHandler = errorHandler; + super(bean, method, errorHandler); } /** @@ -88,33 +83,7 @@ public void onMessage(ConsumerRecord record, @Nullable Acknowledgment ackn if (logger.isDebugEnabled() && !(getMessageConverter() instanceof ProjectingMessageConverter)) { this.logger.debug("Processing [" + message + "]"); } - try { - Object result = invokeHandler(record, acknowledgment, message, consumer); - if (result != null) { - handleResult(result, record, message); - } - } - catch (ListenerExecutionFailedException e) { // NOSONAR ex flow control - if (this.errorHandler != null) { - try { - if (message.equals(NULL_MESSAGE)) { - message = new GenericMessage<>(record); - } - Object result = this.errorHandler.handleError(message, e, consumer, acknowledgment); - if (result != null) { - handleResult(result, record, message); - } - } - 
catch (Exception ex) { - throw new ListenerExecutionFailedException(createMessagingErrorMessage(// NOSONAR stack trace loss - "Listener error handler threw an exception for the incoming message", - message.getPayload()), ex); - } - } - else { - throw e; - } - } + invoke(record, acknowledgment, consumer, message); } } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/Acknowledgment.java b/spring-kafka/src/main/java/org/springframework/kafka/support/Acknowledgment.java index 08c015fb4b..47217d53f1 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/Acknowledgment.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/Acknowledgment.java @@ -81,4 +81,8 @@ default void nack(int index, Duration sleep) { throw new UnsupportedOperationException("nack(index, sleep) is not supported by this Acknowledgment"); } + default boolean isOutOfOrderCommit() { + return false; + } + } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/annotation/AsyncListenerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/annotation/AsyncListenerTests.java new file mode 100644 index 0000000000..24d668737e --- /dev/null +++ b/spring-kafka/src/test/java/org/springframework/kafka/annotation/AsyncListenerTests.java @@ -0,0 +1,371 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.springframework.kafka.annotation; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.junit.jupiter.api.Test; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.listener.KafkaListenerErrorHandler; +import org.springframework.kafka.support.Acknowledgment; +import org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.kafka.test.utils.KafkaTestUtils; +import org.springframework.messaging.handler.annotation.SendTo; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; + +import reactor.core.publisher.Mono; + +@SpringJUnitConfig +@DirtiesContext +@EmbeddedKafka(topics = { + AsyncListenerTests.FUTURE_TOPIC_1, AsyncListenerTests.FUTURE_TOPIC_BATCH_1, + AsyncListenerTests.MONO_TOPIC_1, AsyncListenerTests.MONO_TOPIC_BATCH_1, + AsyncListenerTests.AUTO_DETECT_ASYNC_FUTURE, AsyncListenerTests.AUTO_DETECT_ASYNC_BATCH_FUTURE, + AsyncListenerTests.AUTO_DETECT_ASYNC_MONO, AsyncListenerTests.AUTO_DETECT_ASYNC_BATCH_MONO, + AsyncListenerTests.AUTO_DETECT_ASYNC_KAFKA_HANDLER, AsyncListenerTests.SEND_TOPIC_1}, partitions = 1) +public class AsyncListenerTests { + + static final String FUTURE_TOPIC_1 = "future-topic-1"; + + static final String FUTURE_TOPIC_BATCH_1 = "future-topic-batch-1"; + + static final String MONO_TOPIC_1 = "mono-topic-1"; + + static final String MONO_TOPIC_BATCH_1 = "mono-topic-batch-1"; + + static final String SEND_TOPIC_1 = "send-topic-1"; + + static final String AUTO_DETECT_ASYNC_FUTURE = "auto-detect-async-future"; + + static final String AUTO_DETECT_ASYNC_BATCH_FUTURE = "auto-detect-async-batch-future"; + + static final String AUTO_DETECT_ASYNC_MONO = "auto-detect-async-mono"; + + static final String AUTO_DETECT_ASYNC_BATCH_MONO = "auto-detect-async-batch-mono"; + + static final String AUTO_DETECT_ASYNC_KAFKA_HANDLER = "auto-detect-async-kafka-handler"; + + @Autowired + private KafkaTemplate kafkaTemplate; + + @Autowired + private Config config; + + @Autowired + private Listener listener; + + @Autowired + MultiMethodListener multiMethodListener; + + @Test + public void testAsyncListener() throws Exception { + + kafkaTemplate.send(FUTURE_TOPIC_1, "foo-1"); + ConsumerRecord cr1 = kafkaTemplate.receive(SEND_TOPIC_1, 0, 0); + assertThat(cr1.value()).isEqualTo("FOO-1"); + kafkaTemplate.send(FUTURE_TOPIC_1, "bar-1"); + cr1 = kafkaTemplate.receive(SEND_TOPIC_1, 0, 1); + assertThat(cr1.value()).isEqualTo("bar-1_eh"); + + kafkaTemplate.send(FUTURE_TOPIC_BATCH_1, "foo-2"); + cr1 = kafkaTemplate.receive(SEND_TOPIC_1, 0, 2); + assertThat(cr1.value()).isEqualTo("1"); + kafkaTemplate.send(FUTURE_TOPIC_BATCH_1, "bar-2"); + 
cr1 = kafkaTemplate.receive(SEND_TOPIC_1, 0, 3); + assertThat(cr1.value()).isEqualTo("[bar-2]_beh"); + + kafkaTemplate.send(MONO_TOPIC_1, "foo-3"); + cr1 = kafkaTemplate.receive(SEND_TOPIC_1, 0, 4); + assertThat(cr1.value()).isEqualTo("FOO-3"); + kafkaTemplate.send(MONO_TOPIC_1, "bar-3"); + assertThat(config.latch1.await(10, TimeUnit.SECONDS)).isEqualTo(true); + cr1 = kafkaTemplate.receive(SEND_TOPIC_1, 0, 5); + assertThat(cr1.value()).isEqualTo("bar-3_eh"); + + kafkaTemplate.send(MONO_TOPIC_BATCH_1, "foo-4"); + cr1 = kafkaTemplate.receive(SEND_TOPIC_1, 0, 6); + assertThat(cr1.value()).isEqualTo("1"); + kafkaTemplate.send(MONO_TOPIC_BATCH_1, "bar-4"); + assertThat(config.latch2.await(10, TimeUnit.SECONDS)).isEqualTo(true); + cr1 = kafkaTemplate.receive(SEND_TOPIC_1, 0, 7); + assertThat(cr1.value()).isEqualTo("[bar-4]_beh"); + } + + @Test + public void testAsyncAcks() throws Exception { + + kafkaTemplate.send(AUTO_DETECT_ASYNC_FUTURE, "baz-future"); + assertThat(this.listener.autoDetectFuture.await(10, TimeUnit.SECONDS)).isTrue(); + + kafkaTemplate.send(AUTO_DETECT_ASYNC_BATCH_FUTURE, "baz-batch-future"); + assertThat(this.listener.autoDetectBatchFuture.await(10, TimeUnit.SECONDS)).isTrue(); + + kafkaTemplate.send(AUTO_DETECT_ASYNC_MONO, "baz-mono"); + assertThat(this.listener.autoDetectMono.await(10, TimeUnit.SECONDS)).isTrue(); + + kafkaTemplate.send(AUTO_DETECT_ASYNC_BATCH_MONO, "baz-batch-mono"); + assertThat(this.listener.autoDetectBatchMono.await(10, TimeUnit.SECONDS)).isTrue(); + + kafkaTemplate.send(AUTO_DETECT_ASYNC_KAFKA_HANDLER, "foo-multi-async"); + assertThat(this.multiMethodListener.handler1.await(10, TimeUnit.SECONDS)).isTrue(); + } + + public static class Listener { + + private final AtomicBoolean future1 = new AtomicBoolean(true); + + private final AtomicBoolean futureBatch1 = new AtomicBoolean(true); + + private final AtomicBoolean mono1 = new AtomicBoolean(true); + + private final AtomicBoolean monoBatch1 = new AtomicBoolean(true); + + public final CountDownLatch autoDetectFuture = new CountDownLatch(1); + + public final CountDownLatch autoDetectMono = new CountDownLatch(1); + + public final CountDownLatch autoDetectBatchFuture = new CountDownLatch(1); + + public final CountDownLatch autoDetectBatchMono = new CountDownLatch(1); + + @KafkaListener(id = "future1", topics = FUTURE_TOPIC_1, errorHandler = "errorHandler", + containerFactory = "kafkaListenerContainerFactory") + @SendTo(SEND_TOPIC_1) + public CompletableFuture listen1(String foo) { + CompletableFuture future = new CompletableFuture<>(); + if (future1.getAndSet(false)) { + future.complete(foo.toUpperCase()); + } + else { + future.completeExceptionally(new RuntimeException("Future.exception()")); + } + return future; + } + + @KafkaListener(id = "futureBatch1", topics = FUTURE_TOPIC_BATCH_1, errorHandler = "errorBatchHandler", + containerFactory = "kafkaBatchListenerContainerFactory") + @SendTo(SEND_TOPIC_1) + public CompletableFuture listen2(List foo) { + CompletableFuture future = new CompletableFuture<>(); + if (futureBatch1.getAndSet(false)) { + future.complete(String.valueOf(foo.size())); + } + else { + future.completeExceptionally(new RuntimeException("Future.exception(batch)")); + } + return future; + } + + @KafkaListener(id = "mono1", topics = MONO_TOPIC_1, errorHandler = "errorHandler", + containerFactory = "kafkaListenerContainerFactory") + @SendTo(SEND_TOPIC_1) + public Mono listen3(String bar) { + if (mono1.getAndSet(false)) { + return Mono.just(bar.toUpperCase()); + } + else { + return Mono.error(new 
RuntimeException("Mono.error()")); + } + } + + @KafkaListener(id = "monoBatch1", topics = MONO_TOPIC_BATCH_1, errorHandler = "errorBatchHandler", + containerFactory = "kafkaBatchListenerContainerFactory") + @SendTo(SEND_TOPIC_1) + public Mono listen4(List bar) { + if (monoBatch1.getAndSet(false)) { + return Mono.just(String.valueOf(bar.size())); + } + else { + return Mono.error(new RuntimeException("Mono.error(batch)")); + } + } + + @KafkaListener(id = "autoDetectFuture", topics = AUTO_DETECT_ASYNC_FUTURE, + containerFactory = "kafkaListenerContainerFactory") + public CompletableFuture listen5(String baz, Acknowledgment acknowledgment) { + CompletableFuture future = new CompletableFuture<>(); + future.complete(baz.toUpperCase()); + if (acknowledgment.isOutOfOrderCommit()) { + autoDetectFuture.countDown(); + } + return future; + } + + @KafkaListener(id = "autoDetectBatchFuture", topics = AUTO_DETECT_ASYNC_FUTURE, + containerFactory = "kafkaBatchListenerContainerFactory") + public CompletableFuture listen6(List baz, Acknowledgment acknowledgment) { + CompletableFuture future = new CompletableFuture<>(); + future.complete(String.valueOf(baz.size())); + if (acknowledgment.isOutOfOrderCommit()) { + autoDetectBatchFuture.countDown(); + } + return future; + } + + @KafkaListener(id = "autoDetectMono", topics = AUTO_DETECT_ASYNC_MONO, + containerFactory = "kafkaListenerContainerFactory") + public Mono listen7(String qux, Acknowledgment acknowledgment) { + if (acknowledgment.isOutOfOrderCommit()) { + autoDetectMono.countDown(); + } + return Mono.just(qux.toUpperCase()); + } + + @KafkaListener(id = "autoDetectBatchMono", topics = AUTO_DETECT_ASYNC_BATCH_MONO, + containerFactory = "kafkaBatchListenerContainerFactory") + public Mono listen8(List qux, Acknowledgment acknowledgment) { + if (acknowledgment.isOutOfOrderCommit()) { + autoDetectBatchMono.countDown(); + } + return Mono.just(String.valueOf(qux.size())); + } + + } + + @KafkaListener(id = "autoDetectKafkaHandler", topics = AUTO_DETECT_ASYNC_KAFKA_HANDLER, + containerFactory = "kafkaListenerContainerFactory") + public static class MultiMethodListener { + + public final CountDownLatch handler1 = new CountDownLatch(1); + + @KafkaHandler + public Mono handler1(String foo, Acknowledgment acknowledgment) { + if (acknowledgment.isOutOfOrderCommit()) { + handler1.countDown(); + } + return Mono.just(foo); + } + + @KafkaHandler + public CompletableFuture handler2(Integer bar) { + CompletableFuture future = new CompletableFuture<>(); + future.complete(bar); + return future; + } + + @KafkaHandler(isDefault = true) + public void handler2(Short baz, Acknowledgment acknowledgment) { + acknowledgment.acknowledge(); + } + + } + + @Configuration + @EnableKafka + public static class Config { + + private final CountDownLatch latch1 = new CountDownLatch(2); + + private final CountDownLatch latch2 = new CountDownLatch(2); + + @Autowired + private KafkaTemplate kafkaTemplate; + + @Bean + public Listener listener() { + return new Listener(); + } + + @Bean + public MultiMethodListener multiMethodListener() { + return new MultiMethodListener(); + } + + @Bean + public KafkaTemplate template(EmbeddedKafkaBroker embeddedKafka) { + KafkaTemplate template = new KafkaTemplate<>(producerFactory(embeddedKafka)); + template.setConsumerFactory(consumerFactory(embeddedKafka)); + return template; + } + + @Bean + public ProducerFactory producerFactory(EmbeddedKafkaBroker embeddedKafka) { + return new DefaultKafkaProducerFactory<>(producerConfigs(embeddedKafka)); + } + + @Bean + 
public Map producerConfigs(EmbeddedKafkaBroker embeddedKafka) { + return KafkaTestUtils.producerProps(embeddedKafka); + } + + @Bean + public KafkaListenerErrorHandler errorHandler() { + return (message, exception) -> { + latch1.countDown(); + return message.getPayload() + "_eh"; + }; + } + + @Bean + public KafkaListenerErrorHandler errorBatchHandler() { + return (message, exception) -> { + latch2.countDown(); + return message.getPayload() + "_beh"; + }; + } + + @Bean + public DefaultKafkaConsumerFactory consumerFactory( + EmbeddedKafkaBroker embeddedKafka) { + return new DefaultKafkaConsumerFactory<>(consumerConfigs(embeddedKafka)); + } + + @Bean + public Map consumerConfigs(EmbeddedKafkaBroker embeddedKafka) { + return KafkaTestUtils.consumerProps("test", "false", embeddedKafka); + } + + @Bean + public ConcurrentKafkaListenerContainerFactory kafkaBatchListenerContainerFactory( + EmbeddedKafkaBroker embeddedKafka) { + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory(embeddedKafka)); + factory.setBatchListener(true); + factory.setReplyTemplate(kafkaTemplate); + return factory; + } + + @Bean + public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory( + EmbeddedKafkaBroker embeddedKafka) { + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory(embeddedKafka)); + factory.setReplyTemplate(kafkaTemplate); + return factory; + } + + } + +} diff --git a/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java index dd757944f6..27808beea3 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java @@ -470,10 +470,12 @@ public void testMulti() throws Exception { assertThat(this.multiListener.latch2.await(60, TimeUnit.SECONDS)).isTrue(); ConsumerRecord reply = KafkaTestUtils.getSingleRecord(consumer, "annotated8reply"); assertThat(reply.value()).isEqualTo("OK"); - consumer.close(); template.send("annotated8", 0, 1, "junk"); assertThat(this.multiListener.errorLatch.await(60, TimeUnit.SECONDS)).isTrue(); + ConsumerRecord reply2 = KafkaTestUtils.getSingleRecord(consumer, "annotated8reply"); + consumer.close(); + assertThat(reply2.value()).isEqualTo("JUNK intentional"); assertThat(this.multiListener.meta).isNotNull(); } @@ -1754,7 +1756,8 @@ public Object resolveArgument(MethodParameter parameter, Message message) { public KafkaListenerErrorHandler consumeMultiMethodException(MultiListenerBean listener) { return (m, e) -> { listener.errorLatch.countDown(); - return null; + String payload = m.getPayload().toString().toUpperCase(); + return payload + " " + e.getCause().getMessage(); }; } @@ -2468,6 +2471,7 @@ static class MultiListenerBean { volatile ConsumerRecordMetadata meta; @KafkaHandler + @SendTo("annotated8reply") public void bar(@NonNull String bar, @Header(KafkaHeaders.RECORD_METADATA) ConsumerRecordMetadata meta) { if ("junk".equals(bar)) { throw new RuntimeException("intentional"); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapterTests.java 
b/spring-kafka/src/test/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapterTests.java index d19d1bca26..40fef84669 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapterTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapterTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 the original author or authors. + * Copyright 2019-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,10 +17,17 @@ package org.springframework.kafka.listener.adapter; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.Mockito.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.CompletableFuture; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.junit.jupiter.api.Test; @@ -39,8 +46,11 @@ import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; +import reactor.core.publisher.Mono; + /** * @author Gary Russell + * @author Wang Zhiyang * @since 2.2.5 * */ @@ -60,6 +70,42 @@ public void testKafkaNullInList(@Autowired KafkaListenerEndpointRegistry registr assertThat(foo.group).isEqualTo("test.group"); } + @SuppressWarnings("unchecked") + @Test + public void testFutureResult(@Autowired KafkaListenerEndpointRegistry registry, + @Autowired BatchFuture batchFuture) { + + BatchMessagingMessageListenerAdapter adapter = + spy((BatchMessagingMessageListenerAdapter) registry + .getListenerContainer("batchFuture").getContainerProperties().getMessageListener()); + KafkaUtils.setConsumerGroupId("test.group.future"); + List> list = new ArrayList<>(); + list.add(new ConsumerRecord<>("batchFuture", 0, 0L, null, "future_1")); + list.add(new ConsumerRecord<>("batchFuture", 0, 1L, null, "future_2")); + list.add(new ConsumerRecord<>("batchFuture", 1, 0L, null, "future_3")); + adapter.onMessage(list, null, null); + assertThat(batchFuture.group).isEqualTo("test.group.future"); + verify(adapter, times(1)).asyncSuccess(any(), any(), any(), anyBoolean()); + verify(adapter, times(1)).acknowledge(any()); + } + + @SuppressWarnings("unchecked") + @Test + public void testMonoResult(@Autowired KafkaListenerEndpointRegistry registry, @Autowired BatchMono batchMono) { + + BatchMessagingMessageListenerAdapter adapter = + spy((BatchMessagingMessageListenerAdapter) registry + .getListenerContainer("batchMono").getContainerProperties().getMessageListener()); + KafkaUtils.setConsumerGroupId("test.group.mono"); + List> list = new ArrayList<>(); + list.add(new ConsumerRecord<>("batchMono", 0, 0L, null, "mono_1")); + list.add(new ConsumerRecord<>("batchMono", 0, 1L, null, "mono_2")); + adapter.onMessage(list, null, null); + assertThat(batchMono.group).isEqualTo("test.group.mono"); + verify(adapter, times(1)).asyncSuccess(any(), any(), any(), anyBoolean()); + verify(adapter, times(1)).acknowledge(any()); + } + public static class Foo { public volatile String value = "someValue"; @@ -68,10 +114,38 @@ public static class Foo { @KafkaListener(id = "foo", topics = "foo", 
autoStartup = "false") public void listen(List list, @Header(KafkaHeaders.GROUP_ID) String groupId) { - list.forEach(s -> { - this.value = s; - }); + list.forEach(s -> this.value = s); + this.group = groupId; + } + + } + + public static class BatchFuture { + + public volatile String group; + + @KafkaListener(id = "batchFuture", topics = "batchFuture", autoStartup = "false") + public CompletableFuture listen(List list, @Header(KafkaHeaders.GROUP_ID) String groupId) { + + this.group = groupId; + CompletableFuture future = new CompletableFuture<>(); + future.complete("processed: " + list.size()); + return future; + } + + } + + public static class BatchMono { + + public volatile String value = "someValue"; + + public volatile String group; + + @KafkaListener(id = "batchMono", topics = "batchMono", autoStartup = "false") + public Mono listen(List list, @Header(KafkaHeaders.GROUP_ID) String groupId) { + this.group = groupId; + return Mono.just(list.size()); } } @@ -85,11 +159,20 @@ public Foo foo() { return new Foo(); } + @Bean + public BatchFuture batchFuture() { + return new BatchFuture(); + } + + @Bean + public BatchMono batchMono() { + return new BatchMono(); + } + @SuppressWarnings({ "rawtypes" }) @Bean public ConsumerFactory consumerFactory() { - ConsumerFactory consumerFactory = mock(ConsumerFactory.class); - return consumerFactory; + return mock(ConsumerFactory.class); } @SuppressWarnings({ "rawtypes", "unchecked" }) @@ -100,6 +183,7 @@ public ConcurrentKafkaListenerContainerFactory kafkaListenerCont factory.setBatchListener(true); return factory; } + } } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapterTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapterTests.java index a20468b7b1..800c399cba 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapterTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapterTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2021 the original author or authors. + * Copyright 2016-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -17,11 +17,16 @@ package org.springframework.kafka.listener.adapter; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.BDDMockito.willReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import java.lang.reflect.Method; +import java.util.concurrent.CompletableFuture; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.junit.jupiter.api.Test; @@ -33,6 +38,8 @@ import org.springframework.kafka.support.converter.RecordMessageConverter; import org.springframework.messaging.support.GenericMessage; +import reactor.core.publisher.Mono; + /** * @author Gary Russell * @since 1.1.2 @@ -66,6 +73,37 @@ public void onMessage(ConsumerRecord data, Acknowledgment acknow verify(converter).toMessage(cr, ack, null, String.class); } + @Test + public void testCompletableFutureReturn() throws NoSuchMethodException { + + Method method = getClass().getDeclaredMethod("future", String.class, Acknowledgment.class); + testAsyncResult(method, "bar"); + } + + @Test + public void testMonoReturn() throws NoSuchMethodException { + + Method method = getClass().getDeclaredMethod("mono", String.class, Acknowledgment.class); + testAsyncResult(method, "baz"); + } + + private void testAsyncResult(Method method, String topic) { + + KafkaListenerAnnotationBeanPostProcessor bpp = new KafkaListenerAnnotationBeanPostProcessor<>(); + RecordMessagingMessageListenerAdapter adapter = + spy(new RecordMessagingMessageListenerAdapter<>(this, method)); + adapter.setHandlerMethod( + new HandlerAdapter(bpp.getMessageHandlerMethodFactory().createInvocableHandlerMethod(this, method))); + ConsumerRecord cr = new ConsumerRecord<>(topic, 0, 0L, null, "foo"); + Acknowledgment ack = mock(Acknowledgment.class); + RecordMessageConverter converter = mock(RecordMessageConverter.class); + willReturn(new GenericMessage<>("foo")).given(converter).toMessage(cr, ack, null, String.class); + adapter.setMessageConverter(converter); + adapter.onMessage(cr, ack, null); + verify(adapter, times(1)).asyncSuccess(any(), any(), any(), anyBoolean()); + verify(adapter, times(1)).acknowledge(any()); + } + @Test void testMissingAck() throws NoSuchMethodException, SecurityException { KafkaListenerAnnotationBeanPostProcessor bpp = new KafkaListenerAnnotationBeanPostProcessor<>(); @@ -84,4 +122,14 @@ public void test(Acknowledgment ack) { } + public CompletableFuture future(String data, Acknowledgment ack) { + + return CompletableFuture.completedFuture("processed" + data); + } + + public Mono mono(String data, Acknowledgment ack) { + + return Mono.just(data); + } + } diff --git a/spring-kafka/src/test/kotlin/org/springframework/kafka/listener/EnableKafkaKotlinCoroutinesTests.kt b/spring-kafka/src/test/kotlin/org/springframework/kafka/listener/EnableKafkaKotlinCoroutinesTests.kt new file mode 100644 index 0000000000..aaf1f079d3 --- /dev/null +++ b/spring-kafka/src/test/kotlin/org/springframework/kafka/listener/EnableKafkaKotlinCoroutinesTests.kt @@ -0,0 +1,241 @@ +/* + * Copyright 2016-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.kafka.listener
+
+import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.common.serialization.StringDeserializer
+import org.apache.kafka.common.serialization.StringSerializer
+import org.assertj.core.api.Assertions.assertThat
+import org.junit.jupiter.api.Test
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.beans.factory.annotation.Value
+import org.springframework.context.annotation.Bean
+import org.springframework.context.annotation.Configuration
+import org.springframework.kafka.annotation.EnableKafka
+import org.springframework.kafka.annotation.KafkaHandler
+import org.springframework.kafka.annotation.KafkaListener
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
+import org.springframework.kafka.core.ConsumerFactory
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory
+import org.springframework.kafka.core.DefaultKafkaProducerFactory
+import org.springframework.kafka.core.KafkaTemplate
+import org.springframework.kafka.core.ProducerFactory
+import org.springframework.kafka.listener.KafkaListenerErrorHandler
+import org.springframework.kafka.test.EmbeddedKafkaBroker
+import org.springframework.kafka.test.context.EmbeddedKafka
+import org.springframework.messaging.handler.annotation.SendTo
+import org.springframework.test.annotation.DirtiesContext
+import org.springframework.test.context.junit.jupiter.SpringJUnitConfig
+import java.lang.Exception
+import java.time.Duration
+import java.util.concurrent.CountDownLatch
+import java.util.concurrent.TimeUnit
+
+
+/**
+ * Kotlin Annotated async return listener tests.
+ *
+ * @author Wang ZhiYang
+ *
+ * @since 3.2
+ */
+@SpringJUnitConfig
+@DirtiesContext
+@EmbeddedKafka(topics = ["kotlinAsyncTestTopic1", "kotlinAsyncTestTopic2",
+		"kotlinAsyncBatchTestTopic1", "kotlinAsyncBatchTestTopic2", "kotlinReplyTopic1"], partitions = 1)
+class EnableKafkaKotlinCoroutinesTests {
+
+	@Autowired
+	private lateinit var config: Config
+
+	@Autowired
+	private lateinit var template: KafkaTemplate<String, String>
+
+	@Test
+	fun `test listener`() {
+		this.template.send("kotlinAsyncTestTopic1", "foo")
+		assertThat(this.config.latch1.await(10, TimeUnit.SECONDS)).isTrue()
+		assertThat(this.config.received).isEqualTo("foo")
+	}
+
+	@Test
+	fun `test checkedEx`() {
+		this.template.send("kotlinAsyncTestTopic2", "fail")
+		assertThat(this.config.latch2.await(10, TimeUnit.SECONDS)).isTrue()
+		assertThat(this.config.error).isTrue()
+	}
+
+	@Test
+	fun `test batch listener`() {
+		this.template.send("kotlinAsyncBatchTestTopic1", "foo")
+		assertThat(this.config.batchLatch1.await(10, TimeUnit.SECONDS)).isTrue()
+		assertThat(this.config.batchReceived).isEqualTo("foo")
+	}
+
+	@Test
+	fun `test batch checkedEx`() {
+		this.template.send("kotlinAsyncBatchTestTopic2", "fail")
+		assertThat(this.config.batchLatch2.await(10, TimeUnit.SECONDS)).isTrue()
+		assertThat(this.config.batchError).isTrue()
+	}
+
+	@Test
+	fun `test checkedKh reply`() {
+		this.template.send("kotlinAsyncTestTopic3", "foo")
+		val cr = this.template.receive("kotlinReplyTopic1", 0, 0, Duration.ofSeconds(30))
+		assertThat(cr?.value() ?: "null").isEqualTo("FOO")
+	}
+
+	@KafkaListener(id = "sendTopic", topics = ["kotlinAsyncTestTopic3"],
+			containerFactory = "kafkaListenerContainerFactory")
+	class Listener {
+
+		@KafkaHandler
+		@SendTo("kotlinReplyTopic1")
+		suspend fun handler1(value: String) : String {
+			return value.uppercase()
+		}
+
+	}
+
+	@Configuration
+	@EnableKafka
+	class Config {
+
+		@Volatile
+		lateinit var received: String
+
+		@Volatile
+		lateinit var batchReceived: String
+
+		@Volatile
+		var error: Boolean = false
+
+		@Volatile
+		var batchError: Boolean = false
+
+		val latch1 = CountDownLatch(1)
+
+		val latch2 = CountDownLatch(1)
+
+		val batchLatch1 = CountDownLatch(1)
+
+		val batchLatch2 = CountDownLatch(1)
+
+		@Value("\${" + EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS + "}")
+		private lateinit var brokerAddresses: String
+
+		@Bean
+		fun listener() : Listener {
+			return Listener()
+		}
+
+		@Bean
+		fun kpf(): ProducerFactory<String, String> {
+			val configs = HashMap<String, Any>()
+			configs[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = this.brokerAddresses
+			configs[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
+			configs[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
+			return DefaultKafkaProducerFactory(configs)
+		}
+
+		@Bean
+		fun kcf(): ConsumerFactory<String, String> {
+			val configs = HashMap<String, Any>()
+			configs[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = this.brokerAddresses
+			configs[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
+			configs[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
+			configs[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = false
+			configs[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest"
+			return DefaultKafkaConsumerFactory(configs)
+		}
+
+		@Bean
+		fun kt(): KafkaTemplate<String, String> {
+			val kafkaTemplate = KafkaTemplate(kpf())
+			kafkaTemplate.setConsumerFactory(kcf())
+			return kafkaTemplate
+		}
+
+		@Bean
+		fun errorHandler() : KafkaListenerErrorHandler {
+			return KafkaListenerErrorHandler { message, _ ->
+				error = true
+				latch2.countDown()
+				message.payload
+			}
+		}
+
+		@Bean
+		fun errorHandlerBatch() : KafkaListenerErrorHandler {
+			return KafkaListenerErrorHandler { message, _ ->
+				batchError = true
+				batchLatch2.countDown()
+				message.payload
+			}
+		}
+
+		@Bean
+		fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
+			val factory: ConcurrentKafkaListenerContainerFactory<String, String>
+				= ConcurrentKafkaListenerContainerFactory()
+			factory.consumerFactory = kcf()
+			factory.setReplyTemplate(kt())
+			return factory
+		}
+
+		@Bean
+		fun kafkaBatchListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
+			val factory: ConcurrentKafkaListenerContainerFactory<String, String>
+				= ConcurrentKafkaListenerContainerFactory()
+			factory.isBatchListener = true
+			factory.consumerFactory = kcf()
+			return factory
+		}
+
+		@KafkaListener(id = "kotlin", topics = ["kotlinAsyncTestTopic1"],
+				containerFactory = "kafkaListenerContainerFactory")
+		suspend fun listen(value: String) {
+			this.received = value
+			this.latch1.countDown()
+		}
+
+		@KafkaListener(id = "kotlin-ex", topics = ["kotlinAsyncTestTopic2"],
+				containerFactory = "kafkaListenerContainerFactory", errorHandler = "errorHandler")
+		suspend fun listenEx(value: String) {
+			if (value == "fail") {
+				throw Exception("checked")
+			}
+		}
+
+		@KafkaListener(id = "kotlin-batch", topics = ["kotlinAsyncBatchTestTopic1"], containerFactory = "kafkaBatchListenerContainerFactory")
+		suspend fun batchListen(values: List<ConsumerRecord<String, String>>) {
+			this.batchReceived = values.first().value()
+			this.batchLatch1.countDown()
+		}
+
+		@KafkaListener(id = "kotlin-batch-ex", topics = ["kotlinAsyncBatchTestTopic2"],
+				containerFactory = "kafkaBatchListenerContainerFactory", errorHandler = "errorHandlerBatch")
+		suspend fun batchListenEx(values: List<ConsumerRecord<String, String>>) {
+			if (values.first().value() == "fail") {
+				throw Exception("checked")
+			}
+		}
+
+	}
+
+}
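Below is a minimal, hypothetical usage sketch of the behavior exercised by the tests above; the listener id, topic, and class name are illustrative and not part of this change set. It shows a record listener with an asynchronous return type observing the automatically enabled out-of-order manual acknowledgment through the new `Acknowledgment.isOutOfOrderCommit()` default method.

[source, java]
----
import java.util.concurrent.CompletableFuture;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;

// Hypothetical listener bean; names are illustrative only.
public class AsyncExampleListener {

	@KafkaListener(id = "asyncExample", topics = "async-example-topic")
	public CompletableFuture<String> listen(String data, Acknowledgment ack) {
		if (ack.isOutOfOrderCommit()) {
			// The container detected the async return type and switched to manual,
			// out-of-order commits; the offset is acknowledged when the future completes.
		}
		return CompletableFuture.completedFuture(data.toUpperCase());
	}

}
----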