Skip to content

Commit

Permalink
* add javadoc in AdapterUtils
Browse files Browse the repository at this point in the history
* move class from package `annotation` to package `adapter`
* re name bar,baz in BatchMessagingMessageListenerAdapterTests
* poblish unit test `MessagingMessageListenerAdapterTests` and `EnableKafkaKotlinCoroutinesTests`
* poblish doc async-returns.adoc and nav.adoc
  • Loading branch information
Wzy19930507 committed Jan 19, 2024
1 parent 2ea5282 commit 4cd372b
Show file tree
Hide file tree
Showing 11 changed files with 67 additions and 45 deletions.
1 change: 1 addition & 0 deletions spring-kafka-docs/src/main/antora/modules/ROOT/nav.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
**** xref:kafka/receiving-messages/message-listeners.adoc[]
**** xref:kafka/receiving-messages/message-listener-container.adoc[]
**** xref:kafka/receiving-messages/ooo-commits.adoc[]
**** xref:kafka/receiving-messages/async-returns.adoc[]
**** xref:kafka/receiving-messages/listener-annotation.adoc[]
**** xref:kafka/receiving-messages/listener-group-id.adoc[]
**** xref:kafka/receiving-messages/container-thread-naming.adoc[]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
[[async-returns]]
= Asynchronous `@KafkaListener` Return Types

`@KafkaListener` (and `@KafkaHandler`) methods can be specified with asynchronous return types `CompletableFuture<?>` and `Mono<?>`, letting the reply be sent asynchronously.
`@KafkaListener` (and `@KafkaHandler`) methods can be specified with asynchronous return types, letting the reply be sent asynchronously.
return types include `CompletableFuture<?>`, `Mono<?>` and Kotlin `suspend` functions

[source, java]
----
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2023 the original author or authors.
* Copyright 2014-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -87,6 +87,7 @@
import org.springframework.kafka.listener.ContainerGroupSequencer;
import org.springframework.kafka.listener.KafkaConsumerBackoffManager;
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
import org.springframework.kafka.listener.adapter.KafkaMessageHandlerMethodFactory;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
import org.springframework.kafka.retrytopic.DestinationTopicResolver;
import org.springframework.kafka.retrytopic.RetryTopicBeanNames;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2023 the original author or authors.
* Copyright 2020-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -34,6 +34,7 @@
* Utilities for listener adapters.
*
* @author Gary Russell
* @author Wang Zhiyang
* @since 2.5
*
*/
Expand Down Expand Up @@ -93,15 +94,33 @@ public static String getDefaultReplyTopicExpression() {
+ KafkaHeaders.REPLY_TOPIC + "']" + PARSER_CONTEXT.getExpressionSuffix();
}

