Skip to content

Commit

Permalink
GH-1189: support kotlin suspend
Browse files Browse the repository at this point in the history
* Support kotlin suspend
* Add kotlin suspend test, async request/reply with `@KafkaListener` `@KafkaHandler` and `@SendTo`
* Fix async-returns.adoc and async warn log in `MessagingMessageListenerAdapter`
* Add dependency `org.jetbrains.kotlinx:kotlinx-coroutines-reactor:1.7.3`
  • Loading branch information
Zhiyang.Wang1 committed Dec 21, 2023
1 parent fcbc7d5 commit 3ea5faf
Show file tree
Hide file tree
Showing 9 changed files with 433 additions and 12 deletions.
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ ext {
junit4Version = '4.13.2'
junitJupiterVersion = '5.10.1'
kafkaVersion = '3.6.1'
kotlinCoroutinesVersion = '1.7.3'
log4jVersion = '2.21.1'
micrometerDocsVersion = '1.0.2'
micrometerVersion = '1.12.1'
Expand Down Expand Up @@ -276,6 +277,7 @@ project ('spring-kafka') {
}
api "org.apache.kafka:kafka-clients:$kafkaVersion"
optionalApi "org.apache.kafka:kafka-streams:$kafkaVersion"
optionalApi "org.jetbrains.kotlinx:kotlinx-coroutines-reactor:$kotlinCoroutinesVersion"
optionalApi 'com.fasterxml.jackson.core:jackson-core'
optionalApi 'com.fasterxml.jackson.core:jackson-databind'
optionalApi 'com.fasterxml.jackson.datatype:jackson-datatype-jdk8'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,5 @@ IMPORTANT: The listener container factory must be configured with manual ack mod
When the async result is completed with an error, whether the message is recover or not depends on the container error handler.
If some exception occurs within the listener method that prevents creation of the async result object, you MUST catch that exception and return an appropriate return object that will cause the message to be ack or recover.

If a `KafkaListenerErrorHandler` is configured on a listener with an async return type, the error handler is invoked after a failure.
If a `KafkaListenerErrorHandler` is configured on a listener with an async return type (including Kotlin suspend functions), the error handler is invoked after a failure.
See xref:kafka/annotation-error-handling.adoc[Handling Exceptions] for more information about this error handler and its purpose.
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright 2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.annotation;

/**
* No-op resolver for method arguments of type {@link kotlin.coroutines.Continuation}.
* <p>
* This class is similar to
* {@link org.springframework.messaging.handler.annotation.reactive.ContinuationHandlerMethodArgumentResolver}
* but for regular {@link HandlerMethodArgumentResolver} contract.
*
* @author Wang Zhiyang
*
* @since 3.1
*
* @see org.springframework.messaging.handler.annotation.reactive.ContinuationHandlerMethodArgumentResolver
*/
import org.springframework.core.MethodParameter;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver;

import reactor.core.publisher.Mono;

public class ContinuationHandlerMethodArgumentResolver implements HandlerMethodArgumentResolver {

@Override
public boolean supportsParameter(MethodParameter parameter) {
return "kotlin.coroutines.Continuation".equals(parameter.getParameterType().getName());
}

@Override
public Object resolveArgument(MethodParameter parameter, Message<?> message) {
return Mono.empty();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -1155,7 +1155,7 @@ private MessageHandlerMethodFactory getHandlerMethodFactory() {
}

private MessageHandlerMethodFactory createDefaultMessageHandlerMethodFactory() {
DefaultMessageHandlerMethodFactory defaultFactory = new DefaultMessageHandlerMethodFactory();
DefaultMessageHandlerMethodFactory defaultFactory = new KafkaMessageHandlerMethodFactory();
Validator validator = KafkaListenerAnnotationBeanPostProcessor.this.registrar.getValidator();
if (validator != null) {
defaultFactory.setValidator(validator);
Expand All @@ -1170,8 +1170,6 @@ private MessageHandlerMethodFactory createDefaultMessageHandlerMethodFactory() {

List<HandlerMethodArgumentResolver> customArgumentsResolver =
new ArrayList<>(KafkaListenerAnnotationBeanPostProcessor.this.registrar.getCustomMethodArgumentResolvers());
// Has to be at the end - look at PayloadMethodArgumentResolver documentation
customArgumentsResolver.add(new KafkaNullAwarePayloadArgumentResolver(messageConverter, validator));
defaultFactory.setCustomArgumentResolvers(customArgumentsResolver);

defaultFactory.afterPropertiesSet();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright 2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.annotation;

import java.lang.reflect.Method;
import java.util.List;

import org.springframework.core.KotlinDetector;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver;
import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite;
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
import org.springframework.validation.Validator;

/**
* Extension of the {@link DefaultMessageHandlerMethodFactory} for Spring Kafka requirements.
*
* @author Wang Zhiyang
*
* @since 3.1
*/
public class KafkaMessageHandlerMethodFactory extends DefaultMessageHandlerMethodFactory {

private final HandlerMethodArgumentResolverComposite argumentResolvers =
new HandlerMethodArgumentResolverComposite();

private MessageConverter messageConverter;

private Validator validator;

@Override
public void setMessageConverter(MessageConverter messageConverter) {
super.setMessageConverter(messageConverter);
this.messageConverter = messageConverter;
}

@Override
public void setValidator(Validator validator) {
super.setValidator(validator);
this.validator = validator;
}

@Override
protected List<HandlerMethodArgumentResolver> initArgumentResolvers() {
List<HandlerMethodArgumentResolver> resolvers = super.initArgumentResolvers();
if (KotlinDetector.isKotlinPresent()) {
// Insert before PayloadMethodArgumentResolver
resolvers.add(resolvers.size() - 1, new ContinuationHandlerMethodArgumentResolver());
}
// Has to be at the end - look at PayloadMethodArgumentResolver documentation
resolvers.add(resolvers.size() - 1, new KafkaNullAwarePayloadArgumentResolver(this.messageConverter, this.validator));
this.argumentResolvers.addResolvers(resolvers);
return resolvers;
}

@Override
public InvocableHandlerMethod createInvocableHandlerMethod(Object bean, Method method) {
InvocableHandlerMethod handlerMethod = new KotlinAwareInvocableHandlerMethod(bean, method);
handlerMethod.setMessageMethodArgumentResolvers(this.argumentResolvers);
return handlerMethod;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.annotation;

import java.lang.reflect.Method;

import org.springframework.core.CoroutinesUtils;
import org.springframework.core.KotlinDetector;
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;

/**
* An {@link InvocableHandlerMethod} extension for supporting Kotlin {@code suspend} function.
*
* @author Wang Zhiyang
*
* @since 3.1
*/
public class KotlinAwareInvocableHandlerMethod extends InvocableHandlerMethod {

public KotlinAwareInvocableHandlerMethod(Object bean, Method method) {
super(bean, method);
}

@Override
protected Object doInvoke(Object... args) throws Exception {
Method method = getBridgedMethod();
if (KotlinDetector.isSuspendingFunction(method)) {
return CoroutinesUtils.invokeSuspendingFunction(method, getBean(), args);
}
else {
return super.doInvoke(args);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -213,16 +213,16 @@ protected MessagingMessageListenerAdapter<K, V> createMessageListenerInstance(
if (batchToRecordAdapter != null) {
messageListener.setBatchToRecordAdapter(batchToRecordAdapter);
}
if (messageConverter instanceof BatchMessageConverter) {
messageListener.setBatchMessageConverter((BatchMessageConverter) messageConverter);
if (messageConverter instanceof BatchMessageConverter batchMessageConverter) {
messageListener.setBatchMessageConverter(batchMessageConverter);
}
listener = messageListener;
}
else {
RecordMessagingMessageListenerAdapter<K, V> messageListener = new RecordMessagingMessageListenerAdapter<K, V>(
this.bean, this.method, this.errorHandler);
if (messageConverter instanceof RecordMessageConverter) {
messageListener.setMessageConverter((RecordMessageConverter) messageConverter);
if (messageConverter instanceof RecordMessageConverter recordMessageConverter) {
messageListener.setMessageConverter(recordMessageConverter);
}
listener = messageListener;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.kafka.common.TopicPartition;

import org.springframework.context.expression.MapAccessor;
import org.springframework.core.KotlinDetector;
import org.springframework.core.MethodParameter;
import org.springframework.core.log.LogAccessor;
import org.springframework.expression.BeanResolver;
Expand Down Expand Up @@ -484,8 +485,8 @@ protected void handleResult(Object resultArg, Object request, @Nullable Acknowle
}
else if (monoPresent && result instanceof Mono<?> mono) {
if (acknowledgment == null || acknowledgment.isAsyncAcks()) {
this.logger.warn("Container 'Acknowledgment' must be async ack for Mono<?> return type; " +
"otherwise the container will ack the message immediately");
this.logger.warn("Container 'Acknowledgment' must be async ack for Mono<?> return type " +
"(or Kotlin suspend function); otherwise the container will ack the message immediately");
}
mono.subscribe(
r -> asyncSuccess(r, replyTopic, source, messageReturnType),
Expand Down Expand Up @@ -660,7 +661,7 @@ protected void asyncFailure(Object request, @Nullable Acknowledgment acknowledgm
}
catch (Exception ex) {
}
this.logger.error(t, "Future, Mono, or suspend function was completed with an exception for " + source);
this.logger.error(t, () -> "Future, Mono, or suspend function was completed with an exception for " + source);
acknowledge(acknowledgment);
}

Expand Down Expand Up @@ -751,6 +752,8 @@ protected Type determineInferredType(Method method) { // NOSONAR complexity
isNotConvertible |= isAck;
boolean isConsumer = parameterIsType(parameterType, Consumer.class);
isNotConvertible |= isConsumer;
boolean isCoroutines = KotlinDetector.isKotlinType(methodParameter.getParameterType());
isNotConvertible |= isCoroutines;
boolean isMeta = parameterIsType(parameterType, ConsumerRecordMetadata.class);
this.hasMetadataParameter |= isMeta;
isNotConvertible |= isMeta;
Expand All @@ -769,7 +772,7 @@ protected Type determineInferredType(Method method) { // NOSONAR complexity
break;
}
}
else if (isAck || isConsumer || annotationHeaderIsGroupId(methodParameter)) {
else if (isAck || isCoroutines || isConsumer || annotationHeaderIsGroupId(methodParameter)) {
allowedBatchParameters++;
}
}
Expand Down
Loading

0 comments on commit 3ea5faf

Please sign in to comment.