From 4e9c0997cb5d99f596880e8323640947cc4fd39b Mon Sep 17 00:00:00 2001 From: Wang Zhiyang <1208931582@qq.com> Date: Wed, 13 Mar 2024 07:27:23 +0800 Subject: [PATCH] @RetryableTopic support for KL on class part 2 * @RetryableTopic support for KafkaListener annotation on class part 2 * `EndpointCustomizerFactory` adaptor `MultiMethodKafkaListenerEndpoint`. * `RetryTopicConfigurer.processAndRegisterEndpoint` support `@KafkaListener` on a class. * Add new class `EndpointHandlerMultiMethod` to handler multi method for retrying endpoints. * Deprecated `EndpointCustomizerFactory.addSuffixesAndMethod`. * Document public API changes in `whats-new.adoc`(or javadoc). part2 of #3105 and this contributes to fixing https://github.com/spring-projects/spring-kafka/pull/3105 eventually --- .../antora/modules/ROOT/pages/whats-new.adoc | 7 + .../MultiMethodKafkaListenerEndpoint.java | 44 ++++- .../kafka/retrytopic/EndpointCustomizer.java | 10 +- .../retrytopic/EndpointCustomizerFactory.java | 89 +++++++--- .../retrytopic/RetryTopicConfigurer.java | 75 ++++++--- .../kafka/support/EndpointHandlerMethod.java | 13 +- .../support/EndpointHandlerMultiMethod.java | 80 +++++++++ .../EndpointCustomizerFactoryTests.java | 152 ++++++++++-------- .../retrytopic/RetryTopicConfigurerTests.java | 49 +++--- 9 files changed, 381 insertions(+), 138 deletions(-) create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/support/EndpointHandlerMultiMethod.java diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc index 033f22ea09..c22b60913c 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc @@ -56,6 +56,13 @@ See xref:retrytopic/topic-naming.adoc#single-topic-maxinterval-delay[Single Topi Provides a new public API to find `RetryTopicConfiguration`. See xref:retrytopic/retry-config.adoc#find-retry-topic-config[Find RetryTopicConfiguration] +=== RetryTopicConfigurer support process MultiMethodKafkaListenerEndpoint. +The `RetryTopicConfigurer` support process and register `MultiMethodKafkaListenerEndpoint`. +The `MultiMethodKafkaListenerEndpoint` provides `getter/setter` for properties `defaultMethod` and `methods`. +Modify the `EndpointCustomizer` that strictly for `MethodKafkaListenerEndpoint` types. +The `EndpointHandlerMethod` add new constructors construct an instance for the provided bean. +Provides new class `EndpointHandlerMultiMethod` to handler multi method for retrying endpoints. + [[x32-seek-offset-compute-fn]] === New API method to seek to an offset based on a user provided function `ConsumerCallback` provides a new API to seek to an offset based on a user-defined function, which takes the current offset in the consumer as an argument. diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/MultiMethodKafkaListenerEndpoint.java b/spring-kafka/src/main/java/org/springframework/kafka/config/MultiMethodKafkaListenerEndpoint.java index 577a90eb49..f06ea580ae 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/MultiMethodKafkaListenerEndpoint.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/MultiMethodKafkaListenerEndpoint.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2023 the original author or authors. + * Copyright 2016-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -35,15 +35,16 @@ * @param the value type. * * @author Gary Russell + * @author Wang Zhiyang * * @see org.springframework.kafka.annotation.KafkaHandler * @see DelegatingInvocableHandler */ public class MultiMethodKafkaListenerEndpoint extends MethodKafkaListenerEndpoint { - private final List methods; + private List methods; - private final Method defaultMethod; + private Method defaultMethod; private Validator validator; @@ -60,6 +61,43 @@ public MultiMethodKafkaListenerEndpoint(List methods, @Nullable Method d setBean(bean); } + + /** + * Get a method list. + * @return the method list. + * @since 3.2 + */ + public List getMethods() { + return this.methods; + } + + /** + * Set a method list. + * @param methods the methods. + * @since 3.2 + */ + public void setMethods(List methods) { + this.methods = methods; + } + + /** + * Get a default method. + * @return the default method. + * @since 3.2 + */ + public Method getDefaultMethod() { + return this.defaultMethod; + } + + /** + * Set a default method. + * @param defaultMethod the default method. + * @since 3.2 + */ + public void setDefaultMethod(Method defaultMethod) { + this.defaultMethod = defaultMethod; + } + /** * Set a payload validator. * @param validator the validator. diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/EndpointCustomizer.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/EndpointCustomizer.java index 72351d9f95..ae6dee6fc2 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/EndpointCustomizer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/EndpointCustomizer.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 the original author or authors. + * Copyright 2021-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,21 +24,25 @@ * Customizes main, retry and DLT endpoints in the Retry Topic functionality * and returns the resulting topic names. * + * @param the listener endpoint type. + * * @author Tomaz Fernandes + * @author Wang Zhiyang + * * @since 2.7.2 * * @see EndpointCustomizerFactory * */ @FunctionalInterface -public interface EndpointCustomizer { +public interface EndpointCustomizer> { /** * Customize the endpoint and return the topic names generated for this endpoint. * @param listenerEndpoint The main, retry or DLT endpoint to be customized. * @return A collection containing the topic names generated for this endpoint. */ - Collection customizeEndpointAndCollectTopics(MethodKafkaListenerEndpoint listenerEndpoint); + Collection customizeEndpointAndCollectTopics(T listenerEndpoint); class TopicNamesHolder { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/EndpointCustomizerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/EndpointCustomizerFactory.java index 962128efec..f6bc477a4c 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/EndpointCustomizerFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/EndpointCustomizerFactory.java @@ -23,7 +23,9 @@ import org.springframework.beans.factory.BeanFactory; import org.springframework.kafka.config.MethodKafkaListenerEndpoint; +import org.springframework.kafka.config.MultiMethodKafkaListenerEndpoint; import org.springframework.kafka.support.EndpointHandlerMethod; +import org.springframework.kafka.support.EndpointHandlerMultiMethod; import org.springframework.kafka.support.TopicPartitionOffset; /** @@ -63,41 +65,88 @@ public EndpointCustomizerFactory(DestinationTopic.Properties destinationProperti this.retryTopicNamesProviderFactory = retryTopicNamesProviderFactory; } - public final EndpointCustomizer createEndpointCustomizer() { - return addSuffixesAndMethod(this.destinationProperties, this.beanMethod.resolveBean(this.beanFactory), - this.beanMethod.getMethod()); + public final EndpointCustomizer> createEndpointCustomizer() { + return addSuffixesAndMethod(this.destinationProperties); } - protected EndpointCustomizer addSuffixesAndMethod(DestinationTopic.Properties properties, Object bean, Method method) { + /** + * Create MethodKafkaListenerEndpoint's EndpointCustomizer, but not support MultiMethodKafkaListenerEndpoint. + * Replace by {@link #addSuffixesAndMethod(DestinationTopic.Properties)} + * @param properties the destination-topic's properties. + * @param bean the bean. + * @param method the method. + * @return the endpoint customizer. + */ + @Deprecated(since = "3.2", forRemoval = true) + @SuppressWarnings("rawtypes") + protected EndpointCustomizer addSuffixesAndMethod(DestinationTopic.Properties properties, Object bean, + Method method) { + + RetryTopicNamesProviderFactory.RetryTopicNamesProvider namesProvider = + this.retryTopicNamesProviderFactory.createRetryTopicNamesProvider(properties); + return endpoint -> { + Collection topics = + customizeAndRegisterTopics(namesProvider, endpoint); + configurationEndpoint(endpoint, namesProvider, properties, bean); + endpoint.setMethod(method); + return topics; + }; + } + + /** + * Create MethodKafkaListenerEndpoint's EndpointCustomizer and support MultiMethodKafkaListenerEndpoint. + * @param properties the destination-topic's properties. + * @return the endpoint customizer. + * @since 3.2 + */ + protected EndpointCustomizer> addSuffixesAndMethod( + DestinationTopic.Properties properties) { + RetryTopicNamesProviderFactory.RetryTopicNamesProvider namesProvider = this.retryTopicNamesProviderFactory.createRetryTopicNamesProvider(properties); return endpoint -> { - Collection topics = customizeAndRegisterTopics(namesProvider, endpoint); - endpoint.setId(namesProvider.getEndpointId(endpoint)); - endpoint.setGroupId(namesProvider.getGroupId(endpoint)); - if (endpoint.getTopics().isEmpty() && endpoint.getTopicPartitionsToAssign() != null) { - endpoint.setTopicPartitions(getTopicPartitions(properties, namesProvider, - endpoint.getTopicPartitionsToAssign())); + Collection topics = + customizeAndRegisterTopics(namesProvider, endpoint); + configurationEndpoint(endpoint, namesProvider, properties, this.beanMethod.resolveBean(this.beanFactory)); + if (endpoint instanceof MultiMethodKafkaListenerEndpoint multiMethodEndpoint + && this.beanMethod instanceof EndpointHandlerMultiMethod beanMultiMethod) { + multiMethodEndpoint.setDefaultMethod(beanMultiMethod.getDefaultMethod()); + multiMethodEndpoint.setMethods(beanMultiMethod.getMethods()); } else { - endpoint.setTopics(endpoint.getTopics().stream() - .map(namesProvider::getTopicName).toArray(String[]::new)); - } - endpoint.setClientIdPrefix(namesProvider.getClientIdPrefix(endpoint)); - endpoint.setGroup(namesProvider.getGroup(endpoint)); - endpoint.setBean(bean); - endpoint.setMethod(method); - Boolean autoStartDltHandler = properties.autoStartDltHandler(); - if (autoStartDltHandler != null && properties.isDltTopic()) { - endpoint.setAutoStartup(autoStartDltHandler); + endpoint.setMethod(this.beanMethod.getMethod()); } return topics; }; } + private void configurationEndpoint(MethodKafkaListenerEndpoint endpoint, + RetryTopicNamesProviderFactory.RetryTopicNamesProvider namesProvider, + DestinationTopic.Properties properties, Object bean) { + + endpoint.setId(namesProvider.getEndpointId(endpoint)); + endpoint.setGroupId(namesProvider.getGroupId(endpoint)); + if (endpoint.getTopics().isEmpty() && endpoint.getTopicPartitionsToAssign() != null) { + endpoint.setTopicPartitions(getTopicPartitions(properties, namesProvider, + endpoint.getTopicPartitionsToAssign())); + } + else { + endpoint.setTopics(endpoint.getTopics().stream() + .map(namesProvider::getTopicName).toArray(String[]::new)); + } + endpoint.setClientIdPrefix(namesProvider.getClientIdPrefix(endpoint)); + endpoint.setGroup(namesProvider.getGroup(endpoint)); + endpoint.setBean(bean); + Boolean autoStartDltHandler = properties.autoStartDltHandler(); + if (autoStartDltHandler != null && properties.isDltTopic()) { + endpoint.setAutoStartup(autoStartDltHandler); + } + } + private static TopicPartitionOffset[] getTopicPartitions(DestinationTopic.Properties properties, RetryTopicNamesProviderFactory.RetryTopicNamesProvider namesProvider, TopicPartitionOffset[] topicPartitionOffsets) { + return Stream.of(topicPartitionOffsets) .map(tpo -> properties.isMainEndpoint() ? getTPOForMainTopic(namesProvider, tpo) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurer.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurer.java index 4ad885d2ad..4e25fb3277 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurer.java @@ -18,6 +18,7 @@ import java.lang.reflect.Method; import java.util.Collection; +import java.util.List; import java.util.function.Consumer; import org.apache.commons.logging.LogFactory; @@ -36,6 +37,7 @@ import org.springframework.kafka.config.MultiMethodKafkaListenerEndpoint; import org.springframework.kafka.support.Acknowledgment; import org.springframework.kafka.support.EndpointHandlerMethod; +import org.springframework.kafka.support.EndpointHandlerMultiMethod; import org.springframework.kafka.support.KafkaUtils; import org.springframework.kafka.support.TopicForRetryable; import org.springframework.lang.NonNull; @@ -150,6 +152,19 @@ * // ... message processing * } * + *

