GH-1189: Asynchronous server-side processing in a request/reply scenario #2996

Merged
11 commits merged on Jan 31, 2024
2 changes: 2 additions & 0 deletions build.gradle
@@ -60,6 +60,7 @@ ext {
junit4Version = '4.13.2'
junitJupiterVersion = '5.10.1'
kafkaVersion = '3.6.1'
kotlinCoroutinesVersion = '1.7.3'
log4jVersion = '2.22.1'
micrometerDocsVersion = '1.0.2'
micrometerVersion = '1.13.0-SNAPSHOT'
@@ -278,6 +279,7 @@ project ('spring-kafka') {
}
api "org.apache.kafka:kafka-clients:$kafkaVersion"
optionalApi "org.apache.kafka:kafka-streams:$kafkaVersion"
optionalApi "org.jetbrains.kotlinx:kotlinx-coroutines-reactor:$kotlinCoroutinesVersion"
optionalApi 'com.fasterxml.jackson.core:jackson-core'
optionalApi 'com.fasterxml.jackson.core:jackson-databind'
optionalApi 'com.fasterxml.jackson.datatype:jackson-datatype-jdk8'
1 change: 1 addition & 0 deletions spring-kafka-docs/src/main/antora/modules/ROOT/nav.adoc
@@ -11,6 +11,7 @@
**** xref:kafka/receiving-messages/message-listeners.adoc[]
**** xref:kafka/receiving-messages/message-listener-container.adoc[]
**** xref:kafka/receiving-messages/ooo-commits.adoc[]
**** xref:kafka/receiving-messages/async-returns.adoc[]
**** xref:kafka/receiving-messages/listener-annotation.adoc[]
**** xref:kafka/receiving-messages/listener-group-id.adoc[]
**** xref:kafka/receiving-messages/container-thread-naming.adoc[]
@@ -0,0 +1,32 @@
[[async-returns]]
= Asynchronous `@KafkaListener` Return Types

Starting with version 3.2, `@KafkaListener` (and `@KafkaHandler`) methods can be specified with asynchronous return types, letting the reply be sent asynchronously.
Supported return types include `CompletableFuture<?>`, `Mono<?>`, and Kotlin `suspend` functions.

[source, java]
----
@KafkaListener(id = "myListener", topics = "myTopic")
public CompletableFuture<String> listen(String data) {
...
CompletableFuture<String> future = new CompletableFuture<>();
future.complete("done");
return future;
}
----

[source, java]
----
@KafkaListener(id = "myListener", topics = "myTopic")
public Mono<Void> listen(String data) {
...
return Mono.empty();
}
----

IMPORTANT: The `AckMode` is automatically set to `MANUAL` and out-of-order commits are enabled when async return types are detected; instead of being acknowledged immediately, the record is acknowledged when the async operation completes.
When the async result completes with an error, whether the message is recovered or not depends on the container error handler.
If an exception is thrown within the listener method that prevents creation of the async result object, you MUST catch that exception and return an appropriate return object that will cause the message to be acknowledged or recovered.
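
For example, with a `CompletableFuture<?>` return type, a synchronous failure can be surfaced through the async result instead of being thrown (a minimal sketch; `myService` and its `processAsync` method are hypothetical):

[source, java]
----
@KafkaListener(id = "myListener", topics = "myTopic")
public CompletableFuture<String> listen(String data) {
    try {
        return myService.processAsync(data); // may throw before any future is created
    }
    catch (RuntimeException ex) {
        // completing the future exceptionally lets the container error handling
        // decide whether the record is acknowledged or recovered
        return CompletableFuture.failedFuture(ex);
    }
}
----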

If a `KafkaListenerErrorHandler` is configured on a listener with an async return type (including Kotlin suspend functions), the error handler is invoked after a failure.
See xref:kafka/annotation-error-handling.adoc[Handling Exceptions] for more information about this error handler and its purpose.
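
A minimal sketch of such an error handler (the bean name and the fallback reply value are assumptions for illustration):

[source, java]
----
@Bean
public KafkaListenerErrorHandler asyncListenerErrorHandler() {
    return (message, exception) -> {
        // the return value is only used as the reply when the listener method is annotated with @SendTo
        return "FAILED: " + exception.getMessage();
    };
}
----

The listener then references it with the `errorHandler` attribute, for example `@KafkaListener(id = "myListener", topics = "myTopic", errorHandler = "asyncListenerErrorHandler")`.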
@@ -12,4 +12,10 @@ For changes in earlier version, see xref:appendix/change-history.adoc[Change His

A new `TransactionIdSuffixStrategy` interface was introduced to manage the `transactional.id` suffix.
The default implementation is `DefaultTransactionIdSuffixStrategy`; when `maxCache` is set greater than zero, `transactional.id` suffixes can be reused within a specific range, otherwise suffixes are generated on the fly by incrementing a counter.
See xref:kafka/transactions.adoc#transaction-id-suffix-fixed[Fixed TransactionIdSuffix] for more information.
See xref:kafka/transactions.adoc#transaction-id-suffix-fixed[Fixed TransactionIdSuffix] for more information.

[[x32-async-return]]
=== Async @KafkaListener Return

`@KafkaListener` (and `@KafkaHandler`) methods can now have asynchronous return types, including `CompletableFuture<?>`, `Mono<?>`, and Kotlin `suspend` functions.
See xref:kafka/receiving-messages/async-returns.adoc[Async Returns] for more information.
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2023 the original author or authors.
* Copyright 2014-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -87,6 +87,7 @@
import org.springframework.kafka.listener.ContainerGroupSequencer;
import org.springframework.kafka.listener.KafkaConsumerBackoffManager;
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
import org.springframework.kafka.listener.adapter.KafkaMessageHandlerMethodFactory;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
import org.springframework.kafka.retrytopic.DestinationTopicResolver;
import org.springframework.kafka.retrytopic.RetryTopicBeanNames;
@@ -1155,7 +1156,7 @@ private MessageHandlerMethodFactory getHandlerMethodFactory() {
}

private MessageHandlerMethodFactory createDefaultMessageHandlerMethodFactory() {
DefaultMessageHandlerMethodFactory defaultFactory = new DefaultMessageHandlerMethodFactory();
DefaultMessageHandlerMethodFactory defaultFactory = new KafkaMessageHandlerMethodFactory();
Validator validator = KafkaListenerAnnotationBeanPostProcessor.this.registrar.getValidator();
if (validator != null) {
defaultFactory.setValidator(validator);
@@ -1170,8 +1171,6 @@ private MessageHandlerMethodFactory createDefaultMessageHandlerMethodFactory() {

List<HandlerMethodArgumentResolver> customArgumentsResolver =
new ArrayList<>(KafkaListenerAnnotationBeanPostProcessor.this.registrar.getCustomMethodArgumentResolvers());
// Has to be at the end - look at PayloadMethodArgumentResolver documentation
customArgumentsResolver.add(new KafkaNullAwarePayloadArgumentResolver(messageConverter, validator));
defaultFactory.setCustomArgumentResolvers(customArgumentsResolver);

defaultFactory.afterPropertiesSet();
@@ -147,8 +147,8 @@ private String getReplyTopic() {
}
String topic = destinations.length == 1 ? destinations[0] : "";
BeanFactory beanFactory = getBeanFactory();
if (beanFactory instanceof ConfigurableListableBeanFactory) {
topic = ((ConfigurableListableBeanFactory) beanFactory).resolveEmbeddedValue(topic);
if (beanFactory instanceof ConfigurableListableBeanFactory configurableListableBeanFactory) {
topic = configurableListableBeanFactory.resolveEmbeddedValue(topic);
if (topic != null) {
topic = resolve(topic);
}
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2019 the original author or authors.
* Copyright 2017-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -72,6 +72,7 @@ default Object handleError(Message<?> message, ListenerExecutionFailedException
* @return the return value is ignored unless the annotated method has a
* {@code @SendTo} annotation.
*/
@Nullable
default Object handleError(Message<?> message, ListenerExecutionFailedException exception,
Consumer<?, ?> consumer, @Nullable Acknowledgment ack) {

@@ -104,6 +104,7 @@
import org.springframework.kafka.listener.ContainerProperties.AckMode;
import org.springframework.kafka.listener.ContainerProperties.AssignmentCommitOption;
import org.springframework.kafka.listener.ContainerProperties.EOSMode;
import org.springframework.kafka.listener.adapter.AsyncRepliesAware;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.KafkaUtils;
@@ -160,6 +161,7 @@
* @author Francois Rosiere
* @author Daniel Gentes
* @author Soby Chacko
* @author Wang Zhiyang
*/
public class KafkaMessageListenerContainer<K, V> // NOSONAR line count
extends AbstractMessageListenerContainer<K, V> implements ConsumerPauseResumeEventPublisher {
@@ -660,6 +662,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume

private final boolean wantsFullRecords;

private final boolean asyncReplies;

private final boolean autoCommit;

private final boolean isManualAck;
@@ -850,6 +854,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
ListenerConsumer(GenericMessageListener<?> listener, ListenerType listenerType,
ObservationRegistry observationRegistry) {

this.asyncReplies = listener instanceof AsyncRepliesAware hmd && hmd.isAsyncReplies()
|| this.containerProperties.isAsyncAcks();
AckMode ackMode = determineAckMode();
this.isManualAck = ackMode.equals(AckMode.MANUAL);
this.isCountAck = ackMode.equals(AckMode.COUNT)
@@ -860,12 +866,12 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
this.isAnyManualAck = this.isManualAck || this.isManualImmediateAck;
this.isRecordAck = ackMode.equals(AckMode.RECORD);
this.offsetsInThisBatch =
this.isAnyManualAck && this.containerProperties.isAsyncAcks()
? new HashMap<>()
this.isAnyManualAck && this.asyncReplies
? new ConcurrentHashMap<>()
: null;
this.deferredOffsets =
this.isAnyManualAck && this.containerProperties.isAsyncAcks()
? new HashMap<>()
this.isAnyManualAck && this.asyncReplies
? new ConcurrentHashMap<>()
: null;

this.observationRegistry = observationRegistry;
@@ -904,8 +910,7 @@ else if (listener instanceof MessageListener) {
else {
throw new IllegalArgumentException("Listener must be one of 'MessageListener', "
+ "'BatchMessageListener', or the variants that are consumer aware and/or "
+ "Acknowledging"
+ " not " + listener.getClass().getName());
+ "Acknowledging not " + listener.getClass().getName());
}
this.listenerType = listenerType;
this.isConsumerAwareListener = listenerType.equals(ListenerType.ACKNOWLEDGING_CONSUMER_AWARE)
@@ -928,18 +933,15 @@ else if (listener instanceof MessageListener) {
this.logger.info(toString());
}
ApplicationContext applicationContext = getApplicationContext();
ClassLoader classLoader = applicationContext == null
? getClass().getClassLoader()
: applicationContext.getClassLoader();
this.checkNullKeyForExceptions = this.containerProperties.isCheckDeserExWhenKeyNull()
|| ErrorHandlingUtils.checkDeserializer(KafkaMessageListenerContainer.this.consumerFactory,
consumerProperties, false,
applicationContext == null
? getClass().getClassLoader()
: applicationContext.getClassLoader());
consumerProperties, false, classLoader);
this.checkNullValueForExceptions = this.containerProperties.isCheckDeserExWhenValueNull()
|| ErrorHandlingUtils.checkDeserializer(KafkaMessageListenerContainer.this.consumerFactory,
consumerProperties, true,
applicationContext == null
? getClass().getClassLoader()
: applicationContext.getClassLoader());
consumerProperties, true, classLoader);
this.syncCommitTimeout = determineSyncCommitTimeout();
if (this.containerProperties.getSyncCommitTimeout() == null) {
// update the property, so we can use it directly from code elsewhere
@@ -965,6 +967,9 @@ private AckMode determineAckMode() {
if (this.consumerGroupId == null && KafkaMessageListenerContainer.this.topicPartitions != null) {
ackMode = AckMode.MANUAL;
}
if (this.asyncReplies && !(AckMode.MANUAL_IMMEDIATE.equals(ackMode) || AckMode.MANUAL.equals(ackMode))) {
ackMode = AckMode.MANUAL;
}
return ackMode;
}

@@ -3389,12 +3394,17 @@ public void acknowledge() {
public void nack(Duration sleep) {
Assert.state(Thread.currentThread().equals(ListenerConsumer.this.consumerThread),
"nack() can only be called on the consumer thread");
Assert.state(!ListenerConsumer.this.containerProperties.isAsyncAcks(),
"nack() is not supported with out-of-order commits (asyncAcks=true)");
Assert.state(!ListenerConsumer.this.asyncReplies,
"nack() is not supported with out-of-order commits");
Assert.isTrue(!sleep.isNegative(), "sleep cannot be negative");
ListenerConsumer.this.nackSleepDurationMillis = sleep.toMillis();
}

@Override
public boolean isOutOfOrderCommit() {
return ListenerConsumer.this.asyncReplies;
}

@Override
public String toString() {
return "Acknowledgment for " + KafkaUtils.format(this.cRecord);
@@ -3469,8 +3479,8 @@ public void acknowledge(int index) {
public void nack(int index, Duration sleep) {
Assert.state(Thread.currentThread().equals(ListenerConsumer.this.consumerThread),
"nack() can only be called on the consumer thread");
Assert.state(!ListenerConsumer.this.containerProperties.isAsyncAcks(),
"nack() is not supported with out-of-order commits (asyncAcks=true)");
Assert.state(!ListenerConsumer.this.asyncReplies,
"nack() is not supported with out-of-order commits");
Assert.isTrue(!sleep.isNegative(), "sleep cannot be negative");
Assert.isTrue(index >= 0 && index < this.records.count(), "index out of bounds");
ListenerConsumer.this.nackIndex = index;
@@ -3493,6 +3503,11 @@ public void nack(int index, Duration sleep) {
processAcks(new ConsumerRecords<K, V>(newRecords));
}

@Override
public boolean isOutOfOrderCommit() {
return ListenerConsumer.this.asyncReplies;
}

@Override
public String toString() {
return "Acknowledgment for " + this.records;
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2023 the original author or authors.
* Copyright 2020-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,6 +16,8 @@

package org.springframework.kafka.listener.adapter;

import java.util.concurrent.CompletableFuture;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
@@ -24,11 +26,15 @@
import org.springframework.expression.common.TemplateParserContext;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.lang.Nullable;
import org.springframework.util.ClassUtils;

import reactor.core.publisher.Mono;

/**
* Utilities for listener adapters.
*
* @author Gary Russell
* @author Wang Zhiyang
* @since 2.5
*
*/
@@ -40,6 +46,9 @@ public final class AdapterUtils {
*/
public static final ParserContext PARSER_CONTEXT = new TemplateParserContext("!{", "}");

private static final boolean MONO_PRESENT =
ClassUtils.isPresent("reactor.core.publisher.Mono", AdapterUtils.class.getClassLoader());

private AdapterUtils() {
}

@@ -85,4 +94,34 @@ public static String getDefaultReplyTopicExpression() {
+ KafkaHeaders.REPLY_TOPIC + "']" + PARSER_CONTEXT.getExpressionSuffix();
}

/**
* Return true when the return type is asynchronous.
* @param resultType {@code InvocableHandlerMethod} return type.
* @return true if the type is {@code Mono} or {@code CompletableFuture}.
* @since 3.2
*/
public static boolean isAsyncReply(Class<?> resultType) {
return isMono(resultType) || isCompletableFuture(resultType);
}

/**
* Return true when the type is {@code Mono}.
* @param resultType {@code InvocableHandlerMethod} return type.
* @return true if the type is {@code Mono}.
* @since 3.2
*/
public static boolean isMono(Class<?> resultType) {
return MONO_PRESENT && Mono.class.isAssignableFrom(resultType);
}

/**
* Return true when the type is {@code CompletableFuture}.
* @param resultType {@code InvocableHandlerMethod} return type.
* @return true if the type is {@code CompletableFuture}.
* @since 3.2
*/
public static boolean isCompletableFuture(Class<?> resultType) {
return CompletableFuture.class.isAssignableFrom(resultType);
}

}
@@ -0,0 +1,37 @@
/*
* Copyright 2023-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.listener.adapter;

/**
* Message handler adapters implementing this interface can detect {@link HandlerAdapter} async return types.
*
* @author Wang Zhiyang
*
* @since 3.2
*/
public interface AsyncRepliesAware {

/**
* Return true if the {@link HandlerAdapter} return type is async.
* @return true for async replies.
* @since 3.2
*/
default boolean isAsyncReplies() {
return false;
}

}
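
For illustration, a custom adapter could report async replies by delegating to the `AdapterUtils.isAsyncReply(Class<?>)` helper added above; the class below is a hypothetical sketch, not part of this change:

// Hypothetical example (not part of this PR) showing how an adapter might implement AsyncRepliesAware.
package com.example;

import java.lang.reflect.Method;

import org.springframework.kafka.listener.adapter.AdapterUtils;
import org.springframework.kafka.listener.adapter.AsyncRepliesAware;

public class MyListenerAdapter implements AsyncRepliesAware {

    private final Method handlerMethod;

    public MyListenerAdapter(Method handlerMethod) {
        this.handlerMethod = handlerMethod;
    }

    @Override
    public boolean isAsyncReplies() {
        // true for CompletableFuture<?> and Mono<?> return types, false otherwise
        return AdapterUtils.isAsyncReply(this.handlerMethod.getReturnType());
    }

}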