Skip to content

Commit

Permalink
GH-1189: Auto detect async reply
Browse files Browse the repository at this point in the history
* auto-detect async reply than coerce the out-of-order manual commit.
* add new interface `HandlerMethodDetect` to detect handler args and return type.
* add auto-detect async reply than coerce the out-of-order manual commit unit test at `@KafkaListener` `@KafkaHandler` scene.
* modify async-returns.adoc
  • Loading branch information
Zhiyang.Wang1 committed Dec 28, 2023
1 parent 3ea5faf commit aee58bd
Show file tree
Hide file tree
Showing 10 changed files with 258 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public Mono<Void> listen(String data) {
}
----

IMPORTANT: The listener container factory must be configured with manual ack mode and async ack to enable out-of-order commits; instead, the asynchronous completion will ack or nack the message when the async operation completes.
IMPORTANT: The `AckMode` will be automatically set the `MANUAL` and enable out-of-order commits when async return types are detected; instead, the asynchronous completion will ack when the async operation completes.
When the async result is completed with an error, whether the message is recover or not depends on the container error handler.
If some exception occurs within the listener method that prevents creation of the async result object, you MUST catch that exception and return an appropriate return object that will cause the message to be ack or recover.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@
import org.springframework.kafka.listener.ContainerProperties.AckMode;
import org.springframework.kafka.listener.ContainerProperties.AssignmentCommitOption;
import org.springframework.kafka.listener.ContainerProperties.EOSMode;
import org.springframework.kafka.listener.adapter.HandlerMethodDetect;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.KafkaUtils;
Expand Down Expand Up @@ -159,6 +160,7 @@
* @author Tomaz Fernandes
* @author Francois Rosiere
* @author Daniel Gentes
* @author Wang Zhiyang
*/
public class KafkaMessageListenerContainer<K, V> // NOSONAR line count
extends AbstractMessageListenerContainer<K, V> implements ConsumerPauseResumeEventPublisher {
Expand Down Expand Up @@ -659,6 +661,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume

private final boolean wantsFullRecords;

private final boolean asyncReplies;

private final boolean autoCommit;

private final boolean isManualAck;
Expand Down Expand Up @@ -849,6 +853,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
ListenerConsumer(GenericMessageListener<?> listener, ListenerType listenerType,
ObservationRegistry observationRegistry) {

this.asyncReplies = listener instanceof HandlerMethodDetect hmd && hmd.isAsyncReplies()
|| this.containerProperties.isAsyncAcks();
AckMode ackMode = determineAckMode();
this.isManualAck = ackMode.equals(AckMode.MANUAL);
this.isCountAck = ackMode.equals(AckMode.COUNT)
Expand All @@ -859,12 +865,12 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
this.isAnyManualAck = this.isManualAck || this.isManualImmediateAck;
this.isRecordAck = ackMode.equals(AckMode.RECORD);
this.offsetsInThisBatch =
this.isAnyManualAck && this.containerProperties.isAsyncAcks()
? new HashMap<>()
this.isAnyManualAck && this.asyncReplies
? new ConcurrentHashMap<>()
: null;
this.deferredOffsets =
this.isAnyManualAck && this.containerProperties.isAsyncAcks()
? new HashMap<>()
this.isAnyManualAck && this.asyncReplies
? new ConcurrentHashMap<>()
: null;

this.observationRegistry = observationRegistry;
Expand Down Expand Up @@ -903,8 +909,7 @@ else if (listener instanceof MessageListener) {
else {
throw new IllegalArgumentException("Listener must be one of 'MessageListener', "
+ "'BatchMessageListener', or the variants that are consumer aware and/or "
+ "Acknowledging"
+ " not " + listener.getClass().getName());
+ "Acknowledging not " + listener.getClass().getName());
}
this.listenerType = listenerType;
this.isConsumerAwareListener = listenerType.equals(ListenerType.ACKNOWLEDGING_CONSUMER_AWARE)
Expand All @@ -927,18 +932,15 @@ else if (listener instanceof MessageListener) {
this.logger.info(toString());
}
ApplicationContext applicationContext = getApplicationContext();
ClassLoader classLoader = applicationContext == null
? getClass().getClassLoader()
: applicationContext.getClassLoader();
this.checkNullKeyForExceptions = this.containerProperties.isCheckDeserExWhenKeyNull()
|| ErrorHandlingUtils.checkDeserializer(KafkaMessageListenerContainer.this.consumerFactory,
consumerProperties, false,
applicationContext == null
? getClass().getClassLoader()
: applicationContext.getClassLoader());
consumerProperties, false, classLoader);
this.checkNullValueForExceptions = this.containerProperties.isCheckDeserExWhenValueNull()
|| ErrorHandlingUtils.checkDeserializer(KafkaMessageListenerContainer.this.consumerFactory,
consumerProperties, true,
applicationContext == null
? getClass().getClassLoader()
: applicationContext.getClassLoader());
consumerProperties, true, classLoader);
this.syncCommitTimeout = determineSyncCommitTimeout();
if (this.containerProperties.getSyncCommitTimeout() == null) {
// update the property, so we can use it directly from code elsewhere
Expand All @@ -964,6 +966,9 @@ private AckMode determineAckMode() {
if (this.consumerGroupId == null && KafkaMessageListenerContainer.this.topicPartitions != null) {
ackMode = AckMode.MANUAL;
}
if (this.asyncReplies && !(AckMode.MANUAL_IMMEDIATE.equals(ackMode) || AckMode.MANUAL.equals(ackMode))) {
ackMode = AckMode.MANUAL;
}
return ackMode;
}

Expand Down Expand Up @@ -3388,15 +3393,15 @@ public void acknowledge() {
public void nack(Duration sleep) {
Assert.state(Thread.currentThread().equals(ListenerConsumer.this.consumerThread),
"nack() can only be called on the consumer thread");
Assert.state(!ListenerConsumer.this.containerProperties.isAsyncAcks(),
"nack() is not supported with out-of-order commits (asyncAcks=true)");
Assert.state(!ListenerConsumer.this.asyncReplies,
"nack() is not supported with out-of-order commits");
Assert.isTrue(!sleep.isNegative(), "sleep cannot be negative");
ListenerConsumer.this.nackSleepDurationMillis = sleep.toMillis();
}

@Override
public boolean isAsyncAcks() {
return !ListenerConsumer.this.containerProperties.isAsyncAcks();
public boolean isOutOfOrderCommit() {
return ListenerConsumer.this.asyncReplies;
}

@Override
Expand Down Expand Up @@ -3473,8 +3478,8 @@ public void acknowledge(int index) {
public void nack(int index, Duration sleep) {
Assert.state(Thread.currentThread().equals(ListenerConsumer.this.consumerThread),
"nack() can only be called on the consumer thread");
Assert.state(!ListenerConsumer.this.containerProperties.isAsyncAcks(),
"nack() is not supported with out-of-order commits (asyncAcks=true)");
Assert.state(!ListenerConsumer.this.asyncReplies,
"nack() is not supported with out-of-order commits");
Assert.isTrue(!sleep.isNegative(), "sleep cannot be negative");
Assert.isTrue(index >= 0 && index < this.records.count(), "index out of bounds");
ListenerConsumer.this.nackIndex = index;
Expand All @@ -3498,8 +3503,8 @@ public void nack(int index, Duration sleep) {
}

@Override
public boolean isAsyncAcks() {
return !ListenerConsumer.this.containerProperties.isAsyncAcks();
public boolean isOutOfOrderCommit() {
return ListenerConsumer.this.asyncReplies;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2021 the original author or authors.
* Copyright 2020-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,6 +16,8 @@

package org.springframework.kafka.listener.adapter;

import java.util.concurrent.CompletableFuture;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
Expand All @@ -24,6 +26,9 @@
import org.springframework.expression.common.TemplateParserContext;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.lang.Nullable;
import org.springframework.util.ClassUtils;

import reactor.core.publisher.Mono;

/**
* Utilities for listener adapters.
Expand All @@ -40,6 +45,9 @@ public final class AdapterUtils {
*/
public static final ParserContext PARSER_CONTEXT = new TemplateParserContext("!{", "}");

private static final boolean MONO_PRESENT =
ClassUtils.isPresent("reactor.core.publisher.Mono", AdapterUtils.class.getClassLoader());

private AdapterUtils() {
}

Expand Down Expand Up @@ -86,4 +94,16 @@ public static String getDefaultReplyTopicExpression() {
+ KafkaHeaders.REPLY_TOPIC + "']" + PARSER_CONTEXT.getExpressionSuffix();
}

static boolean isAsyncReply(Class<?> resultType) {
return isMono(resultType) || isCompletableFuture(resultType);
}

static boolean isMono(Class<?> resultType) {
return MONO_PRESENT && Mono.class.isAssignableFrom(resultType);
}

static boolean isCompletableFuture(Class<?> resultType) {
return CompletableFuture.class.isAssignableFrom(resultType);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
* unambiguous.
*
* @author Gary Russell
* @author Wang Zhiyang
*
*/
public class DelegatingInvocableHandler {
Expand Down Expand Up @@ -86,6 +87,8 @@ public class DelegatingInvocableHandler {

private final PayloadValidator validator;

private final boolean asyncReplies;

/**
* Construct an instance with the supplied handlers for the bean.
* @param handlers the handlers.
Expand Down Expand Up @@ -116,6 +119,15 @@ public DelegatingInvocableHandler(List<InvocableHandlerMethod> handlers,
? (ConfigurableListableBeanFactory) beanFactory
: null;
this.validator = validator == null ? null : new PayloadValidator(validator);
boolean asyncReplies = defaultHandler != null && isAsyncReply(defaultHandler);
for (InvocableHandlerMethod handlerMethod : handlers) {
asyncReplies |= isAsyncReply(handlerMethod);
}
this.asyncReplies = asyncReplies;
}

private boolean isAsyncReply(InvocableHandlerMethod method) {
return AdapterUtils.isAsyncReply(method.getMethod().getReturnType());
}

private void checkSpecial(@Nullable InvocableHandlerMethod handler) {
Expand All @@ -139,6 +151,15 @@ public Object getBean() {
return this.bean;
}

/**
* Return true if any handler method has an async reply type.
* @return the asyncReply.
* @since 3.2
*/
public boolean isAsyncReplies() {
return this.asyncReplies;
}

/**
* Invoke the method with the given message.
* @param message the message.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,16 @@ public class HandlerAdapter {

private final DelegatingInvocableHandler delegatingHandler;

private final boolean asyncReplies;

/**
* Construct an instance with the provided method.
* @param invokerHandlerMethod the method.
*/
public HandlerAdapter(InvocableHandlerMethod invokerHandlerMethod) {
this.invokerHandlerMethod = invokerHandlerMethod;
this.delegatingHandler = null;
this.asyncReplies = AdapterUtils.isAsyncReply(invokerHandlerMethod.getMethod().getReturnType());
}

/**
Expand All @@ -49,6 +52,16 @@ public HandlerAdapter(InvocableHandlerMethod invokerHandlerMethod) {
public HandlerAdapter(DelegatingInvocableHandler delegatingHandler) {
this.invokerHandlerMethod = null;
this.delegatingHandler = delegatingHandler;
this.asyncReplies = delegatingHandler.isAsyncReplies();
}

/**
* Return true if any handler method has an async reply type.
* @return the asyncReply.
* @since 3.2
*/
public boolean isAsyncReplies() {
return this.asyncReplies;
}

public Object invoke(Message<?> message, Object... providedArgs) throws Exception { //NOSONAR
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.listener.adapter;

/**
* Auto-detect {@link HandlerAdapter} args and return type.
*
* @author Wang zhiyang
* @since 3.2
*/
public interface HandlerMethodDetect {

/**
* Return true if this listener is request/reply and the replies are async.
* @return true for async replies.
* @since 3.2
*/
default boolean isAsyncReplies() {
return false;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@
* @author Nathan Xu
* @author Wang ZhiYang
*/
public abstract class MessagingMessageListenerAdapter<K, V> implements ConsumerSeekAware {
public abstract class MessagingMessageListenerAdapter<K, V> implements ConsumerSeekAware, HandlerMethodDetect {

private static final SpelExpressionParser PARSER = new SpelExpressionParser();

Expand Down Expand Up @@ -243,6 +243,10 @@ public void setHandlerMethod(HandlerAdapter handlerMethod) {
this.handlerMethod = handlerMethod;
}

public boolean isAsyncReplies() {
return this.handlerMethod.isAsyncReplies();
}

protected boolean isConsumerRecordList() {
return this.isConsumerRecordList;
}
Expand Down Expand Up @@ -469,7 +473,7 @@ protected void handleResult(Object resultArg, Object request, @Nullable Acknowle
messageReturnType = this.messageReturnType;
}
if (result instanceof CompletableFuture<?> completable) {
if (acknowledgment == null || acknowledgment.isAsyncAcks()) {
if (acknowledgment == null || !acknowledgment.isOutOfOrderCommit()) {
this.logger.warn("Container 'Acknowledgment' must be async ack for Future<?> return type; "
+ "otherwise the container will ack the message immediately");
}
Expand All @@ -484,7 +488,7 @@ protected void handleResult(Object resultArg, Object request, @Nullable Acknowle
});
}
else if (monoPresent && result instanceof Mono<?> mono) {
if (acknowledgment == null || acknowledgment.isAsyncAcks()) {
if (acknowledgment == null || !acknowledgment.isOutOfOrderCommit()) {
this.logger.warn("Container 'Acknowledgment' must be async ack for Mono<?> return type " +
"(or Kotlin suspend function); otherwise the container will ack the message immediately");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ default void nack(int index, Duration sleep) {
throw new UnsupportedOperationException("nack(index, sleep) is not supported by this Acknowledgment");
}

default boolean isAsyncAcks() {
default boolean isOutOfOrderCommit() {
return false;
}

Expand Down
Loading

0 comments on commit aee58bd

Please sign in to comment.