Since 3.2 , {@link org.springframework.kafka.annotation.RetryableTopic} annotation support + * {@link org.springframework.kafka.annotation.KafkaListener} annotated class, such as: + *

+ *     @RetryableTopic(attempts = 3,
+ *     		backoff = @Backoff(delay = 700, maxDelay = 12000, multiplier = 3))
+ *     @KafkaListener(topics = "my-annotated-topic")
+ *     static class ListenerBean {
+ *          @KafkaHandler
+ *         public void processMessage(MyPojo message) {
+ *        		// ... message processing
+ *         }
+ *     }
+ *
*

Or through meta-annotations, such as: *

  *     @RetryableTopic(backoff = @Backoff(delay = 700, maxDelay = 12000, multiplier = 3))
@@ -281,7 +296,7 @@ public void processMainAndRetryListeners(EndpointProcessor endpointProcessor,
 											KafkaListenerEndpointRegistrar registrar,
 											@Nullable KafkaListenerContainerFactory factory,
 											String defaultContainerFactoryBeanName) {
-		throwIfMultiMethodEndpoint(mainEndpoint);
+
 		String id = mainEndpoint.getId();
 		if (id == null) {
 			id = "no.id.provided";
@@ -300,6 +315,7 @@ private void configureEndpoints(MethodKafkaListenerEndpoint mainEndpoint,
 									RetryTopicConfiguration configuration,
 									DestinationTopicProcessor.Context context,
 									String defaultContainerFactoryBeanName) {
+
 		this.destinationTopicProcessor
 				.processDestinationTopicProperties(destinationTopicProperties ->
 						processAndRegisterEndpoint(mainEndpoint,
@@ -330,7 +346,13 @@ private void processAndRegisterEndpoint(MethodKafkaListenerEndpoint mainEn
 			endpoint = mainEndpoint;
 		}
 		else {
-			endpoint = new MethodKafkaListenerEndpoint<>();
+			if (mainEndpoint instanceof MultiMethodKafkaListenerEndpoint multi) {
+				endpoint = new MultiMethodKafkaListenerEndpoint<>(multi.getMethods(), multi.getDefaultMethod(),
+						multi.getBean());
+			}
+			else {
+				endpoint = new MethodKafkaListenerEndpoint<>();
+			}
 			endpoint.setId(mainEndpoint.getId());
 			endpoint.setMainListenerId(mainEndpoint.getId());
 		}
@@ -345,12 +367,12 @@ private void processAndRegisterEndpoint(MethodKafkaListenerEndpoint mainEn
 				getEndpointHandlerMethod(mainEndpoint, configuration, destinationTopicProperties);
 
 		createEndpointCustomizer(endpointBeanMethod, destinationTopicProperties)
-						.customizeEndpointAndCollectTopics(endpoint)
-						.forEach(topicNamesHolder ->
-								this.destinationTopicProcessor
-										.registerDestinationTopic(topicNamesHolder.getMainTopic(),
-												topicNamesHolder.getCustomizedTopic(),
-												destinationTopicProperties, context));
+				.customizeEndpointAndCollectTopics(endpoint)
+				.forEach(topicNamesHolder ->
+						this.destinationTopicProcessor
+								.registerDestinationTopic(topicNamesHolder.getMainTopic(),
+										topicNamesHolder.getCustomizedTopic(),
+										destinationTopicProperties, context));
 
 		registrar.registerEndpoint(endpoint, resolvedFactory);
 		endpoint.setBeanFactory(this.beanFactory);
@@ -359,9 +381,10 @@ private void processAndRegisterEndpoint(MethodKafkaListenerEndpoint mainEn
 	protected EndpointHandlerMethod getEndpointHandlerMethod(MethodKafkaListenerEndpoint mainEndpoint,
 														RetryTopicConfiguration configuration,
 														DestinationTopic.Properties props) {
+
 		EndpointHandlerMethod dltHandlerMethod = configuration.getDltHandlerMethod();
-		EndpointHandlerMethod retryBeanMethod = new EndpointHandlerMethod(mainEndpoint.getBean(), mainEndpoint.getMethod());
-		return props.isDltTopic() ? getDltEndpointHandlerMethodOrDefault(dltHandlerMethod) : retryBeanMethod;
+		return props.isDltTopic() ? getDltEndpointHandlerMethodOrDefault(mainEndpoint, dltHandlerMethod)
+				: getRetryEndpointHandlerMethod(mainEndpoint);
 	}
 
 	private Consumer> getTopicCreationFunction(RetryTopicConfiguration config) {
@@ -383,7 +406,7 @@ protected void createNewTopicBeans(Collection topics, RetryTopicConfigur
 		);
 	}
 
-	protected EndpointCustomizer createEndpointCustomizer(
+	protected EndpointCustomizer> createEndpointCustomizer(
 			EndpointHandlerMethod endpointBeanMethod, DestinationTopic.Properties destinationTopicProperties) {
 
 		return new EndpointCustomizerFactory(destinationTopicProperties,
@@ -393,8 +416,28 @@ protected EndpointCustomizer createEndpointCustomizer(
 				.createEndpointCustomizer();
 	}
 
-	private EndpointHandlerMethod getDltEndpointHandlerMethodOrDefault(EndpointHandlerMethod dltEndpointHandlerMethod) {
-		return dltEndpointHandlerMethod != null ? dltEndpointHandlerMethod : DEFAULT_DLT_HANDLER;
+	private EndpointHandlerMethod getDltEndpointHandlerMethodOrDefault(MethodKafkaListenerEndpoint mainEndpoint,
+			@Nullable EndpointHandlerMethod dltEndpointHandlerMethod) {
+
+		EndpointHandlerMethod dltHandlerMethod = dltEndpointHandlerMethod != null
+				? dltEndpointHandlerMethod : DEFAULT_DLT_HANDLER;
+		if (mainEndpoint instanceof MultiMethodKafkaListenerEndpoint) {
+			dltHandlerMethod = new EndpointHandlerMultiMethod(dltHandlerMethod.resolveBean(this.beanFactory),
+					dltHandlerMethod.getMethod(), List.of(dltHandlerMethod.getMethod()));
+		}
+		return dltHandlerMethod;
+	}
+
+	private EndpointHandlerMethod getRetryEndpointHandlerMethod(MethodKafkaListenerEndpoint mainEndpoint) {
+		EndpointHandlerMethod retryBeanMethod;
+		if (mainEndpoint instanceof MultiMethodKafkaListenerEndpoint multi) {
+			retryBeanMethod = new EndpointHandlerMultiMethod(multi.getBean(), multi.getDefaultMethod(),
+					multi.getMethods());
+		}
+		else {
+			retryBeanMethod = new EndpointHandlerMethod(mainEndpoint.getBean(), mainEndpoint.getMethod());
+		}
+		return retryBeanMethod;
 	}
 
 	private KafkaListenerContainerFactory resolveAndConfigureFactoryForMainEndpoint(
@@ -419,12 +462,6 @@ private KafkaListenerContainerFactory resolveAndConfigureFactoryForRetryEndpo
 		return this.listenerContainerFactoryConfigurer.decorateFactory(resolvedFactory);
 	}
 
-	private void throwIfMultiMethodEndpoint(MethodKafkaListenerEndpoint mainEndpoint) {
-		if (mainEndpoint instanceof MultiMethodKafkaListenerEndpoint) {
-			throw new IllegalArgumentException("Retry Topic is not compatible with " + MultiMethodKafkaListenerEndpoint.class);
-		}
-	}
-
 	public static EndpointHandlerMethod createHandlerMethodWith(Object beanOrClass, String methodName) {
 		return new EndpointHandlerMethod(beanOrClass, methodName);
 	}
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/EndpointHandlerMethod.java b/spring-kafka/src/main/java/org/springframework/kafka/support/EndpointHandlerMethod.java
index f1d1968b0d..fac721f50d 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/support/EndpointHandlerMethod.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/support/EndpointHandlerMethod.java
@@ -41,7 +41,7 @@ public class EndpointHandlerMethod {
 
 	private final Object beanOrClass;
 
-	private final String methodName;
+	private String methodName;
 
 	private Object bean;
 
@@ -54,6 +54,17 @@ public EndpointHandlerMethod(Object beanOrClass, String methodName) {
 		this.methodName = methodName;
 	}
 
+	/**
+	 * Construct an instance for the provided bean.
+	 * @param bean the bean.
+	 * @since 3.2
+	 */
+	public EndpointHandlerMethod(Object bean) {
+		Assert.notNull(bean, () -> "No bean for destination provided!");
+		this.bean = bean;
+		this.beanOrClass = bean.getClass();
+	}
+
 	public EndpointHandlerMethod(Object bean, Method method) {
 		Assert.notNull(bean, () -> "No bean for destination provided!");
 		Assert.notNull(method, () -> "No method for destination bean class provided!");
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/EndpointHandlerMultiMethod.java b/spring-kafka/src/main/java/org/springframework/kafka/support/EndpointHandlerMultiMethod.java
new file mode 100644
index 0000000000..34595ed18d
--- /dev/null
+++ b/spring-kafka/src/main/java/org/springframework/kafka/support/EndpointHandlerMultiMethod.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2024 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.kafka.support;
+
+import java.lang.reflect.Method;
+import java.util.List;
+
+/**
+ * Handler multi method for retrying endpoints.
+ *
+ * @author Wang Zhiyang
+ *
+ * @since 3.2
+ *
+ */
+public class EndpointHandlerMultiMethod extends EndpointHandlerMethod {
+
+	private Method defaultMethod;
+
+	private List methods;
+
+	/**
+	 * Construct an instance for the provided bean, defaultMethod and methods.
+	 * @param bean the bean.
+	 * @param defaultMethod the defaultMethod.
+	 * @param methods the methods.
+	 */
+	public EndpointHandlerMultiMethod(Object bean, Method defaultMethod, List methods) {
+		super(bean);
+		this.defaultMethod = defaultMethod;
+		this.methods = methods;
+	}
+
+	/**
+	 * Return the method list.
+	 * @return the method list.
+	 */
+	public List getMethods() {
+		return this.methods;
+	}
+
+	/**
+	 * Set the method list.
+	 * @param methods the method list.
+	 */
+	public void setMethods(List methods) {
+		this.methods = methods;
+	}
+
+	/**
+	 * Return the default method.
+	 * @return the default method.
+	 */
+	public Method getDefaultMethod() {
+		return this.defaultMethod;
+	}
+
+	/**
+	 * Set the default method.
+	 * @param defaultMethod the default method.
+	 */
+	public void setDefaultMethod(Method defaultMethod) {
+		this.defaultMethod = defaultMethod;
+	}
+
+}
diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/EndpointCustomizerFactoryTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/EndpointCustomizerFactoryTests.java
index 7a8c93039d..a7dbcb6793 100644
--- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/EndpointCustomizerFactoryTests.java
+++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/EndpointCustomizerFactoryTests.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2022 the original author or authors.
+ * Copyright 2022-2024 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -18,24 +18,32 @@
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.BDDMockito.given;
+import static org.mockito.Mockito.mock;
 
 import java.lang.reflect.Method;
 import java.util.Arrays;
 import java.util.List;
 import java.util.function.Predicate;
+import java.util.stream.Stream;
 
-import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
 
 import org.springframework.beans.factory.BeanFactory;
 import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
+import org.springframework.kafka.config.MultiMethodKafkaListenerEndpoint;
 import org.springframework.kafka.support.EndpointHandlerMethod;
+import org.springframework.kafka.support.EndpointHandlerMultiMethod;
 import org.springframework.kafka.support.TopicPartitionOffset;
 
 /**
  * @author Tomaz Fernandes
+ * @author Wang Zhiyang
+ *
  * @since 2.8.5
  */
 @ExtendWith(MockitoExtension.class)
@@ -44,8 +52,9 @@ class EndpointCustomizerFactoryTests {
 	@Mock
 	private DestinationTopic.Properties properties;
 
-	@Mock
-	private EndpointHandlerMethod beanMethod;
+	private static final EndpointHandlerMethod beanMethod = mock(EndpointHandlerMethod.class);
+
+	private static final EndpointHandlerMultiMethod beanMultiMethod = mock(EndpointHandlerMultiMethod.class);
 
 	@Mock
 	private BeanFactory beanFactory;
@@ -53,15 +62,24 @@ class EndpointCustomizerFactoryTests {
 	@Mock
 	private RetryTopicNamesProviderFactory retryTopicNamesProviderFactory;
 
-	@Mock
-	private MethodKafkaListenerEndpoint endpoint;
+	private static final MethodKafkaListenerEndpoint endpoint = mock(MethodKafkaListenerEndpoint.class);
 
-	private final String[] topics = {"myTopic1", "myTopic2"};
+	private static final MultiMethodKafkaListenerEndpoint multiEndpoint = mock(MultiMethodKafkaListenerEndpoint.class);
 
-	private final Method method = EndpointCustomizerFactory.class.getDeclaredMethods()[0];
+	private static final String[] topics = {"myTopic1", "myTopic2"};
 
-	@Test
-	void shouldNotCustomizeEndpointForMainTopicWithTopics() {
+	private static final Method method = EndpointCustomizerFactory.class.getDeclaredMethods()[0];
+
+	private static Stream paramsForEndpointCustomizerFactory() {
+		return Stream.of(
+				Arguments.of(beanMethod, endpoint),
+				Arguments.of(beanMultiMethod, multiEndpoint));
+	}
+
+	@ParameterizedTest(name = "{index} shouldNotCustomizeEndpointForMainTopicWithTopics beanMethod is {0}, endpoint is {1}")
+	@MethodSource("paramsForEndpointCustomizerFactory")
+	void shouldNotCustomizeEndpointForMainTopicWithTopics(EndpointHandlerMethod beanMethod,
+			MethodKafkaListenerEndpoint endpoint) {
 
 		given(beanMethod.resolveBean(this.beanFactory)).willReturn(method);
 		given(endpoint.getTopics()).willReturn(Arrays.asList(topics));
@@ -70,8 +88,8 @@ void shouldNotCustomizeEndpointForMainTopicWithTopics() {
 				new SuffixingRetryTopicNamesProviderFactory().createRetryTopicNamesProvider(properties);
 		given(retryTopicNamesProviderFactory.createRetryTopicNamesProvider(properties)).willReturn(provider);
 
-		EndpointCustomizer endpointCustomizer = new EndpointCustomizerFactory(properties, beanMethod,
-				beanFactory, retryTopicNamesProviderFactory).createEndpointCustomizer();
+		EndpointCustomizer> endpointCustomizer = new EndpointCustomizerFactory(
+				properties, beanMethod, beanFactory, retryTopicNamesProviderFactory).createEndpointCustomizer();
 
 		List holders =
 				(List) endpointCustomizer.customizeEndpointAndCollectTopics(endpoint);
@@ -83,8 +101,15 @@ void shouldNotCustomizeEndpointForMainTopicWithTopics() {
 
 	}
 
-	@Test
-	void shouldNotCustomizeEndpointForMainTopicWithTPO() {
+	private static Stream paramsCustomizeEndpointForMainTopic() {
+		return Stream.of(
+				Arguments.of(beanMethod, false),
+				Arguments.of(beanMultiMethod, true));
+	}
+
+	@ParameterizedTest(name = "{index} shouldNotCustomizeEndpointForMainTopicWithTPO beanMethod is {0}, is multi {1}")
+	@MethodSource("paramsCustomizeEndpointForMainTopic")
+	void shouldNotCustomizeEndpointForMainTopicWithTPO(EndpointHandlerMethod beanMethod, boolean isMulti) {
 
 		given(beanMethod.resolveBean(this.beanFactory)).willReturn(method);
 		given(properties.isMainEndpoint()).willReturn(true);
@@ -94,27 +119,20 @@ void shouldNotCustomizeEndpointForMainTopicWithTPO() {
 		given(retryTopicNamesProviderFactory.createRetryTopicNamesProvider(properties)).willReturn(provider);
 
 		String testString = "testString";
-		MethodKafkaListenerEndpoint endpointTPO = new MethodKafkaListenerEndpoint<>();
+		MethodKafkaListenerEndpoint endpointTPO = getEndpoint(isMulti, testString);
 		endpointTPO.setTopicPartitions(new TopicPartitionOffset(topics[0], 0, 0L),
 				new TopicPartitionOffset(topics[1], 1, 1L));
-		endpointTPO.setMethod(this.method);
-		endpointTPO.setId(testString);
-		endpointTPO.setClientIdPrefix(testString);
-		endpointTPO.setGroup(testString);
 
-		EndpointCustomizer endpointCustomizer = new EndpointCustomizerFactory(properties, beanMethod,
-				beanFactory, retryTopicNamesProviderFactory).createEndpointCustomizer();
+		EndpointCustomizer> endpointCustomizer = new EndpointCustomizerFactory(
+				properties, beanMethod, beanFactory, retryTopicNamesProviderFactory).createEndpointCustomizer();
 
 		List holders =
 				(List) endpointCustomizer.customizeEndpointAndCollectTopics(endpointTPO);
 
-		assertThat(holders).hasSize(2).element(0)
-				.matches(assertMainTopic(0));
-		assertThat(holders).element(1)
-				.matches(assertMainTopic(1));
+		assertThat(holders).hasSize(2).element(0).matches(assertMainTopic(0));
+		assertThat(holders).element(1).matches(assertMainTopic(1));
 
-		assertThat(endpointTPO.getTopics())
-				.isEmpty();
+		assertThat(endpointTPO.getTopics()).isEmpty();
 
 		TopicPartitionOffset[] topicPartitionsToAssign = endpointTPO.getTopicPartitionsToAssign();
 		assertThat(topicPartitionsToAssign).hasSize(2);
@@ -125,29 +143,13 @@ void shouldNotCustomizeEndpointForMainTopicWithTPO() {
 
 	}
 
-	private Predicate assertMainTopic(int index) {
-		return holder -> holder.getCustomizedTopic().equals(topics[index])
-				&& holder.getMainTopic().equals(topics[index]);
-	}
+	@ParameterizedTest(name = "{index} shouldCustomizeEndpointForRetryTopicWithTopic beanMethod is {0}, endpoint is {1}")
+	@MethodSource("paramsCustomizeEndpointForMainTopic")
+	void shouldCustomizeEndpointForRetryTopicWithTopic(EndpointHandlerMethod beanMethod, boolean isMulti) {
 
-	@Test
-	void shouldCustomizeEndpointForRetryTopic() {
-
-		MethodKafkaListenerEndpoint endpoint = new MethodKafkaListenerEndpoint<>();
 		String testString = "testString";
-		endpoint.setTopics(this.topics);
-		endpoint.setMethod(this.method);
-		endpoint.setId(testString);
-		endpoint.setClientIdPrefix(testString);
-		endpoint.setGroup(testString);
-
-		MethodKafkaListenerEndpoint endpointTPO = new MethodKafkaListenerEndpoint<>();
-		endpointTPO.setTopicPartitions(new TopicPartitionOffset(topics[0], 0, 0L),
-				new TopicPartitionOffset(topics[1], 1, 1L));
-		endpointTPO.setMethod(this.method);
-		endpointTPO.setId(testString);
-		endpointTPO.setClientIdPrefix(testString);
-		endpointTPO.setGroup(testString);
+		MethodKafkaListenerEndpoint endpoint = getEndpoint(isMulti, testString);
+		endpoint.setTopics(topics);
 
 		String suffix = "-retry";
 		given(beanMethod.resolveBean(this.beanFactory)).willReturn(method);
@@ -159,8 +161,8 @@ void shouldCustomizeEndpointForRetryTopic() {
 				new SuffixingRetryTopicNamesProviderFactory().createRetryTopicNamesProvider(properties);
 		given(retryTopicNamesProviderFactory.createRetryTopicNamesProvider(properties)).willReturn(provider);
 
-		EndpointCustomizer endpointCustomizer = new EndpointCustomizerFactory(properties, beanMethod,
-				beanFactory, retryTopicNamesProviderFactory).createEndpointCustomizer();
+		EndpointCustomizer> endpointCustomizer = new EndpointCustomizerFactory(
+				properties, beanMethod, beanFactory, retryTopicNamesProviderFactory).createEndpointCustomizer();
 
 		List holders =
 				(List) endpointCustomizer.customizeEndpointAndCollectTopics(endpoint);
@@ -175,17 +177,16 @@ void shouldCustomizeEndpointForRetryTopic() {
 						&& holder.getCustomizedTopic().equals(topic2WithSuffix));
 
 		String testStringSuffix = testString + suffix;
-
-		assertThat(endpoint.getTopics())
-				.contains(topic1WithSuffix, topic2WithSuffix);
-		assertThat(endpoint.getId())
-				.isEqualTo(testStringSuffix);
-		assertThat(endpoint.getClientIdPrefix())
-				.isEqualTo(testStringSuffix);
-		assertThat(endpoint.getGroup())
-				.isEqualTo(testStringSuffix);
+		assertThat(endpoint.getTopics()).contains(topic1WithSuffix, topic2WithSuffix);
+		assertThat(endpoint.getId()).isEqualTo(testStringSuffix);
+		assertThat(endpoint.getClientIdPrefix()).isEqualTo(testStringSuffix);
+		assertThat(endpoint.getGroup()).isEqualTo(testStringSuffix);
 		assertThat(endpoint.getTopicPartitionsToAssign()).isEmpty();
 
+		MethodKafkaListenerEndpoint endpointTPO = getEndpoint(isMulti, testString);
+		endpointTPO.setTopicPartitions(new TopicPartitionOffset(topics[0], 0, 0L),
+				new TopicPartitionOffset(topics[1], 1, 1L));
+
 		List holdersTPO =
 				(List) endpointCustomizer.customizeEndpointAndCollectTopics(endpointTPO);
 
@@ -196,9 +197,7 @@ void shouldCustomizeEndpointForRetryTopic() {
 				.matches(holder -> holder.getMainTopic().equals(topics[1])
 						&& holder.getCustomizedTopic().equals(topic2WithSuffix));
 
-		assertThat(endpointTPO.getTopics())
-				.isEmpty();
-
+		assertThat(endpointTPO.getTopics()).isEmpty();
 		TopicPartitionOffset[] topicPartitionsToAssign = endpointTPO.getTopicPartitionsToAssign();
 		assertThat(topicPartitionsToAssign).hasSize(2);
 		assertThat(equalsTopicPartitionOffset(topicPartitionsToAssign[0],
@@ -206,12 +205,29 @@ void shouldCustomizeEndpointForRetryTopic() {
 		assertThat(equalsTopicPartitionOffset(topicPartitionsToAssign[1],
 				new TopicPartitionOffset(topic2WithSuffix, 1, (Long) null))).isTrue();
 
-		assertThat(endpointTPO.getId())
-				.isEqualTo(testStringSuffix);
-		assertThat(endpointTPO.getClientIdPrefix())
-				.isEqualTo(testStringSuffix);
-		assertThat(endpointTPO.getGroup())
-				.isEqualTo(testStringSuffix);
+		assertThat(endpointTPO.getId()).isEqualTo(testStringSuffix);
+		assertThat(endpointTPO.getClientIdPrefix()).isEqualTo(testStringSuffix);
+		assertThat(endpointTPO.getGroup()).isEqualTo(testStringSuffix);
+	}
+
+	private MethodKafkaListenerEndpoint getEndpoint(boolean isMulti, String testString) {
+		MethodKafkaListenerEndpoint methodEndpoint;
+		if (isMulti) {
+			methodEndpoint = new MultiMethodKafkaListenerEndpoint<>(List.of(method), method, null);
+		}
+		else {
+			methodEndpoint = new MethodKafkaListenerEndpoint<>();
+			methodEndpoint.setMethod(method);
+		}
+		methodEndpoint.setId(testString);
+		methodEndpoint.setClientIdPrefix(testString);
+		methodEndpoint.setGroup(testString);
+		return methodEndpoint;
+	}
+
+	private Predicate assertMainTopic(int index) {
+		return holder -> holder.getCustomizedTopic().equals(topics[index])
+				&& holder.getMainTopic().equals(topics[index]);
 	}
 
 	private boolean equalsTopicPartitionOffset(TopicPartitionOffset tpo1, TopicPartitionOffset tpo2) {
diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurerTests.java
index b4fdae85bf..82c98264ad 100644
--- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurerTests.java
+++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurerTests.java
@@ -17,7 +17,6 @@
 package org.springframework.kafka.retrytopic;
 
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.BDDMockito.given;
@@ -32,11 +31,15 @@
 import java.util.Collection;
 import java.util.List;
 import java.util.function.Consumer;
+import java.util.stream.Stream;
 
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Captor;
 import org.mockito.Mock;
@@ -75,16 +78,12 @@ class RetryTopicConfigurerTests {
 	@Mock
 	private BeanFactory beanFactory;
 
-	private DefaultListableBeanFactory defaultListableBeanFactory = new DefaultListableBeanFactory();
+	private final DefaultListableBeanFactory defaultListableBeanFactory = new DefaultListableBeanFactory();
 
-	@Mock
-	private RetryTopicConfigurer.EndpointProcessor endpointProcessor;
-
-	@Mock
-	private MethodKafkaListenerEndpoint mainEndpoint;
+	private static final MethodKafkaListenerEndpoint mainEndpoint = mock(MethodKafkaListenerEndpoint.class);
 
-	@Mock
-	private MultiMethodKafkaListenerEndpoint multiMethodEndpoint;
+	private static final MultiMethodKafkaListenerEndpoint mainMultiEndpoint =
+			mock(MultiMethodKafkaListenerEndpoint.class);
 
 	@Mock
 	private RetryTopicConfiguration configuration;
@@ -167,22 +166,15 @@ private Method getMethod(String methodName)  {
 		}
 	}
 
-	@Test
-	void shouldThrowIfMultiMethodEndpoint() {
-
-		// setup
-		RetryTopicConfigurer configurer = new RetryTopicConfigurer(destinationTopicProcessor, containerFactoryResolver,
-				listenerContainerFactoryConfigurer, new SuffixingRetryTopicNamesProviderFactory());
-		configurer.setBeanFactory(beanFactory);
-
-		// when - then
-		assertThatIllegalArgumentException().isThrownBy(
-				() -> configurer.processMainAndRetryListeners(endpointProcessor, multiMethodEndpoint, configuration,
-						registrar, containerFactory, defaultFactoryBeanName));
+	private static Stream paramsRetryEndpoints() {
+		return Stream.of(
+				Arguments.of(mainEndpoint),
+				Arguments.of(mainMultiEndpoint));
 	}
 
-	@Test
-	void shouldConfigureRetryEndpoints() {
+	@ParameterizedTest(name = "{index} shouldNotCustomizeEndpointForMainTopicWithTPO beanMethod is {0}, is multi {1}")
+	@MethodSource("paramsRetryEndpoints")
+	void shouldConfigureRetryEndpoints(MethodKafkaListenerEndpoint mainEndpoint) {
 
 		// given
 
@@ -204,7 +196,15 @@ void shouldConfigureRetryEndpoints() {
 
 		given(configuration.getDestinationTopicProperties()).willReturn(destinationPropertiesList);
 		given(mainEndpoint.getBean()).willReturn(bean);
-		given(mainEndpoint.getMethod()).willReturn(endpointMethod);
+		if (mainEndpoint instanceof MultiMethodKafkaListenerEndpoint multiEndpoint) {
+			given(multiEndpoint.getDefaultMethod()).willReturn(endpointMethod);
+			given(multiEndpoint.getMethods()).willReturn(List.of(endpointMethod));
+		}
+		else {
+			given(mainEndpoint.getMethod()).willReturn(endpointMethod);
+		}
+		given(endpointHandlerMethod.resolveBean(any())).willReturn(bean);
+		given(endpointHandlerMethod.getMethod()).willReturn(noOpsDltMethod);
 		given(configuration.getDltHandlerMethod()).willReturn(endpointHandlerMethod);
 		given(configuration.forKafkaTopicAutoCreation()).willReturn(topicCreationConfig);
 		given(topicCreationConfig.shouldCreateTopics()).willReturn(true);
@@ -217,6 +217,7 @@ void shouldConfigureRetryEndpoints() {
 		given(firstRetryDestinationProperties.suffix()).willReturn(firstRetrySuffix);
 		given(secondRetryDestinationProperties.suffix()).willReturn(secondRetrySuffix);
 		given(dltDestinationProperties.suffix()).willReturn(dltSuffix);
+		given(dltDestinationProperties.isDltTopic()).willReturn(true);
 		given(mainDestinationProperties.isMainEndpoint()).willReturn(true);
 		given(mainEndpoint.getTopics()).willReturn(topics);