diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopic.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopic.java index d78d1fe820..90abd23e28 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopic.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopic.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 the original author or authors. + * Copyright 2018-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,7 +23,7 @@ import java.lang.annotation.Target; import org.springframework.kafka.retrytopic.DltStrategy; -import org.springframework.kafka.retrytopic.ExceptionBasedDltRouting; +import org.springframework.kafka.retrytopic.ExceptionBasedDltDestination; import org.springframework.kafka.retrytopic.RetryTopicConstants; import org.springframework.kafka.retrytopic.SameIntervalTopicReuseStrategy; import org.springframework.kafka.retrytopic.TopicSuffixingStrategy; @@ -178,7 +178,7 @@ * exception thrown during the processing. * @return the exception based DLT routing */ - ExceptionBasedDltRouting exceptionBasedDltRouting() default @ExceptionBasedDltRouting; + ExceptionBasedDltDestination[] exceptionBasedDltRouting() default {}; /** * Whether the retry topics will be suffixed with the delay value for that topic or a diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopicAnnotationProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopicAnnotationProcessor.java index f292c74745..ffd69acd5b 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopicAnnotationProcessor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopicAnnotationProcessor.java @@ -38,7 +38,6 @@ import org.springframework.expression.spel.support.StandardEvaluationContext; import org.springframework.kafka.core.KafkaOperations; import org.springframework.kafka.retrytopic.ExceptionBasedDltDestination; -import org.springframework.kafka.retrytopic.ExceptionBasedDltRouting; import org.springframework.kafka.retrytopic.RetryTopicBeanNames; import org.springframework.kafka.retrytopic.RetryTopicConfiguration; import org.springframework.kafka.retrytopic.RetryTopicConfigurationBuilder; @@ -215,8 +214,8 @@ private SleepingBackOffPolicy createBackoffFromAnnotation(Backoff backoff, @N return policy; } - private Map>> createDltRoutingSpecFromAnnotation(ExceptionBasedDltRouting routingSpec) { - return Arrays.stream(routingSpec.routingRules()) + private Map>> createDltRoutingSpecFromAnnotation(ExceptionBasedDltDestination[] routingRules) { + return Arrays.stream(routingRules) .collect(Collectors.toMap(ExceptionBasedDltDestination::suffix, excBasedDestDlt -> Set.of(excBasedDestDlt.exceptions()))); } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolver.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolver.java index b155c48304..bd137714be 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolver.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolver.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 the original author or authors. + * Copyright 2018-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -153,6 +153,11 @@ public DestinationTopic getDestinationTopicByName(String mainListenerId, String () -> "No DestinationTopic found for " + mainListenerId + ":" + topic).getSourceDestination(); } + @Override + public DestinationTopic getDltFor(String mainListenerId, String topicName) { + return getDltFor(mainListenerId, topicName, null); + } + @Nullable @Override public DestinationTopic getDltFor(String mainListenerId, String topicName, Exception e) { @@ -181,6 +186,10 @@ private static boolean isMatchingDltTopic(DestinationTopic destination, Exceptio } private static boolean isDirectExcOrCause(Exception e, Class excType) { + if (e == null) { + return false; + } + Throwable toMatch = e; boolean isMatched = excType.isInstance(toMatch); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopic.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopic.java index 9a2aee0bf6..936a1c4dbf 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopic.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopic.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 the original author or authors. + * Copyright 2018-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -196,9 +196,32 @@ public Properties(Properties sourceProperties, String suffix, Type type) { * @param shouldRetryOn the exception classifications. * @param timeout the timeout. * @param autoStartDltHandler whether or not to start the DLT handler. - * @param usedForExceptions the exceptions which destination is intended for * @since 2.8 */ + public Properties(long delayMs, String suffix, Type type, + int maxAttempts, int numPartitions, + DltStrategy dltStrategy, + KafkaOperations kafkaOperations, + BiPredicate shouldRetryOn, long timeout, @Nullable Boolean autoStartDltHandler) { + this(delayMs, suffix, type, maxAttempts, numPartitions, dltStrategy, kafkaOperations, shouldRetryOn, + timeout, autoStartDltHandler, Collections.emptySet()); + } + + /** + * Create an instance with the provided properties. + * @param delayMs the delay in ms. + * @param suffix the suffix. + * @param type the type. + * @param maxAttempts the max attempts. + * @param numPartitions the number of partitions. + * @param dltStrategy the DLT strategy. + * @param kafkaOperations the {@link KafkaOperations}. + * @param shouldRetryOn the exception classifications. + * @param timeout the timeout. + * @param autoStartDltHandler whether or not to start the DLT handler. + * @param usedForExceptions the exceptions which destination is intended for + * @since 3.2 + */ public Properties(long delayMs, String suffix, Type type, int maxAttempts, int numPartitions, DltStrategy dltStrategy, diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicContainer.java index 61160fa237..45c9e566f7 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicContainer.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2023 the original author or authors. + * Copyright 2017-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -66,10 +66,24 @@ public interface DestinationTopicContainer { * DLT for the given topic, or null if none is found. * @param mainListenerId the listener id. * @param topicName the topic name for which to look the DLT for - * @param exc the exception which is being handled * @return The {@link DestinationTopic} instance corresponding to the DLT. + * @deprecated Replaced by {@link #getDltFor(String, String, Exception)} */ @Nullable - DestinationTopic getDltFor(String mainListenerId, String topicName, Exception exc); + @Deprecated(since = "3.2", forRemoval = true) + DestinationTopic getDltFor(String mainListenerId, String topicName); + /** + * Returns the {@link DestinationTopic} instance registered as + * DLT for the given topic taking into consideration the exception + * thrown, or null if none is found. + * @param mainListenerId the listener id. + * @param topicName the topic name for which to look the DLT for + * @param exc the exception which is being handled + * @return The {@link DestinationTopic} instance corresponding to the DLT. + */ + @Nullable + default DestinationTopic getDltFor(String mainListenerId, String topicName, Exception exc) { + return null; + } } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactory.java index 58cd6786cb..2ad871c411 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 the original author or authors. + * Copyright 2018-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -85,9 +85,35 @@ public class DestinationTopicPropertiesFactory { * @param topicSuffixingStrategy the topic suffixing strategy. * @param sameIntervalTopicReuseStrategy the same interval reuse strategy. * @param timeout the timeout. - * @param dltRoutingRules the specification of which DLT should be used for the particular exception type * @since 3.0.12 */ + public DestinationTopicPropertiesFactory(String retryTopicSuffix, String dltSuffix, List backOffValues, + BinaryExceptionClassifier exceptionClassifier, + int numPartitions, KafkaOperations kafkaOperations, + DltStrategy dltStrategy, + TopicSuffixingStrategy topicSuffixingStrategy, + SameIntervalTopicReuseStrategy sameIntervalTopicReuseStrategy, + long timeout) { + + this(retryTopicSuffix, dltSuffix, backOffValues, exceptionClassifier, numPartitions, kafkaOperations, + dltStrategy, topicSuffixingStrategy, sameIntervalTopicReuseStrategy, timeout, Collections.emptyMap()); + } + + /** + * Construct an instance with the provided properties. + * @param retryTopicSuffix the suffix. + * @param dltSuffix the dlt suffix. + * @param backOffValues the back off values. + * @param exceptionClassifier the exception classifier. + * @param numPartitions the number of partitions. + * @param kafkaOperations the operations. + * @param dltStrategy the dlt strategy. + * @param topicSuffixingStrategy the topic suffixing strategy. + * @param sameIntervalTopicReuseStrategy the same interval reuse strategy. + * @param timeout the timeout. + * @param dltRoutingRules the specification of which DLT should be used for the particular exception type + * @since 3.2.0 + */ public DestinationTopicPropertiesFactory(String retryTopicSuffix, String dltSuffix, List backOffValues, BinaryExceptionClassifier exceptionClassifier, int numPartitions, KafkaOperations kafkaOperations, diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ExceptionBasedDltDestination.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ExceptionBasedDltDestination.java index 8f2cc6d22b..06443a93f5 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ExceptionBasedDltDestination.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ExceptionBasedDltDestination.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2023 the original author or authors. + * Copyright 2016-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,7 +21,7 @@ * processing caused the configured exception to be thrown. * * @author Adrian Chlebosz - * @see org.springframework.kafka.retrytopic.ExceptionBasedDltRouting + * * @since 3.2.0 */ public @interface ExceptionBasedDltDestination { @@ -31,7 +31,6 @@ * before the main suffix configured through the * ${@link org.springframework.kafka.annotation.RetryableTopic#dltTopicSuffix()}, so the * final name is the product of these two. - * * @return the configured suffix extension */ String suffix(); @@ -41,7 +40,6 @@ * it should be eventually redirected to the DLT with name containing the extension * configured through {@link #suffix()}. The causes of the thrown exception will be * traversed to match with any of configured ones. - * * @return configured exceptions */ Class[] exceptions(); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ExceptionBasedDltRouting.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ExceptionBasedDltRouting.java deleted file mode 100644 index e4284cc0fa..0000000000 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ExceptionBasedDltRouting.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright 2023 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.kafka.retrytopic; - -/** - * Annotation allowing to specify the custom DLT routing rules steered by exceptions - * which might be thrown during the processing. - * - * @author Adrian Chlebosz - * @see ExceptionBasedDltDestination - * @since 3.2.0 - */ -public @interface ExceptionBasedDltRouting { - - /** - * Specific rules expressing to which custom DLT the message should be redirected - * when the specified exception has been thrown during its processing. - * - * @return configured routing - */ - ExceptionBasedDltDestination[] routingRules() default {}; -} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationBuilder.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationBuilder.java index 0dae0da378..c178dfb155 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationBuilder.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationBuilder.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 the original author or authors. + * Copyright 2018-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -79,7 +79,7 @@ public class RetryTopicConfigurationBuilder { @Nullable private BinaryExceptionClassifierBuilder classifierBuilder; - private Map>> dltRoutingRules = new HashMap<>(); + private final Map>> dltRoutingRules = new HashMap<>(); private DltStrategy dltStrategy = DltStrategy.ALWAYS_RETRY_ON_ERROR; @@ -539,7 +539,7 @@ private BinaryExceptionClassifierBuilder classifierBuilder() { * @since 3.2.0 */ public RetryTopicConfigurationBuilder dltRoutingRules(Map>> dltRoutingRules) { - this.dltRoutingRules = dltRoutingRules; + this.dltRoutingRules.putAll(dltRoutingRules); return this; } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolverTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolverTests.java index 5fd891532f..67ebc3dada 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolverTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolverTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 the original author or authors. + * Copyright 2018-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -220,12 +220,26 @@ void shouldGetNextDestinationTopic() { } @Test - void shouldGetDlt() { + void shouldGetGeneralPurposeDltWhenExceptionIsNotKnown() { + assertThat(defaultDestinationTopicContainer + .getDltFor("id", mainDestinationTopic.getDestinationName())) + .isEqualTo(dltDestinationTopic); + } + + @Test + void shouldGetGeneralPurposeDltWhenThereIsNoCustomDltRegisteredForExceptionType() { assertThat(defaultDestinationTopicContainer .getDltFor("id", mainDestinationTopic.getDestinationName(), new RuntimeException())) .isEqualTo(dltDestinationTopic); } + @Test + void shouldGetCustomDltWhenThereIsCustomDltRegisteredForExceptionType() { + assertThat(defaultDestinationTopicContainer + .getDltFor("id", mainDestinationTopic.getDestinationName(), new DeserializationException(null, null, false, null))) + .isEqualTo(deserializationExcDltDestinationTopic); + } + @Test void shouldThrowIfNoDestinationFound() { assertThatNullPointerException().isThrownBy( diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryableTopicAnnotationProcessorTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryableTopicAnnotationProcessorTests.java index 22a8c7b5e9..2c887cf8d1 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryableTopicAnnotationProcessorTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryableTopicAnnotationProcessorTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 the original author or authors. + * Copyright 2018-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -374,11 +374,11 @@ static class RetryableTopicAnnotationFactoryWithCustomDltRouting { @KafkaListener @RetryableTopic( attempts = "1", - exceptionBasedDltRouting = @ExceptionBasedDltRouting(routingRules = { + exceptionBasedDltRouting = { @ExceptionBasedDltDestination( suffix = "-deserialization", exceptions = {DeserializationException.class} ) - }) + } ) void listenWithRetry() { // NoOps