From a37a85cf359da0d982d78ec19bb4030c5fedd4d7 Mon Sep 17 00:00:00 2001 From: Adrian Chlebosz Date: Tue, 5 Dec 2023 00:38:53 +0100 Subject: [PATCH 1/9] GH-2800: create topics used for exception based DLT routing --- .../kafka/annotation/RetryableTopic.java | 8 +++ .../kafka/retrytopic/DestinationTopic.java | 13 ++-- .../DestinationTopicPropertiesFactory.java | 21 +++++- .../ExceptionBasedDestinationDlt.java | 6 ++ .../retrytopic/ExceptionBasedDltRouting.java | 5 ++ .../RetryTopicConfigurationBuilder.java | 13 +++- ...estinationTopicPropertiesFactoryTests.java | 65 +++++++++++++------ .../retrytopic/DestinationTopicTests.java | 8 +-- 8 files changed, 109 insertions(+), 30 deletions(-) create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ExceptionBasedDestinationDlt.java create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ExceptionBasedDltRouting.java diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopic.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopic.java index 31ed10f824..d937b202e2 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopic.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopic.java @@ -22,12 +22,18 @@ import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; +import org.springframework.kafka.retrytopic.ExceptionBasedDltRouting; import org.springframework.kafka.retrytopic.DltStrategy; import org.springframework.kafka.retrytopic.RetryTopicConstants; import org.springframework.kafka.retrytopic.SameIntervalTopicReuseStrategy; import org.springframework.kafka.retrytopic.TopicSuffixingStrategy; import org.springframework.retry.annotation.Backoff; + +// TODO +// 2. inject exception detection when sending to DLT (consider traversing causes) +// 3. route the message to the configured additional destination or to the default DLT + /** * * Annotation to create the retry and dlt topics for a {@link KafkaListener} annotated @@ -171,6 +177,8 @@ */ String dltTopicSuffix() default RetryTopicConstants.DEFAULT_DLT_SUFFIX; + ExceptionBasedDltRouting additionalExceptionBasedDltRouting() default @ExceptionBasedDltRouting; + /** * Whether the retry topics will be suffixed with the delay value for that topic or a * simple index. diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopic.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopic.java index c6895be55a..711da8195d 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopic.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopic.java @@ -16,7 +16,9 @@ package org.springframework.kafka.retrytopic; +import java.util.Collections; import java.util.Objects; +import java.util.Set; import java.util.function.BiPredicate; import org.springframework.kafka.core.KafkaOperations; @@ -137,9 +139,10 @@ public static class Properties { private final long timeout; + private final Set> usedForExceptions; + @Nullable private final Boolean autoStartDltHandler; - /** * Create an instance with the provided properties with the DLT container starting * automatically (if the container factory is so configured). @@ -160,7 +163,7 @@ public Properties(long delayMs, String suffix, Type type, BiPredicate shouldRetryOn, long timeout) { this(delayMs, suffix, type, maxAttempts, numPartitions, dltStrategy, kafkaOperations, shouldRetryOn, - timeout, null); + timeout, null, Collections.emptySet()); } /** @@ -173,7 +176,7 @@ public Properties(long delayMs, String suffix, Type type, public Properties(Properties sourceProperties, String suffix, Type type) { this(sourceProperties.delayMs, suffix, type, sourceProperties.maxAttempts, sourceProperties.numPartitions, sourceProperties.dltStrategy, sourceProperties.kafkaOperations, sourceProperties.shouldRetryOn, - sourceProperties.timeout, null); + sourceProperties.timeout, null, Collections.emptySet()); } /** @@ -194,7 +197,8 @@ public Properties(long delayMs, String suffix, Type type, int maxAttempts, int numPartitions, DltStrategy dltStrategy, KafkaOperations kafkaOperations, - BiPredicate shouldRetryOn, long timeout, @Nullable Boolean autoStartDltHandler) { + BiPredicate shouldRetryOn, long timeout, @Nullable Boolean autoStartDltHandler, + Set> usedForExceptions) { this.delayMs = delayMs; this.suffix = suffix; @@ -206,6 +210,7 @@ public Properties(long delayMs, String suffix, Type type, this.shouldRetryOn = shouldRetryOn; this.timeout = timeout; this.autoStartDltHandler = autoStartDltHandler; + this.usedForExceptions = usedForExceptions; } public boolean isDltTopic() { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactory.java index 1119bf39e9..7b29c8c397 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactory.java @@ -27,6 +27,10 @@ import org.springframework.lang.Nullable; import org.springframework.util.StringUtils; +import java.util.Map; +import java.util.Set; +import java.util.stream.Stream; + /** * * Creates a list of {@link DestinationTopic.Properties} based on the @@ -65,6 +69,8 @@ public class DestinationTopicPropertiesFactory { private final long timeout; + private final Map>> exceptionBasedRouting; + @Nullable private Boolean autoStartDltHandler; @@ -88,13 +94,15 @@ public DestinationTopicPropertiesFactory(String retryTopicSuffix, String dltSuff DltStrategy dltStrategy, TopicSuffixingStrategy topicSuffixingStrategy, SameIntervalTopicReuseStrategy sameIntervalTopicReuseStrategy, - long timeout) { + long timeout, + Map>> exceptionBasedRouting) { this.dltStrategy = dltStrategy; this.kafkaOperations = kafkaOperations; this.numPartitions = numPartitions; this.timeout = timeout; this.destinationTopicSuffixes = new DestinationTopicSuffixes(retryTopicSuffix, dltSuffix); + this.exceptionBasedRouting = exceptionBasedRouting; this.backOffValues = backOffValues; int backOffValuesSize = this.backOffValues.size(); this.isSameIntervalReuse = SameIntervalTopicReuseStrategy.SINGLE_TOPIC.equals(sameIntervalTopicReuseStrategy); @@ -126,6 +134,7 @@ public List createProperties() { } if (!DltStrategy.NO_DLT.equals(this.dltStrategy)) { list.add(createDltProperties()); + list.addAll(createCustomDltProperties()); } return Collections.unmodifiableList(list); } @@ -138,7 +147,15 @@ private DestinationTopic.Properties createMainTopicProperties() { private DestinationTopic.Properties createDltProperties() { return new DestinationTopic.Properties(0, this.destinationTopicSuffixes.getDltSuffix(), DestinationTopic.Type.DLT, this.maxAttempts, this.numPartitions, this.dltStrategy, - this.kafkaOperations, (a, e) -> false, this.timeout, this.autoStartDltHandler); + this.kafkaOperations, (a, e) -> false, this.timeout, this.autoStartDltHandler, Collections.emptySet()); + } + + private List createCustomDltProperties() { + return exceptionBasedRouting.entrySet().stream() + .map(entry -> new DestinationTopic.Properties(0, entry.getKey() + "-" + this.destinationTopicSuffixes.getDltSuffix(), + DestinationTopic.Type.DLT, this.maxAttempts, this.numPartitions, this.dltStrategy, + this.kafkaOperations, (a, e) -> false, this.timeout, this.autoStartDltHandler, entry.getValue())) + .toList(); } private DestinationTopic.Properties createRetryProperties(int backOffIndex) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ExceptionBasedDestinationDlt.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ExceptionBasedDestinationDlt.java new file mode 100644 index 0000000000..4ed6d46911 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ExceptionBasedDestinationDlt.java @@ -0,0 +1,6 @@ +package org.springframework.kafka.retrytopic; + +public @interface ExceptionBasedDestinationDlt { + String customSuffix(); + Class[] exceptions(); +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ExceptionBasedDltRouting.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ExceptionBasedDltRouting.java new file mode 100644 index 0000000000..e58cf71479 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ExceptionBasedDltRouting.java @@ -0,0 +1,5 @@ +package org.springframework.kafka.retrytopic; + +public @interface ExceptionBasedDltRouting { + ExceptionBasedDestinationDlt[] routing() default {}; +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationBuilder.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationBuilder.java index 4e24b1deb7..36bb131c7e 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationBuilder.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationBuilder.java @@ -17,7 +17,10 @@ package org.springframework.kafka.retrytopic; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.Set; import org.springframework.classify.BinaryExceptionClassifier; import org.springframework.classify.BinaryExceptionClassifierBuilder; @@ -66,6 +69,7 @@ public class RetryTopicConfigurationBuilder { private RetryTopicConfiguration.TopicCreation topicCreationConfiguration = new RetryTopicConfiguration.TopicCreation(); + private ConcurrentKafkaListenerContainerFactory listenerContainerFactory; @Nullable @@ -74,6 +78,8 @@ public class RetryTopicConfigurationBuilder { @Nullable private BinaryExceptionClassifierBuilder classifierBuilder; + private Map>> exceptionBasedDltRouting = new HashMap<>(); + private DltStrategy dltStrategy = DltStrategy.ALWAYS_RETRY_ON_ERROR; private long timeout = RetryTopicConstants.NOT_SET; @@ -522,6 +528,11 @@ private BinaryExceptionClassifierBuilder classifierBuilder() { return this.classifierBuilder; } + public RetryTopicConfigurationBuilder exceptionBasedDltRouting(Map>> exceptionBasedDltRouting) { + this.exceptionBasedDltRouting = exceptionBasedDltRouting; + return this; + } + /* ---------------- Configure KafkaListenerContainerFactory -------------- */ /** * Configure the container factory to use. @@ -567,7 +578,7 @@ public RetryTopicConfiguration create(KafkaOperations sendToTopicKafkaTemp new DestinationTopicPropertiesFactory(this.retryTopicSuffix, this.dltSuffix, backOffValues, buildClassifier(), this.topicCreationConfiguration.getNumPartitions(), sendToTopicKafkaTemplate, this.dltStrategy, - this.topicSuffixingStrategy, this.sameIntervalTopicReuseStrategy, this.timeout) + this.topicSuffixingStrategy, this.sameIntervalTopicReuseStrategy, this.timeout, this.exceptionBasedDltRouting) .autoStartDltHandler(this.autoStartDltHandler) .createProperties(); return new RetryTopicConfiguration(destinationTopicProperties, diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactoryTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactoryTests.java index 82ea3fc5f6..41dcb8f4ca 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactoryTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactoryTests.java @@ -26,14 +26,18 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; - import org.springframework.classify.BinaryExceptionClassifier; import org.springframework.classify.BinaryExceptionClassifierBuilder; import org.springframework.kafka.core.KafkaOperations; +import org.springframework.kafka.support.serializer.DeserializationException; import org.springframework.retry.backoff.BackOffPolicy; import org.springframework.retry.backoff.ExponentialBackOffPolicy; import org.springframework.retry.backoff.FixedBackOffPolicy; +import java.util.Collections; +import java.util.Map; +import java.util.Set; + /** * @author Tomaz Fernandes @@ -89,10 +93,10 @@ void shouldCreateMainAndDltProperties() { List backOffValues = new BackOffValuesGenerator(1, backOffPolicy).generateValues(); List propertiesList = - new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, + new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, classifier, numPartitions, kafkaOperations, dltStrategy, suffixWithDelayValueSuffixingStrategy, multipleTopicsSameIntervalReuseStrategy, - RetryTopicConstants.NOT_SET).createProperties(); + RetryTopicConstants.NOT_SET, Collections.emptyMap()).createProperties(); // then assertThat(propertiesList.size() == 2).isTrue(); @@ -112,6 +116,10 @@ void shouldCreateMainAndDltProperties() { } private void assertDltTopic(DestinationTopic.Properties dltProperties) { + assertDltTopic(dltProperties, this.dltSuffix); + } + + private void assertDltTopic(DestinationTopic.Properties dltProperties, String dltSuffix) { assertThat(dltProperties.suffix()).isEqualTo(dltSuffix); assertThat(dltProperties.isDltTopic()).isTrue(); assertThat(dltProperties.isRetryTopic()).isFalse(); @@ -134,10 +142,10 @@ void shouldCreateTwoRetryPropertiesForMultipleBackoffValues() { List backOffValues = new BackOffValuesGenerator(maxAttempts, backOffPolicy).generateValues(); List propertiesList = - new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, + new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, classifier, numPartitions, kafkaOperations, dltStrategy, TopicSuffixingStrategy.SUFFIX_WITH_DELAY_VALUE, - multipleTopicsSameIntervalReuseStrategy, RetryTopicConstants.NOT_SET).createProperties(); + multipleTopicsSameIntervalReuseStrategy, RetryTopicConstants.NOT_SET, Collections.emptyMap()).createProperties(); List destinationTopicList = propertiesList .stream() @@ -185,16 +193,35 @@ void shouldNotCreateDltProperties() { List backOffValues = new BackOffValuesGenerator(maxAttempts, backOffPolicy).generateValues(); List propertiesList = - new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, classifier, + new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, classifier, numPartitions, kafkaOperations, noDltStrategy, TopicSuffixingStrategy.SUFFIX_WITH_DELAY_VALUE, multipleTopicsSameIntervalReuseStrategy, - RetryTopicConstants.NOT_SET).createProperties(); + RetryTopicConstants.NOT_SET, Collections.emptyMap()).createProperties(); // then assertThat(propertiesList.size() == 3).isTrue(); assertThat(propertiesList.get(2).isDltTopic()).isFalse(); } + @Test + void shouldCreateDltPropertiesForCustomExceptionBasedRouting() { + // when + List backOffValues = new BackOffValuesGenerator(1, backOffPolicy).generateValues(); + + String desExcDltSuffix = "deserialization"; + List propertiesList = + new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, + classifier, numPartitions, kafkaOperations, + dltStrategy, suffixWithDelayValueSuffixingStrategy, multipleTopicsSameIntervalReuseStrategy, + RetryTopicConstants.NOT_SET, Map.of(desExcDltSuffix, Set.of(DeserializationException.class))).createProperties(); + + // then + assertThat(propertiesList.size()).isSameAs(3); + + assertDltTopic(propertiesList.get(1)); + assertDltTopic(propertiesList.get(2), desExcDltSuffix + "-" + this.dltSuffix); + } + @Test void shouldCreateOneRetryPropertyForFixedBackoffWithSingleTopicSameIntervalReuseStrategy() { @@ -206,10 +233,10 @@ void shouldCreateOneRetryPropertyForFixedBackoffWithSingleTopicSameIntervalReuse List backOffValues = new BackOffValuesGenerator(maxAttempts, backOffPolicy).generateValues(); List propertiesList = - new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, + new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, classifier, numPartitions, kafkaOperations, dltStrategy, suffixWithDelayValueSuffixingStrategy, singleTopicSameIntervalReuseStrategy, - -1).createProperties(); + -1, Collections.emptyMap()).createProperties(); List destinationTopicList = propertiesList .stream() @@ -247,10 +274,10 @@ void shouldCreateRetryPropertiesForFixedBackoffWithMultiTopicStrategy() { List backOffValues = new BackOffValuesGenerator(maxAttempts, backOffPolicy).generateValues(); List propertiesList = - new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, + new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, classifier, numPartitions, kafkaOperations, dltStrategy, suffixWithDelayValueSuffixingStrategy, multipleTopicsSameIntervalReuseStrategy, - -1).createProperties(); + -1, Collections.emptyMap()).createProperties(); List destinationTopicList = propertiesList .stream() @@ -294,10 +321,10 @@ void shouldSuffixRetryTopicsWithIndexIfSuffixWithIndexStrategy() { // when List propertiesList = - new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, + new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, classifier, numPartitions, kafkaOperations, dltStrategy, suffixWithIndexTopicSuffixingStrategy, - multipleTopicsSameIntervalReuseStrategy, -1).createProperties(); + multipleTopicsSameIntervalReuseStrategy, -1, Collections.emptyMap()).createProperties(); // then IntStream.range(1, maxAttempts).forEach(index -> assertThat(propertiesList.get(index).suffix()) @@ -315,10 +342,10 @@ void shouldSuffixRetryTopicsWithIndexIfFixedDelayWithMultipleTopics() { // when List propertiesList = - new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, + new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, classifier, numPartitions, kafkaOperations, dltStrategy, suffixWithIndexTopicSuffixingStrategy, multipleTopicsSameIntervalReuseStrategy, - -1).createProperties(); + -1, Collections.emptyMap()).createProperties(); // then IntStream.range(1, maxAttempts) @@ -340,7 +367,7 @@ void shouldSuffixRetryTopicsWithMixedIfMaxDelayReached() { // when DestinationTopicPropertiesFactory factory = new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, classifier, numPartitions, kafkaOperations, - dltStrategy, suffixWithDelayValueSuffixingStrategy, multipleTopicsSameIntervalReuseStrategy, -1); + dltStrategy, suffixWithDelayValueSuffixingStrategy, multipleTopicsSameIntervalReuseStrategy, -1, Collections.emptyMap()); List propertiesList = factory.createProperties(); @@ -368,7 +395,7 @@ void shouldReuseRetryTopicsIfMaxDelayReachedWithDelayValueSuffixingStrategy() { // when DestinationTopicPropertiesFactory factory = new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, classifier, numPartitions, kafkaOperations, - dltStrategy, suffixWithDelayValueSuffixingStrategy, singleTopicSameIntervalReuseStrategy, -1); + dltStrategy, suffixWithDelayValueSuffixingStrategy, singleTopicSameIntervalReuseStrategy, -1, Collections.emptyMap()); List propertiesList = factory.createProperties(); @@ -395,7 +422,7 @@ void shouldReuseRetryTopicsIfMaxDelayReachedWithIndexValueSuffixingStrategy() { // when DestinationTopicPropertiesFactory factory = new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, classifier, numPartitions, kafkaOperations, - dltStrategy, suffixWithIndexTopicSuffixingStrategy, singleTopicSameIntervalReuseStrategy, -1); + dltStrategy, suffixWithIndexTopicSuffixingStrategy, singleTopicSameIntervalReuseStrategy, -1, Collections.emptyMap()); List propertiesList = factory.createProperties(); @@ -418,7 +445,7 @@ void shouldNotReuseRetryTopicsIfRepeatedIntervalsAreInTheMiddleOfChain() { // when DestinationTopicPropertiesFactory factory = new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, classifier, numPartitions, kafkaOperations, - dltStrategy, suffixWithDelayValueSuffixingStrategy, multipleTopicsSameIntervalReuseStrategy, -1); + dltStrategy, suffixWithDelayValueSuffixingStrategy, multipleTopicsSameIntervalReuseStrategy, -1, Collections.emptyMap()); List propertiesList = factory.createProperties(); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicTests.java index 1b381df46f..4a1574fd10 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicTests.java @@ -74,7 +74,7 @@ public class DestinationTopicTests { protected DestinationTopic.Properties dltTopicProps = new DestinationTopic.Properties(0, dltSuffix, DestinationTopic.Type.DLT, 4, 1, - DltStrategy.FAIL_ON_ERROR, kafkaOperations1, (a, e) -> false, noTimeout, null); + DltStrategy.FAIL_ON_ERROR, kafkaOperations1, (a, e) -> false, noTimeout, null, Collections.emptySet()); protected List allProps = Arrays .asList(mainTopicProps, firstRetryProps, secondRetryProps, dltTopicProps); @@ -93,7 +93,7 @@ public class DestinationTopicTests { protected DestinationTopic.Properties dltTopicProps2 = new DestinationTopic.Properties(0, dltSuffix, DestinationTopic.Type.DLT, 4, 1, - DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, (a, e) -> false, timeout, null); + DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, (a, e) -> false, timeout, null, Collections.emptySet()); protected List allProps2 = Arrays .asList(mainTopicProps2, firstRetryProps2, secondRetryProps2, dltTopicProps2); @@ -124,7 +124,7 @@ public class DestinationTopicTests { protected DestinationTopic.Properties dltTopicProps4 = new DestinationTopic.Properties(0, dltSuffix, DestinationTopic.Type.DLT, 4, 1, - DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, (a, e) -> false, timeout, null); + DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, (a, e) -> false, timeout, null, Collections.emptySet()); protected DestinationTopic.Properties mainTopicProps5 = new DestinationTopic.Properties(0, "", DestinationTopic.Type.MAIN, 4, 1, @@ -136,7 +136,7 @@ public class DestinationTopicTests { protected DestinationTopic.Properties dltTopicProps5 = new DestinationTopic.Properties(0, dltSuffix, DestinationTopic.Type.DLT, 4, 1, - DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, (a, e) -> false, timeout, null); + DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, (a, e) -> false, timeout, null, Collections.emptySet()); // Holders From 0680fc6f94ea3f6459cf815df22d1b84e04287b1 Mon Sep 17 00:00:00 2001 From: Adrian Chlebosz Date: Fri, 8 Dec 2023 12:34:46 +0100 Subject: [PATCH 2/9] GH-2800: route messages to specialized dlt based on exception type --- .../DefaultDestinationTopicResolver.java | 45 ++++++++++++++----- .../kafka/retrytopic/DestinationTopic.java | 4 ++ .../retrytopic/DestinationTopicContainer.java | 4 +- ...DefaultDestinationTopicProcessorTests.java | 9 ++-- .../DefaultDestinationTopicResolverTests.java | 25 ++++++++--- .../retrytopic/DestinationTopicTests.java | 17 +++++-- 6 files changed, 77 insertions(+), 27 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolver.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolver.java index 522256b436..30af0d3718 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolver.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolver.java @@ -97,8 +97,8 @@ public DestinationTopic resolveDestinationTopic(String mainListenerId, String to : destinationTopicHolder.getSourceDestination().shouldRetryOn(attempt, maybeUnwrapException(e)) && isNotFatalException(e) && !isPastTimout(originalTimestamp, destinationTopicHolder) - ? resolveRetryDestination(destinationTopicHolder) - : getDltOrNoOpsDestination(mainListenerId, topic); + ? resolveRetryDestination(mainListenerId, destinationTopicHolder, e) + : getDltOrNoOpsDestination(mainListenerId, topic, e); } private Boolean isNotFatalException(Exception e) { @@ -128,10 +128,20 @@ && isNotFatalException(e) } @SuppressWarnings("deprecation") - private DestinationTopic resolveRetryDestination(DestinationTopicHolder destinationTopicHolder) { - return (destinationTopicHolder.getSourceDestination().isReusableRetryTopic()) - ? destinationTopicHolder.getSourceDestination() - : destinationTopicHolder.getNextDestination(); + private DestinationTopic resolveRetryDestination(String mainListenerId, DestinationTopicHolder destinationTopicHolder, Exception e) { + if (destinationTopicHolder.getSourceDestination().isReusableRetryTopic()) { + return destinationTopicHolder.getSourceDestination(); + } + + if (isAlreadyDltDestination(destinationTopicHolder)) { + return getDltOrNoOpsDestination(mainListenerId, destinationTopicHolder.getSourceDestination().getDestinationName(), e); + } + + return destinationTopicHolder.getNextDestination(); + } + + private static boolean isAlreadyDltDestination(DestinationTopicHolder destinationTopicHolder) { + return destinationTopicHolder.getNextDestination().isDltTopic(); } @Override @@ -144,18 +154,29 @@ public DestinationTopic getDestinationTopicByName(String mainListenerId, String @Nullable @Override - public DestinationTopic getDltFor(String mainListenerId, String topicName) { - DestinationTopic destination = getDltOrNoOpsDestination(mainListenerId, topicName); + public DestinationTopic getDltFor(String mainListenerId, String topicName, Exception e) { + DestinationTopic destination = getDltOrNoOpsDestination(mainListenerId, topicName, e); return destination.isNoOpsTopic() ? null : destination; } - private DestinationTopic getDltOrNoOpsDestination(String mainListenerId, String topic) { + private DestinationTopic getDltOrNoOpsDestination(String mainListenerId, String topic, Exception e) { DestinationTopic destination = getNextDestinationTopicFor(mainListenerId, topic); - return destination.isDltTopic() || destination.isNoOpsTopic() - ? destination - : getDltOrNoOpsDestination(mainListenerId, destination.getDestinationName()); + return isMatchingDltTopic(destination, e) || destination.isNoOpsTopic() ? + destination : + getDltOrNoOpsDestination(mainListenerId, destination.getDestinationName(), e); + } + + private static boolean isMatchingDltTopic(DestinationTopic destination, Exception e) { + if (!destination.isDltTopic()) { + return false; + } + + boolean isDltIntendedForCurrentExc = destination.usedForExceptions().stream() + .anyMatch(excType -> excType.isInstance(e)); + boolean isGenericPurposeDlt = destination.usedForExceptions().isEmpty(); + return isDltIntendedForCurrentExc || isGenericPurposeDlt; } @Override diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopic.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopic.java index 711da8195d..880abc6e5a 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopic.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopic.java @@ -90,6 +90,10 @@ public boolean shouldRetryOn(Integer attempt, Throwable e) { return this.properties.shouldRetryOn.test(attempt, e); } + public Set> usedForExceptions() { + return Collections.unmodifiableSet(this.properties.usedForExceptions); + } + @Override public String toString() { return "DestinationTopic{" + diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicContainer.java index 25dc22705b..cc89f61888 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicContainer.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2022 the original author or authors. + * Copyright 2017-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -68,6 +68,6 @@ public interface DestinationTopicContainer { * @return The {@link DestinationTopic} instance corresponding to the DLT. */ @Nullable - DestinationTopic getDltFor(String mainListenerId, String topicName); + DestinationTopic getDltFor(String mainListenerId, String topicName, Exception e); } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicProcessorTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicProcessorTests.java index 8f2943d501..c741687824 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicProcessorTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicProcessorTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2022 the original author or authors. + * Copyright 2018-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -75,11 +75,12 @@ void shouldRegisterTopicDestinations() { // then assertThat(context.destinationsByTopicMap.containsKey(FIRST_TOPIC)).isTrue(); List destinationTopicsForFirstTopic = context.destinationsByTopicMap.get(FIRST_TOPIC); - assertThat(destinationTopicsForFirstTopic.size()).isEqualTo(4); + assertThat(destinationTopicsForFirstTopic.size()).isEqualTo(5); assertThat(destinationTopicsForFirstTopic.get(0)).isEqualTo(mainDestinationTopic); assertThat(destinationTopicsForFirstTopic.get(1)).isEqualTo(firstRetryDestinationTopic); assertThat(destinationTopicsForFirstTopic.get(2)).isEqualTo(secondRetryDestinationTopic); - assertThat(destinationTopicsForFirstTopic.get(3)).isEqualTo(dltDestinationTopic); + assertThat(destinationTopicsForFirstTopic.get(3)).isEqualTo(deserializationExcDltDestinationTopic); + assertThat(destinationTopicsForFirstTopic.get(4)).isEqualTo(dltDestinationTopic); assertThat(context.destinationsByTopicMap.containsKey(SECOND_TOPIC)).isTrue(); List destinationTopicsForSecondTopic = context.destinationsByTopicMap.get(SECOND_TOPIC); @@ -143,7 +144,7 @@ void shouldCreateDestinationMapWhenProcessDestinations() { .flatMap(list -> list.stream()) .collect(Collectors.toList()); - assertThat(destinationList.size()).isEqualTo(11); + assertThat(destinationList.size()).isEqualTo(12); assertThat(destinationList.contains(mainDestinationTopic)).isTrue(); assertThat(destinationList.contains(firstRetryDestinationTopic)).isTrue(); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolverTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolverTests.java index ea1fa3334b..e938fc31fe 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolverTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolverTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2022 the original author or authors. + * Copyright 2018-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -39,6 +39,7 @@ import org.springframework.kafka.listener.TimestampedException; import org.springframework.kafka.retrytopic.DestinationTopic.Type; import org.springframework.kafka.support.converter.ConversionException; +import org.springframework.kafka.support.serializer.DeserializationException; /** * @author Tomaz Fernandes @@ -78,16 +79,16 @@ public void setup() { void shouldResolveRetryDestination() { assertThat(defaultDestinationTopicContainer .resolveDestinationTopic("id", mainDestinationTopic.getDestinationName(), 1, - new RuntimeException(), this.originalTimestamp)).isEqualTo(firstRetryDestinationTopic); + new IllegalStateException(), this.originalTimestamp)).isEqualTo(firstRetryDestinationTopic); assertThat(defaultDestinationTopicContainer .resolveDestinationTopic("id", firstRetryDestinationTopic.getDestinationName(), 1, - new RuntimeException(), this.originalTimestamp)).isEqualTo(secondRetryDestinationTopic); + new IllegalStateException(), this.originalTimestamp)).isEqualTo(secondRetryDestinationTopic); assertThat(defaultDestinationTopicContainer .resolveDestinationTopic("id", secondRetryDestinationTopic.getDestinationName(), 1, - new RuntimeException(), this.originalTimestamp)).isEqualTo(dltDestinationTopic); + new IllegalStateException(), this.originalTimestamp)).isEqualTo(dltDestinationTopic); assertThat(defaultDestinationTopicContainer .resolveDestinationTopic("id", dltDestinationTopic.getDestinationName(), 1, - new RuntimeException(), this.originalTimestamp)).isEqualTo(noOpsDestinationTopic); + new IllegalStateException(), this.originalTimestamp)).isEqualTo(noOpsDestinationTopic); assertThat(defaultDestinationTopicContainer .resolveDestinationTopic("id", mainDestinationTopic2.getDestinationName(), 1, @@ -142,6 +143,18 @@ void shouldResolveDltDestinationForFatalDefaultException() { .isEqualTo(dltDestinationTopic); } + @Test + void shouldResolveDeserializationDltDestinationForDeserializationException() { + DeserializationException exc = new DeserializationException("", new byte[] {}, false, new IllegalStateException()); + + assertThat(defaultDestinationTopicContainer + .resolveDestinationTopic("id", secondRetryDestinationTopic.getDestinationName(), + 1, exc, originalTimestamp)).isEqualTo(deserializationExcDltDestinationTopic); + assertThat(defaultDestinationTopicContainer + .resolveDestinationTopic("id", deserializationExcDltDestinationTopic.getDestinationName(), + 1, exc, originalTimestamp)).isEqualTo(dltDestinationTopic); + } + @Test void shouldResolveNoOpsForFatalDefaultExceptionInDlt() { assertThat(defaultDestinationTopicContainer @@ -207,7 +220,7 @@ void shouldGetNextDestinationTopic() { @Test void shouldGetDlt() { assertThat(defaultDestinationTopicContainer - .getDltFor("id", mainDestinationTopic.getDestinationName())) + .getDltFor("id", mainDestinationTopic.getDestinationName(), new RuntimeException())) .isEqualTo(dltDestinationTopic); } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicTests.java index 4a1574fd10..7e900d9353 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicTests.java @@ -19,6 +19,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Set; import java.util.function.BiPredicate; import org.springframework.classify.BinaryExceptionClassifier; @@ -26,6 +27,7 @@ import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaOperations; import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.serializer.DeserializationException; /** * @author Tomaz Fernandes @@ -72,12 +74,16 @@ public class DestinationTopicTests { new DestinationTopic.Properties(2000, retrySuffix + "-2000", DestinationTopic.Type.RETRY, 4, 1, DltStrategy.FAIL_ON_ERROR, kafkaOperations1, getShouldRetryOnDenyList(), noTimeout); + protected DestinationTopic.Properties deserializationDltTopicProps = + new DestinationTopic.Properties(0, "-deserialization" + dltSuffix, DestinationTopic.Type.DLT, 4, 1, + DltStrategy.FAIL_ON_ERROR, kafkaOperations1, (a, e) -> false, noTimeout, null, Set.of(DeserializationException.class)); + protected DestinationTopic.Properties dltTopicProps = new DestinationTopic.Properties(0, dltSuffix, DestinationTopic.Type.DLT, 4, 1, DltStrategy.FAIL_ON_ERROR, kafkaOperations1, (a, e) -> false, noTimeout, null, Collections.emptySet()); protected List allProps = Arrays - .asList(mainTopicProps, firstRetryProps, secondRetryProps, dltTopicProps); + .asList(mainTopicProps, firstRetryProps, secondRetryProps, deserializationDltTopicProps, dltTopicProps); protected DestinationTopic.Properties mainTopicProps2 = new DestinationTopic.Properties(0, "", DestinationTopic.Type.MAIN, 4, 1, @@ -148,10 +154,12 @@ public class DestinationTopicTests { protected PropsHolder secondRetryDestinationHolder = new PropsHolder(FIRST_TOPIC, secondRetryProps); + protected PropsHolder deserializationDltDestinationHolder = new PropsHolder(FIRST_TOPIC, deserializationDltTopicProps); + protected PropsHolder dltDestinationHolder = new PropsHolder(FIRST_TOPIC, dltTopicProps); protected List allFirstDestinationsHolders = Arrays - .asList(mainDestinationHolder, firstRetryDestinationHolder, secondRetryDestinationHolder, dltDestinationHolder); + .asList(mainDestinationHolder, firstRetryDestinationHolder, secondRetryDestinationHolder, deserializationDltDestinationHolder, dltDestinationHolder); protected final static String SECOND_TOPIC = "secondTopic"; @@ -206,12 +214,15 @@ public class DestinationTopicTests { protected DestinationTopic dltDestinationTopic = new DestinationTopic(FIRST_TOPIC + dltTopicProps.suffix(), dltTopicProps); + protected DestinationTopic deserializationExcDltDestinationTopic = + new DestinationTopic(FIRST_TOPIC + "-deserialization" + dltTopicProps.suffix(), deserializationDltTopicProps); + protected DestinationTopic noOpsDestinationTopic = new DestinationTopic(dltDestinationTopic.getDestinationName() + "-noOps", new DestinationTopic.Properties(dltTopicProps, "-noOps", DestinationTopic.Type.NO_OPS)); protected List allFirstDestinationsTopics = Arrays - .asList(mainDestinationTopic, firstRetryDestinationTopic, secondRetryDestinationTopic, dltDestinationTopic); + .asList(mainDestinationTopic, firstRetryDestinationTopic, secondRetryDestinationTopic, deserializationExcDltDestinationTopic, dltDestinationTopic); protected DestinationTopic mainDestinationTopic2 = new DestinationTopic(SECOND_TOPIC + mainTopicProps2.suffix(), mainTopicProps2); From 9cad95cddabeb5763e451d92d17091b05914bcd3 Mon Sep 17 00:00:00 2001 From: Adrian Chlebosz Date: Sun, 10 Dec 2023 16:15:50 +0100 Subject: [PATCH 3/9] GH-2800: wire configuration from annotation and traverse causes Closes #2800 --- .../kafka/annotation/RetryableTopic.java | 15 ++--- .../RetryableTopicAnnotationProcessor.java | 12 +++- .../DefaultDestinationTopicResolver.java | 18 +++++- .../kafka/retrytopic/DestinationTopic.java | 8 ++- .../retrytopic/DestinationTopicContainer.java | 4 +- .../DestinationTopicPropertiesFactory.java | 18 +++--- .../ExceptionBasedDestinationDlt.java | 46 ++++++++++++++- .../retrytopic/ExceptionBasedDltRouting.java | 33 ++++++++++- .../RetryTopicConfigurationBuilder.java | 18 ++++-- ...DefaultDestinationTopicProcessorTests.java | 1 + .../DefaultDestinationTopicResolverTests.java | 6 +- ...estinationTopicPropertiesFactoryTests.java | 14 ++--- .../retrytopic/DestinationTopicTests.java | 1 + .../RetryTopicConfigurationBuilderTests.java | 22 +++++++- ...etryableTopicAnnotationProcessorTests.java | 56 +++++++++++++++++-- 15 files changed, 231 insertions(+), 41 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopic.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopic.java index d937b202e2..d78d1fe820 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopic.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopic.java @@ -22,18 +22,13 @@ import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; -import org.springframework.kafka.retrytopic.ExceptionBasedDltRouting; import org.springframework.kafka.retrytopic.DltStrategy; +import org.springframework.kafka.retrytopic.ExceptionBasedDltRouting; import org.springframework.kafka.retrytopic.RetryTopicConstants; import org.springframework.kafka.retrytopic.SameIntervalTopicReuseStrategy; import org.springframework.kafka.retrytopic.TopicSuffixingStrategy; import org.springframework.retry.annotation.Backoff; - -// TODO -// 2. inject exception detection when sending to DLT (consider traversing causes) -// 3. route the message to the configured additional destination or to the default DLT - /** * * Annotation to create the retry and dlt topics for a {@link KafkaListener} annotated @@ -45,6 +40,7 @@ * @author Gary Russell * @author Fabio da Silva Jr. * @author João Lima + * @author Adrian Chlebosz * @since 2.7 * * @see org.springframework.kafka.retrytopic.RetryTopicConfigurer @@ -177,7 +173,12 @@ */ String dltTopicSuffix() default RetryTopicConstants.DEFAULT_DLT_SUFFIX; - ExceptionBasedDltRouting additionalExceptionBasedDltRouting() default @ExceptionBasedDltRouting; + /** + * The DLT routing allowing to redirect the message to the custom DLT based on the + * exception thrown during the processing. + * @return the exception based DLT routing + */ + ExceptionBasedDltRouting exceptionBasedDltRouting() default @ExceptionBasedDltRouting; /** * Whether the retry topics will be suffixed with the delay value for that topic or a diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopicAnnotationProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopicAnnotationProcessor.java index 5b716bf719..f19e8b7c6f 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopicAnnotationProcessor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopicAnnotationProcessor.java @@ -21,7 +21,6 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; -import java.util.Objects; import org.springframework.beans.factory.BeanFactory; import org.springframework.beans.factory.BeanInitializationException; @@ -34,6 +33,8 @@ import org.springframework.core.annotation.AnnotationUtils; import org.springframework.expression.spel.support.StandardEvaluationContext; import org.springframework.kafka.core.KafkaOperations; +import org.springframework.kafka.retrytopic.ExceptionBasedDestinationDlt; +import org.springframework.kafka.retrytopic.ExceptionBasedDltRouting; import org.springframework.kafka.retrytopic.RetryTopicBeanNames; import org.springframework.kafka.retrytopic.RetryTopicConfiguration; import org.springframework.kafka.retrytopic.RetryTopicConfigurationBuilder; @@ -58,6 +59,7 @@ * * @author Tomaz Fernandes * @author Gary Russell + * @author Adrian Chlebosz * @since 2.7 * */ @@ -148,6 +150,9 @@ public RetryTopicConfiguration processAnnotation(String[] topics, Method method, .autoStartDltHandler(autoStartDlt) .setTopicSuffixingStrategy(annotation.topicSuffixingStrategy()) .sameIntervalTopicReuseStrategy(annotation.sameIntervalTopicReuseStrategy()) + .timeoutAfter(timeout) + .dltRoutingRules(createDltRoutingSpecFromAnnotation(annotation.exceptionBasedDltRouting())) + .create(getKafkaTemplate(resolveExpressionAsString(annotation.kafkaTemplate(), "kafkaTemplate"), topics)); .timeoutAfter(timeout); Integer attempts = resolveExpressionAsInteger(annotation.attempts(), "attempts", true); @@ -207,6 +212,11 @@ private SleepingBackOffPolicy createBackoffFromAnnotation(Backoff backoff, @N return policy; } + private Map>> createDltRoutingSpecFromAnnotation(ExceptionBasedDltRouting routingSpec) { + return Arrays.stream(routingSpec.routingRules()) + .collect(Collectors.toMap(ExceptionBasedDestinationDlt::suffix, excBasedDestDlt -> Set.of(excBasedDestDlt.exceptions()))); + } + private EndpointHandlerMethod getDltProcessor(Method listenerMethod, Object bean) { Class declaringClass = listenerMethod.getDeclaringClass(); return Arrays.stream(ReflectionUtils.getDeclaredMethods(declaringClass)) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolver.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolver.java index 30af0d3718..b155c48304 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolver.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolver.java @@ -49,6 +49,7 @@ * @author Tomaz Fernandes * @author Gary Russell * @author Yvette Quinby + * @author Adrian Chlebosz * @since 2.7 * */ @@ -174,11 +175,26 @@ private static boolean isMatchingDltTopic(DestinationTopic destination, Exceptio } boolean isDltIntendedForCurrentExc = destination.usedForExceptions().stream() - .anyMatch(excType -> excType.isInstance(e)); + .anyMatch(excType -> isDirectExcOrCause(e, excType)); boolean isGenericPurposeDlt = destination.usedForExceptions().isEmpty(); return isDltIntendedForCurrentExc || isGenericPurposeDlt; } + private static boolean isDirectExcOrCause(Exception e, Class excType) { + Throwable toMatch = e; + + boolean isMatched = excType.isInstance(toMatch); + while (!isMatched) { + toMatch = toMatch.getCause(); + if (toMatch == null) { + return false; + } + isMatched = excType.isInstance(toMatch); + } + + return isMatched; + } + @Override public DestinationTopic getNextDestinationTopicFor(String mainListenerId, String topic) { return getDestinationHolderFor(mainListenerId, topic).getNextDestination(); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopic.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopic.java index 880abc6e5a..9a2aee0bf6 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopic.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopic.java @@ -30,6 +30,7 @@ * * @author Tomaz Fernandes * @author Gary Russell + * @author Adrian Chlebosz * @since 2.7 * */ @@ -195,6 +196,7 @@ public Properties(Properties sourceProperties, String suffix, Type type) { * @param shouldRetryOn the exception classifications. * @param timeout the timeout. * @param autoStartDltHandler whether or not to start the DLT handler. + * @param usedForExceptions the exceptions which destination is intended for * @since 2.8 */ public Properties(long delayMs, String suffix, Type type, @@ -202,7 +204,7 @@ public Properties(long delayMs, String suffix, Type type, DltStrategy dltStrategy, KafkaOperations kafkaOperations, BiPredicate shouldRetryOn, long timeout, @Nullable Boolean autoStartDltHandler, - Set> usedForExceptions) { + Set> usedForExceptions) { this.delayMs = delayMs; this.suffix = suffix; @@ -248,6 +250,10 @@ public Boolean autoStartDltHandler() { return this.autoStartDltHandler; } + public Set> usedForExceptions() { + return this.usedForExceptions; + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicContainer.java index cc89f61888..61160fa237 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicContainer.java @@ -26,6 +26,7 @@ * * @author Tomaz Fernandes * @author Gary Russell + * @author Adrian Chlebosz * @since 2.7 */ public interface DestinationTopicContainer { @@ -65,9 +66,10 @@ public interface DestinationTopicContainer { * DLT for the given topic, or null if none is found. * @param mainListenerId the listener id. * @param topicName the topic name for which to look the DLT for + * @param exc the exception which is being handled * @return The {@link DestinationTopic} instance corresponding to the DLT. */ @Nullable - DestinationTopic getDltFor(String mainListenerId, String topicName, Exception e); + DestinationTopic getDltFor(String mainListenerId, String topicName, Exception exc); } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactory.java index 7b29c8c397..458fa5136f 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactory.java @@ -19,6 +19,8 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.function.BiPredicate; import org.springframework.classify.BinaryExceptionClassifier; @@ -27,10 +29,6 @@ import org.springframework.lang.Nullable; import org.springframework.util.StringUtils; -import java.util.Map; -import java.util.Set; -import java.util.stream.Stream; - /** * * Creates a list of {@link DestinationTopic.Properties} based on the @@ -40,6 +38,7 @@ * @author Gary Russell * @author João Lima * @author Wang Zhiyang + * @author Adrian Chlebosz * @since 2.7 * */ @@ -69,7 +68,7 @@ public class DestinationTopicPropertiesFactory { private final long timeout; - private final Map>> exceptionBasedRouting; + private final Map>> dltRoutingRules; @Nullable private Boolean autoStartDltHandler; @@ -86,6 +85,7 @@ public class DestinationTopicPropertiesFactory { * @param topicSuffixingStrategy the topic suffixing strategy. * @param sameIntervalTopicReuseStrategy the same interval reuse strategy. * @param timeout the timeout. + * @param dltRoutingRules the specification of which DLT should be used for the particular exception type * @since 3.0.12 */ public DestinationTopicPropertiesFactory(String retryTopicSuffix, String dltSuffix, List backOffValues, @@ -95,14 +95,14 @@ public DestinationTopicPropertiesFactory(String retryTopicSuffix, String dltSuff TopicSuffixingStrategy topicSuffixingStrategy, SameIntervalTopicReuseStrategy sameIntervalTopicReuseStrategy, long timeout, - Map>> exceptionBasedRouting) { + Map>> dltRoutingRules) { this.dltStrategy = dltStrategy; this.kafkaOperations = kafkaOperations; this.numPartitions = numPartitions; this.timeout = timeout; this.destinationTopicSuffixes = new DestinationTopicSuffixes(retryTopicSuffix, dltSuffix); - this.exceptionBasedRouting = exceptionBasedRouting; + this.dltRoutingRules = dltRoutingRules; this.backOffValues = backOffValues; int backOffValuesSize = this.backOffValues.size(); this.isSameIntervalReuse = SameIntervalTopicReuseStrategy.SINGLE_TOPIC.equals(sameIntervalTopicReuseStrategy); @@ -151,8 +151,8 @@ private DestinationTopic.Properties createDltProperties() { } private List createCustomDltProperties() { - return exceptionBasedRouting.entrySet().stream() - .map(entry -> new DestinationTopic.Properties(0, entry.getKey() + "-" + this.destinationTopicSuffixes.getDltSuffix(), + return this.dltRoutingRules.entrySet().stream() + .map(entry -> new DestinationTopic.Properties(0, entry.getKey() + this.destinationTopicSuffixes.getDltSuffix(), DestinationTopic.Type.DLT, this.maxAttempts, this.numPartitions, this.dltStrategy, this.kafkaOperations, (a, e) -> false, this.timeout, this.autoStartDltHandler, entry.getValue())) .toList(); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ExceptionBasedDestinationDlt.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ExceptionBasedDestinationDlt.java index 4ed6d46911..061191a4d2 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ExceptionBasedDestinationDlt.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ExceptionBasedDestinationDlt.java @@ -1,6 +1,48 @@ +/* + * Copyright 2016-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.springframework.kafka.retrytopic; +/** + * Annotation allowing to specify additional DLT which will be chosen when message + * processing caused the configured exception to be thrown. + * + * @author Adrian Chlebosz + * @see org.springframework.kafka.retrytopic.ExceptionBasedDltRouting + * @since 3.1.1 + */ public @interface ExceptionBasedDestinationDlt { - String customSuffix(); - Class[] exceptions(); + + /** + * Suffix extension used when constructing the name for the new DLT. It is placed + * before the main suffix configured through the + * ${@link org.springframework.kafka.annotation.RetryableTopic#dltTopicSuffix()}, so the + * final name is the product of these two. + * + * @return the configured suffix extension + */ + String suffix(); + + /** + * When message processing throws one of the exceptions configured here, then + * it should be eventually redirected to the DLT with name containing the extension + * configured through {@link #suffix()}. The causes of the thrown exception will be + * traversed to match with any of configured ones. + * + * @return configured exceptions + */ + Class[] exceptions(); } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ExceptionBasedDltRouting.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ExceptionBasedDltRouting.java index e58cf71479..f9b3ba9ace 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ExceptionBasedDltRouting.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ExceptionBasedDltRouting.java @@ -1,5 +1,36 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.springframework.kafka.retrytopic; +/** + * Annotation allowing to specify the custom DLT routing rules steered by exceptions + * which might be thrown during the processing. + * + * @author Adrian Chlebosz + * @see org.springframework.kafka.retrytopic.ExceptionBasedDestinationDlt + * @since 3.1.1 + */ public @interface ExceptionBasedDltRouting { - ExceptionBasedDestinationDlt[] routing() default {}; + + /** + * Specific rules expressing to which custom DLT the message should be redirected + * when the specified exception has been thrown during its processing. + * + * @return configured routing + */ + ExceptionBasedDestinationDlt[] routingRules() default {}; } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationBuilder.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationBuilder.java index 36bb131c7e..4adb565f52 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationBuilder.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationBuilder.java @@ -44,6 +44,7 @@ * * @author Tomaz Fernandes * @author Gary Russell + * @author Adrian Chlebosz * @since 2.7 * */ @@ -78,7 +79,7 @@ public class RetryTopicConfigurationBuilder { @Nullable private BinaryExceptionClassifierBuilder classifierBuilder; - private Map>> exceptionBasedDltRouting = new HashMap<>(); + private Map>> dltRoutingRules = new HashMap<>(); private DltStrategy dltStrategy = DltStrategy.ALWAYS_RETRY_ON_ERROR; @@ -528,8 +529,17 @@ private BinaryExceptionClassifierBuilder classifierBuilder() { return this.classifierBuilder; } - public RetryTopicConfigurationBuilder exceptionBasedDltRouting(Map>> exceptionBasedDltRouting) { - this.exceptionBasedDltRouting = exceptionBasedDltRouting; + /** + * Configure to set DLT routing rules causing the message to be redirected to the custom + * DLT when the configured exception has been thrown during message processing. + * The cause of the originally thrown exception will be traversed in order to find the + * match with the configured exceptions. + * @param dltRoutingRules specification of custom DLT name extensions and exceptions which should be matched for them + * @return the builder + * @since 3.1.1 + */ + public RetryTopicConfigurationBuilder dltRoutingRules(Map>> dltRoutingRules) { + this.dltRoutingRules = dltRoutingRules; return this; } @@ -578,7 +588,7 @@ public RetryTopicConfiguration create(KafkaOperations sendToTopicKafkaTemp new DestinationTopicPropertiesFactory(this.retryTopicSuffix, this.dltSuffix, backOffValues, buildClassifier(), this.topicCreationConfiguration.getNumPartitions(), sendToTopicKafkaTemplate, this.dltStrategy, - this.topicSuffixingStrategy, this.sameIntervalTopicReuseStrategy, this.timeout, this.exceptionBasedDltRouting) + this.topicSuffixingStrategy, this.sameIntervalTopicReuseStrategy, this.timeout, this.dltRoutingRules) .autoStartDltHandler(this.autoStartDltHandler) .createProperties(); return new RetryTopicConfiguration(destinationTopicProperties, diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicProcessorTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicProcessorTests.java index c741687824..8efea11c1a 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicProcessorTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicProcessorTests.java @@ -35,6 +35,7 @@ /** * @author Tomaz Fernandes * @author Gary Russell + * @author Adrian Chlebosz * @since 2.7 */ @ExtendWith(MockitoExtension.class) diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolverTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolverTests.java index e938fc31fe..5fd891532f 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolverTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolverTests.java @@ -45,6 +45,7 @@ * @author Tomaz Fernandes * @author Yvette Quinby * @author Gary Russell + * @author Adrian Chlebosz * @since 2.7 */ @ExtendWith(MockitoExtension.class) @@ -146,13 +147,14 @@ void shouldResolveDltDestinationForFatalDefaultException() { @Test void shouldResolveDeserializationDltDestinationForDeserializationException() { DeserializationException exc = new DeserializationException("", new byte[] {}, false, new IllegalStateException()); + TimestampedException timestampedExc = new TimestampedException(exc); assertThat(defaultDestinationTopicContainer .resolveDestinationTopic("id", secondRetryDestinationTopic.getDestinationName(), - 1, exc, originalTimestamp)).isEqualTo(deserializationExcDltDestinationTopic); + 1, timestampedExc, originalTimestamp)).isEqualTo(deserializationExcDltDestinationTopic); assertThat(defaultDestinationTopicContainer .resolveDestinationTopic("id", deserializationExcDltDestinationTopic.getDestinationName(), - 1, exc, originalTimestamp)).isEqualTo(dltDestinationTopic); + 1, timestampedExc, originalTimestamp)).isEqualTo(dltDestinationTopic); } @Test diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactoryTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactoryTests.java index 41dcb8f4ca..a93322460c 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactoryTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactoryTests.java @@ -18,7 +18,10 @@ import static org.assertj.core.api.Assertions.assertThat; +import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.stream.IntStream; import org.junit.jupiter.api.BeforeEach; @@ -26,6 +29,7 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; + import org.springframework.classify.BinaryExceptionClassifier; import org.springframework.classify.BinaryExceptionClassifierBuilder; import org.springframework.kafka.core.KafkaOperations; @@ -34,14 +38,10 @@ import org.springframework.retry.backoff.ExponentialBackOffPolicy; import org.springframework.retry.backoff.FixedBackOffPolicy; -import java.util.Collections; -import java.util.Map; -import java.util.Set; - - /** * @author Tomaz Fernandes * @author Wang Zhiyang + * @author Adrian Chlebosz * @since 2.7 */ @ExtendWith(MockitoExtension.class) @@ -218,8 +218,8 @@ void shouldCreateDltPropertiesForCustomExceptionBasedRouting() { // then assertThat(propertiesList.size()).isSameAs(3); - assertDltTopic(propertiesList.get(1)); - assertDltTopic(propertiesList.get(2), desExcDltSuffix + "-" + this.dltSuffix); + assertDltTopic(propertiesList.get(1), desExcDltSuffix + this.dltSuffix); + assertDltTopic(propertiesList.get(2)); } @Test diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicTests.java index 7e900d9353..d90786900b 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicTests.java @@ -31,6 +31,7 @@ /** * @author Tomaz Fernandes + * @author Adrian Chlebosz * @since 2.7 */ public class DestinationTopicTests { diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationBuilderTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationBuilderTests.java index 5da6ba70f6..990fe99e34 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationBuilderTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationBuilderTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2021 the original author or authors. + * Copyright 2018-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,6 +20,8 @@ import java.util.Arrays; import java.util.List; +import java.util.Map; +import java.util.Set; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -28,11 +30,13 @@ import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.KafkaOperations; +import org.springframework.kafka.support.serializer.DeserializationException; import org.springframework.kafka.test.utils.KafkaTestUtils; import org.springframework.test.util.ReflectionTestUtils; /** * @author Tomaz Fernandes + * @author Adrian Chlebosz * @since 2.7 */ @ExtendWith(MockitoExtension.class) @@ -187,4 +191,20 @@ void shouldSetNotAutoCreateTopics() { RetryTopicConfiguration.TopicCreation config = configuration.forKafkaTopicAutoCreation(); assertThat(config.shouldCreateTopics()).isFalse(); } + + @Test + void shouldSetDltRoutingRules() { + // setup + RetryTopicConfigurationBuilder builder = new RetryTopicConfigurationBuilder(); + + //when + RetryTopicConfiguration configuration = builder + .dltRoutingRules(Map.of("-deserialization", Set.of(DeserializationException.class))) + .create(kafkaOperations); + + // then + DestinationTopic.Properties desExcDltProps = configuration.getDestinationTopicProperties().get(3); + assertThat(desExcDltProps.suffix()).isEqualTo("-deserialization-dlt"); + assertThat(desExcDltProps.usedForExceptions()).containsExactly(DeserializationException.class); + } } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryableTopicAnnotationProcessorTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryableTopicAnnotationProcessorTests.java index 5daa04fcd2..8508b7dcd5 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryableTopicAnnotationProcessorTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryableTopicAnnotationProcessorTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2022 the original author or authors. + * Copyright 2018-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -43,6 +43,7 @@ import org.springframework.kafka.annotation.RetryableTopicAnnotationProcessor; import org.springframework.kafka.core.KafkaOperations; import org.springframework.kafka.support.EndpointHandlerMethod; +import org.springframework.kafka.support.serializer.DeserializationException; import org.springframework.retry.annotation.Backoff; import org.springframework.test.util.ReflectionTestUtils; import org.springframework.util.ReflectionUtils; @@ -50,6 +51,7 @@ /** * @author Tomaz Fernandes * @author Gary Russell + * @author Adrian Chlebosz * @since 2.7 */ @SuppressWarnings("deprecation") @@ -98,6 +100,15 @@ class RetryableTopicAnnotationProcessorTests { private final Object bean = createBean(); + // Retry with custom DLT routing + private final Method listenWithCustomDltRouting = ReflectionUtils + .findMethod(RetryableTopicAnnotationFactoryWithCustomDltRouting.class, listenerMethodName); + + private final RetryableTopic annotationWithCustomDltRouting = AnnotationUtils.findAnnotation( + listenWithCustomDltRouting, RetryableTopic.class); + + private final Object beanWithCustomDltRouting = createBean(); + private Object createBean() { try { return RetryableTopicAnnotationFactory.class.getDeclaredConstructor().newInstance(); @@ -107,17 +118,18 @@ private Object createBean() { } } + @Test void shouldGetDltHandlerMethod() { // setup given(beanFactory.getBean(RetryTopicBeanNames.DEFAULT_KAFKA_TEMPLATE_BEAN_NAME, KafkaOperations.class)) - .willReturn(kafkaOperationsFromDefaultName); + .willReturn(kafkaOperationsFromDefaultName); RetryableTopicAnnotationProcessor processor = new RetryableTopicAnnotationProcessor(beanFactory); // given RetryTopicConfiguration configuration = processor - .processAnnotation(topics, listenWithRetryAndDlt, annotationWithDlt, beanWithDlt); + .processAnnotation(topics, listenWithRetryAndDlt, annotationWithDlt, beanWithDlt); // then EndpointHandlerMethod dltHandlerMethod = configuration.getDltHandlerMethod(); @@ -125,7 +137,7 @@ void shouldGetDltHandlerMethod() { assertThat(method.getName()).isEqualTo("handleDlt"); assertThat(new DestinationTopic("", - configuration.getDestinationTopicProperties().get(0)).isAlwaysRetryOnDltFailure()).isFalse(); + configuration.getDestinationTopicProperties().get(0)).isAlwaysRetryOnDltFailure()).isFalse(); } @Test @@ -313,6 +325,27 @@ void shouldCreateFixedBackoff() { } + @Test + void shouldCreateExceptionBasedDltRoutingSpec() { + // setup + given(this.beanFactory.getBean(RetryTopicBeanNames.DEFAULT_KAFKA_TEMPLATE_BEAN_NAME, KafkaOperations.class)) + .willReturn(kafkaOperationsFromDefaultName); + RetryableTopicAnnotationProcessor processor = new RetryableTopicAnnotationProcessor(beanFactory); + + // given + RetryTopicConfiguration configuration = processor + .processAnnotation( + topics, listenWithCustomDltRouting, annotationWithCustomDltRouting, beanWithCustomDltRouting); + + // then + List destinationTopicProperties = configuration.getDestinationTopicProperties(); + + assertThat(destinationTopicProperties).hasSize(3); + assertThat(destinationTopicProperties.get(0).suffix()).isEmpty(); + assertThat(destinationTopicProperties.get(1).suffix()).isEqualTo("-deserialization-dlt"); + assertThat(destinationTopicProperties.get(2).suffix()).isEqualTo("-dlt"); + } + static class RetryableTopicAnnotationFactory { @KafkaListener @@ -336,4 +369,19 @@ void handleDlt() { // NoOps } } + + static class RetryableTopicAnnotationFactoryWithCustomDltRouting { + @KafkaListener + @RetryableTopic( + attempts = "1", + exceptionBasedDltRouting = @ExceptionBasedDltRouting(routingRules = { + @ExceptionBasedDestinationDlt( + suffix = "-deserialization", exceptions = {DeserializationException.class} + ) + }) + ) + void listenWithRetry() { + // NoOps + } + } } From bba2ebbefe3959878aab5d87165f9cec1246ff3e Mon Sep 17 00:00:00 2001 From: Adrian Chlebosz Date: Wed, 13 Dec 2023 20:58:05 +0100 Subject: [PATCH 4/9] GH-2800: prepare custom DLT routing changes to be released in version 3.2.0 Closes #2800 --- .../kafka/retrytopic/ExceptionBasedDestinationDlt.java | 2 +- .../kafka/retrytopic/ExceptionBasedDltRouting.java | 2 +- .../kafka/retrytopic/RetryTopicConfigurationBuilder.java | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ExceptionBasedDestinationDlt.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ExceptionBasedDestinationDlt.java index 061191a4d2..2434438c32 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ExceptionBasedDestinationDlt.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ExceptionBasedDestinationDlt.java @@ -22,7 +22,7 @@ * * @author Adrian Chlebosz * @see org.springframework.kafka.retrytopic.ExceptionBasedDltRouting - * @since 3.1.1 + * @since 3.2.0 */ public @interface ExceptionBasedDestinationDlt { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ExceptionBasedDltRouting.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ExceptionBasedDltRouting.java index f9b3ba9ace..07d6a27894 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ExceptionBasedDltRouting.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ExceptionBasedDltRouting.java @@ -22,7 +22,7 @@ * * @author Adrian Chlebosz * @see org.springframework.kafka.retrytopic.ExceptionBasedDestinationDlt - * @since 3.1.1 + * @since 3.2.0 */ public @interface ExceptionBasedDltRouting { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationBuilder.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationBuilder.java index 4adb565f52..0dae0da378 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationBuilder.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationBuilder.java @@ -536,7 +536,7 @@ private BinaryExceptionClassifierBuilder classifierBuilder() { * match with the configured exceptions. * @param dltRoutingRules specification of custom DLT name extensions and exceptions which should be matched for them * @return the builder - * @since 3.1.1 + * @since 3.2.0 */ public RetryTopicConfigurationBuilder dltRoutingRules(Map>> dltRoutingRules) { this.dltRoutingRules = dltRoutingRules; From 1a0a654f9ecc45d4c97c6f8b3738298f83cd58f9 Mon Sep 17 00:00:00 2001 From: Adrian Chlebosz Date: Sat, 16 Dec 2023 00:17:07 +0100 Subject: [PATCH 5/9] GH-2800: rename ExceptionBasedDestinationDlt to ExceptionBasedDltDestination --- .../kafka/annotation/RetryableTopicAnnotationProcessor.java | 4 ++-- ...dDestinationDlt.java => ExceptionBasedDltDestination.java} | 2 +- .../kafka/retrytopic/ExceptionBasedDltRouting.java | 4 ++-- .../retrytopic/RetryableTopicAnnotationProcessorTests.java | 2 +- 4 files changed, 6 insertions(+), 6 deletions(-) rename spring-kafka/src/main/java/org/springframework/kafka/retrytopic/{ExceptionBasedDestinationDlt.java => ExceptionBasedDltDestination.java} (97%) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopicAnnotationProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopicAnnotationProcessor.java index f19e8b7c6f..08f60e8ee6 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopicAnnotationProcessor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopicAnnotationProcessor.java @@ -33,7 +33,7 @@ import org.springframework.core.annotation.AnnotationUtils; import org.springframework.expression.spel.support.StandardEvaluationContext; import org.springframework.kafka.core.KafkaOperations; -import org.springframework.kafka.retrytopic.ExceptionBasedDestinationDlt; +import org.springframework.kafka.retrytopic.ExceptionBasedDltDestination; import org.springframework.kafka.retrytopic.ExceptionBasedDltRouting; import org.springframework.kafka.retrytopic.RetryTopicBeanNames; import org.springframework.kafka.retrytopic.RetryTopicConfiguration; @@ -214,7 +214,7 @@ private SleepingBackOffPolicy createBackoffFromAnnotation(Backoff backoff, @N private Map>> createDltRoutingSpecFromAnnotation(ExceptionBasedDltRouting routingSpec) { return Arrays.stream(routingSpec.routingRules()) - .collect(Collectors.toMap(ExceptionBasedDestinationDlt::suffix, excBasedDestDlt -> Set.of(excBasedDestDlt.exceptions()))); + .collect(Collectors.toMap(ExceptionBasedDltDestination::suffix, excBasedDestDlt -> Set.of(excBasedDestDlt.exceptions()))); } private EndpointHandlerMethod getDltProcessor(Method listenerMethod, Object bean) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ExceptionBasedDestinationDlt.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ExceptionBasedDltDestination.java similarity index 97% rename from spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ExceptionBasedDestinationDlt.java rename to spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ExceptionBasedDltDestination.java index 2434438c32..8f2cc6d22b 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ExceptionBasedDestinationDlt.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ExceptionBasedDltDestination.java @@ -24,7 +24,7 @@ * @see org.springframework.kafka.retrytopic.ExceptionBasedDltRouting * @since 3.2.0 */ -public @interface ExceptionBasedDestinationDlt { +public @interface ExceptionBasedDltDestination { /** * Suffix extension used when constructing the name for the new DLT. It is placed diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ExceptionBasedDltRouting.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ExceptionBasedDltRouting.java index 07d6a27894..e4284cc0fa 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ExceptionBasedDltRouting.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ExceptionBasedDltRouting.java @@ -21,7 +21,7 @@ * which might be thrown during the processing. * * @author Adrian Chlebosz - * @see org.springframework.kafka.retrytopic.ExceptionBasedDestinationDlt + * @see ExceptionBasedDltDestination * @since 3.2.0 */ public @interface ExceptionBasedDltRouting { @@ -32,5 +32,5 @@ * * @return configured routing */ - ExceptionBasedDestinationDlt[] routingRules() default {}; + ExceptionBasedDltDestination[] routingRules() default {}; } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryableTopicAnnotationProcessorTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryableTopicAnnotationProcessorTests.java index 8508b7dcd5..22a8c7b5e9 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryableTopicAnnotationProcessorTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryableTopicAnnotationProcessorTests.java @@ -375,7 +375,7 @@ static class RetryableTopicAnnotationFactoryWithCustomDltRouting { @RetryableTopic( attempts = "1", exceptionBasedDltRouting = @ExceptionBasedDltRouting(routingRules = { - @ExceptionBasedDestinationDlt( + @ExceptionBasedDltDestination( suffix = "-deserialization", exceptions = {DeserializationException.class} ) }) From 420ec9f71687bf69cb6c4893a41da009905aaaa3 Mon Sep 17 00:00:00 2001 From: Adrian Chlebosz Date: Fri, 19 Jan 2024 00:30:28 +0100 Subject: [PATCH 6/9] GH-2800: rebase changes on top of the most recent main --- .../annotation/RetryableTopicAnnotationProcessor.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopicAnnotationProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopicAnnotationProcessor.java index 08f60e8ee6..f292c74745 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopicAnnotationProcessor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopicAnnotationProcessor.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 the original author or authors. + * Copyright 2018-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,6 +21,10 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; import org.springframework.beans.factory.BeanFactory; import org.springframework.beans.factory.BeanInitializationException; @@ -152,7 +156,6 @@ public RetryTopicConfiguration processAnnotation(String[] topics, Method method, .sameIntervalTopicReuseStrategy(annotation.sameIntervalTopicReuseStrategy()) .timeoutAfter(timeout) .dltRoutingRules(createDltRoutingSpecFromAnnotation(annotation.exceptionBasedDltRouting())) - .create(getKafkaTemplate(resolveExpressionAsString(annotation.kafkaTemplate(), "kafkaTemplate"), topics)); .timeoutAfter(timeout); Integer attempts = resolveExpressionAsInteger(annotation.attempts(), "attempts", true); From bd761495f6c54b1803d8da404b708cc149e34b7a Mon Sep 17 00:00:00 2001 From: Adrian Chlebosz Date: Sat, 20 Jan 2024 01:49:58 +0100 Subject: [PATCH 7/9] GH-2800: amendments after the code review --- .../kafka/annotation/RetryableTopic.java | 6 ++-- .../RetryableTopicAnnotationProcessor.java | 5 ++- .../DefaultDestinationTopicResolver.java | 11 +++++- .../kafka/retrytopic/DestinationTopic.java | 27 ++++++++++++-- .../retrytopic/DestinationTopicContainer.java | 20 +++++++++-- .../DestinationTopicPropertiesFactory.java | 28 ++++++++++++++- .../ExceptionBasedDltDestination.java | 6 ++-- .../retrytopic/ExceptionBasedDltRouting.java | 36 ------------------- .../RetryTopicConfigurationBuilder.java | 6 ++-- .../DefaultDestinationTopicResolverTests.java | 18 ++++++++-- ...etryableTopicAnnotationProcessorTests.java | 6 ++-- 11 files changed, 108 insertions(+), 61 deletions(-) delete mode 100644 spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ExceptionBasedDltRouting.java diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopic.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopic.java index d78d1fe820..90abd23e28 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopic.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopic.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 the original author or authors. + * Copyright 2018-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,7 +23,7 @@ import java.lang.annotation.Target; import org.springframework.kafka.retrytopic.DltStrategy; -import org.springframework.kafka.retrytopic.ExceptionBasedDltRouting; +import org.springframework.kafka.retrytopic.ExceptionBasedDltDestination; import org.springframework.kafka.retrytopic.RetryTopicConstants; import org.springframework.kafka.retrytopic.SameIntervalTopicReuseStrategy; import org.springframework.kafka.retrytopic.TopicSuffixingStrategy; @@ -178,7 +178,7 @@ * exception thrown during the processing. * @return the exception based DLT routing */ - ExceptionBasedDltRouting exceptionBasedDltRouting() default @ExceptionBasedDltRouting; + ExceptionBasedDltDestination[] exceptionBasedDltRouting() default {}; /** * Whether the retry topics will be suffixed with the delay value for that topic or a diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopicAnnotationProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopicAnnotationProcessor.java index f292c74745..ffd69acd5b 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopicAnnotationProcessor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopicAnnotationProcessor.java @@ -38,7 +38,6 @@ import org.springframework.expression.spel.support.StandardEvaluationContext; import org.springframework.kafka.core.KafkaOperations; import org.springframework.kafka.retrytopic.ExceptionBasedDltDestination; -import org.springframework.kafka.retrytopic.ExceptionBasedDltRouting; import org.springframework.kafka.retrytopic.RetryTopicBeanNames; import org.springframework.kafka.retrytopic.RetryTopicConfiguration; import org.springframework.kafka.retrytopic.RetryTopicConfigurationBuilder; @@ -215,8 +214,8 @@ private SleepingBackOffPolicy createBackoffFromAnnotation(Backoff backoff, @N return policy; } - private Map>> createDltRoutingSpecFromAnnotation(ExceptionBasedDltRouting routingSpec) { - return Arrays.stream(routingSpec.routingRules()) + private Map>> createDltRoutingSpecFromAnnotation(ExceptionBasedDltDestination[] routingRules) { + return Arrays.stream(routingRules) .collect(Collectors.toMap(ExceptionBasedDltDestination::suffix, excBasedDestDlt -> Set.of(excBasedDestDlt.exceptions()))); } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolver.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolver.java index b155c48304..bd137714be 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolver.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolver.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 the original author or authors. + * Copyright 2018-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -153,6 +153,11 @@ public DestinationTopic getDestinationTopicByName(String mainListenerId, String () -> "No DestinationTopic found for " + mainListenerId + ":" + topic).getSourceDestination(); } + @Override + public DestinationTopic getDltFor(String mainListenerId, String topicName) { + return getDltFor(mainListenerId, topicName, null); + } + @Nullable @Override public DestinationTopic getDltFor(String mainListenerId, String topicName, Exception e) { @@ -181,6 +186,10 @@ private static boolean isMatchingDltTopic(DestinationTopic destination, Exceptio } private static boolean isDirectExcOrCause(Exception e, Class excType) { + if (e == null) { + return false; + } + Throwable toMatch = e; boolean isMatched = excType.isInstance(toMatch); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopic.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopic.java index 9a2aee0bf6..936a1c4dbf 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopic.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopic.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 the original author or authors. + * Copyright 2018-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -196,9 +196,32 @@ public Properties(Properties sourceProperties, String suffix, Type type) { * @param shouldRetryOn the exception classifications. * @param timeout the timeout. * @param autoStartDltHandler whether or not to start the DLT handler. - * @param usedForExceptions the exceptions which destination is intended for * @since 2.8 */ + public Properties(long delayMs, String suffix, Type type, + int maxAttempts, int numPartitions, + DltStrategy dltStrategy, + KafkaOperations kafkaOperations, + BiPredicate shouldRetryOn, long timeout, @Nullable Boolean autoStartDltHandler) { + this(delayMs, suffix, type, maxAttempts, numPartitions, dltStrategy, kafkaOperations, shouldRetryOn, + timeout, autoStartDltHandler, Collections.emptySet()); + } + + /** + * Create an instance with the provided properties. + * @param delayMs the delay in ms. + * @param suffix the suffix. + * @param type the type. + * @param maxAttempts the max attempts. + * @param numPartitions the number of partitions. + * @param dltStrategy the DLT strategy. + * @param kafkaOperations the {@link KafkaOperations}. + * @param shouldRetryOn the exception classifications. + * @param timeout the timeout. + * @param autoStartDltHandler whether or not to start the DLT handler. + * @param usedForExceptions the exceptions which destination is intended for + * @since 3.2 + */ public Properties(long delayMs, String suffix, Type type, int maxAttempts, int numPartitions, DltStrategy dltStrategy, diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicContainer.java index 61160fa237..45c9e566f7 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicContainer.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2023 the original author or authors. + * Copyright 2017-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -66,10 +66,24 @@ public interface DestinationTopicContainer { * DLT for the given topic, or null if none is found. * @param mainListenerId the listener id. * @param topicName the topic name for which to look the DLT for - * @param exc the exception which is being handled * @return The {@link DestinationTopic} instance corresponding to the DLT. + * @deprecated Replaced by {@link #getDltFor(String, String, Exception)} */ @Nullable - DestinationTopic getDltFor(String mainListenerId, String topicName, Exception exc); + @Deprecated(since = "3.2", forRemoval = true) + DestinationTopic getDltFor(String mainListenerId, String topicName); + /** + * Returns the {@link DestinationTopic} instance registered as + * DLT for the given topic taking into consideration the exception + * thrown, or null if none is found. + * @param mainListenerId the listener id. + * @param topicName the topic name for which to look the DLT for + * @param exc the exception which is being handled + * @return The {@link DestinationTopic} instance corresponding to the DLT. + */ + @Nullable + default DestinationTopic getDltFor(String mainListenerId, String topicName, Exception exc) { + return null; + } } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactory.java index 458fa5136f..f6a20a4b8f 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactory.java @@ -85,9 +85,35 @@ public class DestinationTopicPropertiesFactory { * @param topicSuffixingStrategy the topic suffixing strategy. * @param sameIntervalTopicReuseStrategy the same interval reuse strategy. * @param timeout the timeout. - * @param dltRoutingRules the specification of which DLT should be used for the particular exception type * @since 3.0.12 */ + public DestinationTopicPropertiesFactory(String retryTopicSuffix, String dltSuffix, List backOffValues, + BinaryExceptionClassifier exceptionClassifier, + int numPartitions, KafkaOperations kafkaOperations, + DltStrategy dltStrategy, + TopicSuffixingStrategy topicSuffixingStrategy, + SameIntervalTopicReuseStrategy sameIntervalTopicReuseStrategy, + long timeout) { + + this(retryTopicSuffix, dltSuffix, backOffValues, exceptionClassifier, numPartitions, kafkaOperations, + dltStrategy, topicSuffixingStrategy, sameIntervalTopicReuseStrategy, timeout, Collections.emptyMap()); + } + + /** + * Construct an instance with the provided properties. + * @param retryTopicSuffix the suffix. + * @param dltSuffix the dlt suffix. + * @param backOffValues the back off values. + * @param exceptionClassifier the exception classifier. + * @param numPartitions the number of partitions. + * @param kafkaOperations the operations. + * @param dltStrategy the dlt strategy. + * @param topicSuffixingStrategy the topic suffixing strategy. + * @param sameIntervalTopicReuseStrategy the same interval reuse strategy. + * @param timeout the timeout. + * @param dltRoutingRules the specification of which DLT should be used for the particular exception type + * @since 3.2.0 + */ public DestinationTopicPropertiesFactory(String retryTopicSuffix, String dltSuffix, List backOffValues, BinaryExceptionClassifier exceptionClassifier, int numPartitions, KafkaOperations kafkaOperations, diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ExceptionBasedDltDestination.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ExceptionBasedDltDestination.java index 8f2cc6d22b..06443a93f5 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ExceptionBasedDltDestination.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ExceptionBasedDltDestination.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2023 the original author or authors. + * Copyright 2016-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,7 +21,7 @@ * processing caused the configured exception to be thrown. * * @author Adrian Chlebosz - * @see org.springframework.kafka.retrytopic.ExceptionBasedDltRouting + * * @since 3.2.0 */ public @interface ExceptionBasedDltDestination { @@ -31,7 +31,6 @@ * before the main suffix configured through the * ${@link org.springframework.kafka.annotation.RetryableTopic#dltTopicSuffix()}, so the * final name is the product of these two. - * * @return the configured suffix extension */ String suffix(); @@ -41,7 +40,6 @@ * it should be eventually redirected to the DLT with name containing the extension * configured through {@link #suffix()}. The causes of the thrown exception will be * traversed to match with any of configured ones. - * * @return configured exceptions */ Class[] exceptions(); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ExceptionBasedDltRouting.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ExceptionBasedDltRouting.java deleted file mode 100644 index e4284cc0fa..0000000000 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ExceptionBasedDltRouting.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright 2023 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.kafka.retrytopic; - -/** - * Annotation allowing to specify the custom DLT routing rules steered by exceptions - * which might be thrown during the processing. - * - * @author Adrian Chlebosz - * @see ExceptionBasedDltDestination - * @since 3.2.0 - */ -public @interface ExceptionBasedDltRouting { - - /** - * Specific rules expressing to which custom DLT the message should be redirected - * when the specified exception has been thrown during its processing. - * - * @return configured routing - */ - ExceptionBasedDltDestination[] routingRules() default {}; -} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationBuilder.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationBuilder.java index 0dae0da378..c178dfb155 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationBuilder.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationBuilder.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 the original author or authors. + * Copyright 2018-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -79,7 +79,7 @@ public class RetryTopicConfigurationBuilder { @Nullable private BinaryExceptionClassifierBuilder classifierBuilder; - private Map>> dltRoutingRules = new HashMap<>(); + private final Map>> dltRoutingRules = new HashMap<>(); private DltStrategy dltStrategy = DltStrategy.ALWAYS_RETRY_ON_ERROR; @@ -539,7 +539,7 @@ private BinaryExceptionClassifierBuilder classifierBuilder() { * @since 3.2.0 */ public RetryTopicConfigurationBuilder dltRoutingRules(Map>> dltRoutingRules) { - this.dltRoutingRules = dltRoutingRules; + this.dltRoutingRules.putAll(dltRoutingRules); return this; } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolverTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolverTests.java index 5fd891532f..67ebc3dada 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolverTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolverTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 the original author or authors. + * Copyright 2018-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -220,12 +220,26 @@ void shouldGetNextDestinationTopic() { } @Test - void shouldGetDlt() { + void shouldGetGeneralPurposeDltWhenExceptionIsNotKnown() { + assertThat(defaultDestinationTopicContainer + .getDltFor("id", mainDestinationTopic.getDestinationName())) + .isEqualTo(dltDestinationTopic); + } + + @Test + void shouldGetGeneralPurposeDltWhenThereIsNoCustomDltRegisteredForExceptionType() { assertThat(defaultDestinationTopicContainer .getDltFor("id", mainDestinationTopic.getDestinationName(), new RuntimeException())) .isEqualTo(dltDestinationTopic); } + @Test + void shouldGetCustomDltWhenThereIsCustomDltRegisteredForExceptionType() { + assertThat(defaultDestinationTopicContainer + .getDltFor("id", mainDestinationTopic.getDestinationName(), new DeserializationException(null, null, false, null))) + .isEqualTo(deserializationExcDltDestinationTopic); + } + @Test void shouldThrowIfNoDestinationFound() { assertThatNullPointerException().isThrownBy( diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryableTopicAnnotationProcessorTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryableTopicAnnotationProcessorTests.java index 22a8c7b5e9..2c887cf8d1 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryableTopicAnnotationProcessorTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryableTopicAnnotationProcessorTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 the original author or authors. + * Copyright 2018-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -374,11 +374,11 @@ static class RetryableTopicAnnotationFactoryWithCustomDltRouting { @KafkaListener @RetryableTopic( attempts = "1", - exceptionBasedDltRouting = @ExceptionBasedDltRouting(routingRules = { + exceptionBasedDltRouting = { @ExceptionBasedDltDestination( suffix = "-deserialization", exceptions = {DeserializationException.class} ) - }) + } ) void listenWithRetry() { // NoOps From e46d4818c97134e7842b673a2c37561ecb657f09 Mon Sep 17 00:00:00 2001 From: Adrian Chlebosz Date: Sat, 20 Jan 2024 22:52:20 +0100 Subject: [PATCH 8/9] GH-2800: provide doc for the custom DLT routing feature Fixes #2800 --- .../ROOT/pages/retrytopic/features.adoc | 38 +++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/retrytopic/features.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/retrytopic/features.adoc index 963efa4dcc..3b4ac5bfc9 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/retrytopic/features.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/retrytopic/features.adoc @@ -261,3 +261,41 @@ protected Consumer It is recommended that you use the provided resolvers when constructing the custom instance. +[[exc-based-custom-dlt-routing]] +== Routing of messages to custom DLTs based on thrown exceptions + +Starting with version 3.2.0, it's possible to route messages to custom DLTs based on the type of the exception, which has been thrown during their processing. +In order to do that, you need to specify the routing. +Routing customization consists of the specification of the additional destinations. +Destinations in turn consist of two settings: the `suffix` and `exceptions`. +When the exception type specified in `exceptions` has been thrown, the DLT containing the `suffix` will be considered as the target topic for the message before the general purpose DLT is considered. +Take a look at the examples: + +[source, java] +---- +@RetryableTopic(exceptionBasedDltRouting = { + @ExceptionBasedDltDestination( + suffix = "-deserialization", exceptions = {DeserializationException.class} + )} +) +@KafkaListener(topics = "my-annotated-topic") +public void processMessage(MyPojo message) { + // ... message processing +} +---- + +[source, java] +---- +@Bean +public RetryTopicConfiguration myRetryTopic(KafkaTemplate template) { + return RetryTopicConfigurationBuilder + .newInstance() + .dltRoutingRules(Map.of("-deserialization", Set.of(DeserializationException.class))) + .create(kafkaOperations) + .create(template); +} +---- + +`suffix` takes place before the general `dltTopicSuffix` in the custom DLT name. +Considering presented examples, the message, which caused the `DeserializationException` will be routed to the `my-annotated-topic-deserialization-dlt` instead of the `my-annotated-topic-dlt`. +Custom DLTs will be created following the same rules as stated in the xref:retrytopic/features.adoc#topics-autocreation[Topics AutoCreation]. \ No newline at end of file From 18220c5b8587bdcbea9e95f3716c75d64bf58f32 Mon Sep 17 00:00:00 2001 From: Arian Chlebosz Date: Fri, 26 Jan 2024 22:16:42 +0100 Subject: [PATCH 9/9] GH-2800: code review amendments --- .../modules/ROOT/pages/retrytopic/features.adoc | 4 ++-- .../main/antora/modules/ROOT/pages/whats-new.adoc | 10 +++++++++- .../kafka/annotation/RetryableTopic.java | 1 + .../RetryableTopicAnnotationProcessor.java | 3 +-- .../DestinationTopicPropertiesFactory.java | 12 ++++++++++-- 5 files changed, 23 insertions(+), 7 deletions(-) diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/retrytopic/features.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/retrytopic/features.adoc index 3b4ac5bfc9..30a301ebfa 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/retrytopic/features.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/retrytopic/features.adoc @@ -265,11 +265,11 @@ It is recommended that you use the provided resolvers when constructing the cust == Routing of messages to custom DLTs based on thrown exceptions Starting with version 3.2.0, it's possible to route messages to custom DLTs based on the type of the exception, which has been thrown during their processing. -In order to do that, you need to specify the routing. +In order to do that, there's a need to specify the routing. Routing customization consists of the specification of the additional destinations. Destinations in turn consist of two settings: the `suffix` and `exceptions`. When the exception type specified in `exceptions` has been thrown, the DLT containing the `suffix` will be considered as the target topic for the message before the general purpose DLT is considered. -Take a look at the examples: +Examples of configuration using either annotations or `RetryTopicConfiguration` beans: [source, java] ---- diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc index 77198a4bc9..d427e8f42e 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc @@ -18,4 +18,12 @@ See xref:kafka/transactions.adoc#transaction-id-suffix-fixed[Fixed TransactionId === Async @KafkaListener Return `@KafkaListener` (and `@KafkaHandler`) methods can now return asynchronous return types include `CompletableFuture`, `Mono` and Kotlin `suspend` functions. -See xref:kafka/receiving-messages/async-returns.adoc[Async Returns] for more information. \ No newline at end of file +See xref:kafka/receiving-messages/async-returns.adoc[Async Returns] for more information. + +[[x32-customizable-dlt-routing]] +=== Routing of messages to custom DLTs based on thrown exceptions + +It's now possible to redirect messages to the custom DLTs based on the type of the exception, which has been thrown during the message processing. +Rules for the redirection are set either via the `RetryableTopic.exceptionBasedDltRouting` or the `RetryTopicConfigurationBuilder.dltRoutingRules`. +Custom DLTs are created automatically as well as other retry and dead-letter topics. +See xref:retrytopic/features.adoc#exc-based-custom-dlt-routing[Routing of messages to custom DLTs based on thrown exceptions] for more information. \ No newline at end of file diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopic.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopic.java index 90abd23e28..be371ece64 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopic.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopic.java @@ -177,6 +177,7 @@ * The DLT routing allowing to redirect the message to the custom DLT based on the * exception thrown during the processing. * @return the exception based DLT routing + * @since 3.2.0 */ ExceptionBasedDltDestination[] exceptionBasedDltRouting() default {}; diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopicAnnotationProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopicAnnotationProcessor.java index ffd69acd5b..b19ed2962c 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopicAnnotationProcessor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopicAnnotationProcessor.java @@ -154,8 +154,7 @@ public RetryTopicConfiguration processAnnotation(String[] topics, Method method, .setTopicSuffixingStrategy(annotation.topicSuffixingStrategy()) .sameIntervalTopicReuseStrategy(annotation.sameIntervalTopicReuseStrategy()) .timeoutAfter(timeout) - .dltRoutingRules(createDltRoutingSpecFromAnnotation(annotation.exceptionBasedDltRouting())) - .timeoutAfter(timeout); + .dltRoutingRules(createDltRoutingSpecFromAnnotation(annotation.exceptionBasedDltRouting())); Integer attempts = resolveExpressionAsInteger(annotation.attempts(), "attempts", true); if (attempts != null) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactory.java index f6a20a4b8f..d33ea3fa66 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactory.java @@ -18,6 +18,8 @@ import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -128,7 +130,7 @@ public DestinationTopicPropertiesFactory(String retryTopicSuffix, String dltSuff this.numPartitions = numPartitions; this.timeout = timeout; this.destinationTopicSuffixes = new DestinationTopicSuffixes(retryTopicSuffix, dltSuffix); - this.dltRoutingRules = dltRoutingRules; + this.dltRoutingRules = copyDltRoutingRules(dltRoutingRules); this.backOffValues = backOffValues; int backOffValuesSize = this.backOffValues.size(); this.isSameIntervalReuse = SameIntervalTopicReuseStrategy.SINGLE_TOPIC.equals(sameIntervalTopicReuseStrategy); @@ -141,6 +143,12 @@ public DestinationTopicPropertiesFactory(String retryTopicSuffix, String dltSuff this.retryTopicsAmount = backOffValuesSize - reusableTopicAttempts(); } + private Map>> copyDltRoutingRules(Map>> dltRoutingRules) { + Map>> copyOfDltRoutingRules = new HashMap<>(); + dltRoutingRules.forEach((topicSuffix, exceptions) -> copyOfDltRoutingRules.put(topicSuffix, new HashSet<>(exceptions))); + return copyOfDltRoutingRules; + } + /** * Set to false to not start the DLT handler. * @param autoStart false to not start. @@ -159,8 +167,8 @@ public List createProperties() { list.add(createRetryProperties(backOffIndex)); } if (!DltStrategy.NO_DLT.equals(this.dltStrategy)) { - list.add(createDltProperties()); list.addAll(createCustomDltProperties()); + list.add(createDltProperties()); } return Collections.unmodifiableList(list); }