static boolean isAsyncReply(Class<?> resultType) {
/**
* Return the true when return types are asynchronous.
* @param resultType {@code InvocableHandlerMethod} return type.
* @return type is {@code Mono} or {@code CompletableFuture}.
* @since 3.2
*/
public static boolean isAsyncReply(Class<?> resultType) {
return isMono(resultType) || isCompletableFuture(resultType);
}

static boolean isMono(Class<?> resultType) {
/**
* Return the true when type is {@code Mono}.
* @param resultType {@code InvocableHandlerMethod} return type.
* @return type is {@code Mono}.
* @since 3.2
*/
public static boolean isMono(Class<?> resultType) {
return MONO_PRESENT && Mono.class.isAssignableFrom(resultType);
}

static boolean isCompletableFuture(Class<?> resultType) {
/**
* Return the true when type is {@code CompletableFuture}.
* @param resultType {@code InvocableHandlerMethod} return type.
* @return type is {@code CompletableFuture}.
* @since 3.2
*/
public static boolean isCompletableFuture(Class<?> resultType) {
return CompletableFuture.class.isAssignableFrom(resultType);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2023 the original author or authors.
* Copyright 2023-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -14,7 +14,7 @@
* limitations under the License.
*/

package org.springframework.kafka.annotation;
package org.springframework.kafka.listener.adapter;

/**
* No-op resolver for method arguments of type {@link kotlin.coroutines.Continuation}.
Expand All @@ -25,7 +25,7 @@
*
* @author Wang Zhiyang
*
* @since 3.1
* @since 3.2
*
* @see org.springframework.messaging.handler.annotation.reactive.ContinuationHandlerMethodArgumentResolver
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2023 the original author or authors.
* Copyright 2023-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -14,7 +14,7 @@
* limitations under the License.
*/

package org.springframework.kafka.annotation;
package org.springframework.kafka.listener.adapter;

import java.lang.reflect.Method;
import java.util.List;
Expand All @@ -32,7 +32,7 @@
*
* @author Wang Zhiyang
*
* @since 3.1
* @since 3.2
*/
public class KafkaMessageHandlerMethodFactory extends DefaultMessageHandlerMethodFactory {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021-2023 the original author or authors.
* Copyright 2021-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -14,7 +14,7 @@
* limitations under the License.
*/

package org.springframework.kafka.annotation;
package org.springframework.kafka.listener.adapter;

import java.util.List;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2023 the original author or authors.
* Copyright 2023-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -14,7 +14,7 @@
* limitations under the License.
*/

package org.springframework.kafka.annotation;
package org.springframework.kafka.listener.adapter;

import java.lang.reflect.Method;

Expand All @@ -27,7 +27,7 @@
*
* @author Wang Zhiyang
*
* @since 3.1
* @since 3.2
*/
public class KotlinAwareInvocableHandlerMethod extends InvocableHandlerMethod {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019-2023 the original author or authors.
* Copyright 2019-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -50,6 +50,7 @@

/**
* @author Gary Russell
* @author Wang Zhiyang
* @since 2.2.5
*
*/
Expand All @@ -71,35 +72,36 @@ public void testKafkaNullInList(@Autowired KafkaListenerEndpointRegistry registr

@SuppressWarnings("unchecked")
@Test
public void testFutureResult(@Autowired KafkaListenerEndpointRegistry registry, @Autowired Bar bar) {
public void testFutureResult(@Autowired KafkaListenerEndpointRegistry registry,
@Autowired BatchFuture batchFuture) {

BatchMessagingMessageListenerAdapter<String, String> adapter =
spy((BatchMessagingMessageListenerAdapter<String, String>) registry
.getListenerContainer("bar").getContainerProperties().getMessageListener());
.getListenerContainer("batchFuture").getContainerProperties().getMessageListener());
KafkaUtils.setConsumerGroupId("test.group.future");
List<ConsumerRecord<String, String>> list = new ArrayList<>();
list.add(new ConsumerRecord<>("bar", 0, 0L, null, "future_1"));
list.add(new ConsumerRecord<>("bar", 0, 1L, null, "future_2"));
list.add(new ConsumerRecord<>("bar", 1, 0L, null, "future_3"));
list.add(new ConsumerRecord<>("batchFuture", 0, 0L, null, "future_1"));
list.add(new ConsumerRecord<>("batchFuture", 0, 1L, null, "future_2"));
list.add(new ConsumerRecord<>("batchFuture", 1, 0L, null, "future_3"));
adapter.onMessage(list, null, null);
assertThat(bar.group).isEqualTo("test.group.future");
assertThat(batchFuture.group).isEqualTo("test.group.future");
verify(adapter, times(1)).asyncSuccess(any(), any(), any(), anyBoolean());
verify(adapter, times(1)).acknowledge(any());
}

@SuppressWarnings("unchecked")
@Test
public void testMonoResult(@Autowired KafkaListenerEndpointRegistry registry, @Autowired Baz baz) {
public void testMonoResult(@Autowired KafkaListenerEndpointRegistry registry, @Autowired BatchMono batchMono) {

BatchMessagingMessageListenerAdapter<String, String> adapter =
spy((BatchMessagingMessageListenerAdapter<String, String>) registry
.getListenerContainer("baz").getContainerProperties().getMessageListener());
.getListenerContainer("batchMono").getContainerProperties().getMessageListener());
KafkaUtils.setConsumerGroupId("test.group.mono");
List<ConsumerRecord<String, String>> list = new ArrayList<>();
list.add(new ConsumerRecord<>("baz", 0, 0L, null, "mono_1"));
list.add(new ConsumerRecord<>("baz", 0, 1L, null, "mono_2"));
list.add(new ConsumerRecord<>("batchMono", 0, 0L, null, "mono_1"));
list.add(new ConsumerRecord<>("batchMono", 0, 1L, null, "mono_2"));
adapter.onMessage(list, null, null);
assertThat(baz.group).isEqualTo("test.group.mono");
assertThat(batchMono.group).isEqualTo("test.group.mono");
verify(adapter, times(1)).asyncSuccess(any(), any(), any(), anyBoolean());
verify(adapter, times(1)).acknowledge(any());
}
Expand All @@ -118,11 +120,11 @@ public void listen(List<String> list, @Header(KafkaHeaders.GROUP_ID) String grou

}

public static class Bar {
public static class BatchFuture {

public volatile String group;

@KafkaListener(id = "bar", topics = "bar", autoStartup = "false")
@KafkaListener(id = "batchFuture", topics = "batchFuture", autoStartup = "false")
public CompletableFuture<String> listen(List<String> list, @Header(KafkaHeaders.GROUP_ID) String groupId) {

this.group = groupId;
Expand All @@ -133,13 +135,13 @@ public CompletableFuture<String> listen(List<String> list, @Header(KafkaHeaders.

}

public static class Baz {
public static class BatchMono {

public volatile String value = "someValue";

public volatile String group;

@KafkaListener(id = "baz", topics = "baz", autoStartup = "false")
@KafkaListener(id = "batchMono", topics = "batchMono", autoStartup = "false")
public Mono<Integer> listen(List<String> list, @Header(KafkaHeaders.GROUP_ID) String groupId) {

this.group = groupId;
Expand All @@ -158,13 +160,13 @@ public Foo foo() {
}

@Bean
public Bar bar() {
return new Bar();
public BatchFuture batchFuture() {
return new BatchFuture();
}

@Bean
public Baz baz() {
return new Baz();
public BatchMono batchMono() {
return new BatchMono();
}

@SuppressWarnings({ "rawtypes" })
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2023 the original author or authors.
* Copyright 2016-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -124,9 +124,7 @@ public void test(Acknowledgment ack) {

public CompletableFuture<String> future(String data, Acknowledgment ack) {

CompletableFuture<String> future = new CompletableFuture<>();
future.complete("processed" + data);
return future;
return CompletableFuture.completedFuture("processed" + data);
}

public Mono<String> mono(String data, Acknowledgment ack) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ import java.util.concurrent.TimeUnit
@SpringJUnitConfig
@DirtiesContext
@EmbeddedKafka(topics = ["kotlinAsyncTestTopic1", "kotlinAsyncTestTopic2",
"kotlinAsyncBatchTestTopic1", "kotlinAsyncBatchTestTopic2", "sendTopicReply1"])
"kotlinAsyncBatchTestTopic1", "kotlinAsyncBatchTestTopic2", "kotlinReplyTopic1"])
class EnableKafkaKotlinCoroutinesTests {

@Autowired
Expand Down Expand Up @@ -96,16 +96,16 @@ class EnableKafkaKotlinCoroutinesTests {
@Test
fun `test checkedKh reply`() {
this.template.send("kotlinAsyncTestTopic3", "foo")
val cr = this.template.receive("sendTopicReply1", 0, 0, Duration.ofSeconds(30))
assertThat(cr.value()).isEqualTo("FOO")
val cr = this.template.receive("kotlinReplyTopic1", 0, 0, Duration.ofSeconds(30))
assertThat(cr?.value() ?: "null").isEqualTo("FOO")
}

@KafkaListener(id = "sendTopic", topics = ["kotlinAsyncTestTopic3"],
containerFactory = "kafkaListenerContainerFactory")
class Listener {

@KafkaHandler
@SendTo("sendTopicReply1")
@SendTo("kotlinReplyTopic1")
suspend fun handler1(value: String) : String {
return value.uppercase()
}
Expand Down

0 comments on commit 4cd372b

Please sign in to comment.