From c6d1d768d7bb5851b36e274c34c1dc70be9dba04 Mon Sep 17 00:00:00 2001 From: Adrian Chlebosz <36669019+breader124@users.noreply.github.com> Date: Fri, 2 Feb 2024 00:30:02 +0100 Subject: [PATCH] GH-2800: Exceptions based custom DLT routing Fixes: #2800 For non-blocking retries, provide the ability to route to custom DLTs based on exceptions thrown. * create topics used for exception-based DLT routing * route messages to specialized DLT based on the exception type * wire configuration from annotation and traverse causes * prepare custom DLT routing changes to be released in version 3.2.0 * rename ExceptionBasedDestinationDlt to ExceptionBasedDltDestination * rebase changes on top of the most recent main * amendments after the code review * provide doc for the custom DLT routing feature * code review amendments --- .../ROOT/pages/retrytopic/features.adoc | 38 ++++++++++ .../antora/modules/ROOT/pages/whats-new.adoc | 10 ++- .../kafka/annotation/RetryableTopic.java | 12 +++- .../RetryableTopicAnnotationProcessor.java | 15 +++- .../DefaultDestinationTopicResolver.java | 72 +++++++++++++++---- .../kafka/retrytopic/DestinationTopic.java | 46 ++++++++++-- .../retrytopic/DestinationTopicContainer.java | 18 ++++- .../DestinationTopicPropertiesFactory.java | 53 +++++++++++++- .../ExceptionBasedDltDestination.java | 46 ++++++++++++ .../RetryTopicConfigurationBuilder.java | 25 ++++++- ...DefaultDestinationTopicProcessorTests.java | 10 +-- .../DefaultDestinationTopicResolverTests.java | 41 +++++++++-- ...estinationTopicPropertiesFactoryTests.java | 65 ++++++++++++----- .../retrytopic/DestinationTopicTests.java | 26 +++++-- .../RetryTopicConfigurationBuilderTests.java | 22 +++++- ...etryableTopicAnnotationProcessorTests.java | 56 +++++++++++++-- 16 files changed, 489 insertions(+), 66 deletions(-) create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ExceptionBasedDltDestination.java diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/retrytopic/features.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/retrytopic/features.adoc index 963efa4dcc..30a301ebfa 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/retrytopic/features.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/retrytopic/features.adoc @@ -261,3 +261,41 @@ protected Consumer It is recommended that you use the provided resolvers when constructing the custom instance. +[[exc-based-custom-dlt-routing]] +== Routing of messages to custom DLTs based on thrown exceptions + +Starting with version 3.2.0, it's possible to route messages to custom DLTs based on the type of the exception, which has been thrown during their processing. +In order to do that, there's a need to specify the routing. +Routing customization consists of the specification of the additional destinations. +Destinations in turn consist of two settings: the `suffix` and `exceptions`. +When the exception type specified in `exceptions` has been thrown, the DLT containing the `suffix` will be considered as the target topic for the message before the general purpose DLT is considered. +Examples of configuration using either annotations or `RetryTopicConfiguration` beans: + +[source, java] +---- +@RetryableTopic(exceptionBasedDltRouting = { + @ExceptionBasedDltDestination( + suffix = "-deserialization", exceptions = {DeserializationException.class} + )} +) +@KafkaListener(topics = "my-annotated-topic") +public void processMessage(MyPojo message) { + // ... message processing +} +---- + +[source, java] +---- +@Bean +public RetryTopicConfiguration myRetryTopic(KafkaTemplate template) { + return RetryTopicConfigurationBuilder + .newInstance() + .dltRoutingRules(Map.of("-deserialization", Set.of(DeserializationException.class))) + .create(kafkaOperations) + .create(template); +} +---- + +`suffix` takes place before the general `dltTopicSuffix` in the custom DLT name. +Considering presented examples, the message, which caused the `DeserializationException` will be routed to the `my-annotated-topic-deserialization-dlt` instead of the `my-annotated-topic-dlt`. +Custom DLTs will be created following the same rules as stated in the xref:retrytopic/features.adoc#topics-autocreation[Topics AutoCreation]. \ No newline at end of file diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc index 77198a4bc9..d427e8f42e 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc @@ -18,4 +18,12 @@ See xref:kafka/transactions.adoc#transaction-id-suffix-fixed[Fixed TransactionId === Async @KafkaListener Return `@KafkaListener` (and `@KafkaHandler`) methods can now return asynchronous return types include `CompletableFuture`, `Mono` and Kotlin `suspend` functions. -See xref:kafka/receiving-messages/async-returns.adoc[Async Returns] for more information. \ No newline at end of file +See xref:kafka/receiving-messages/async-returns.adoc[Async Returns] for more information. + +[[x32-customizable-dlt-routing]] +=== Routing of messages to custom DLTs based on thrown exceptions + +It's now possible to redirect messages to the custom DLTs based on the type of the exception, which has been thrown during the message processing. +Rules for the redirection are set either via the `RetryableTopic.exceptionBasedDltRouting` or the `RetryTopicConfigurationBuilder.dltRoutingRules`. +Custom DLTs are created automatically as well as other retry and dead-letter topics. +See xref:retrytopic/features.adoc#exc-based-custom-dlt-routing[Routing of messages to custom DLTs based on thrown exceptions] for more information. \ No newline at end of file diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopic.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopic.java index 31ed10f824..be371ece64 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopic.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopic.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 the original author or authors. + * Copyright 2018-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,6 +23,7 @@ import java.lang.annotation.Target; import org.springframework.kafka.retrytopic.DltStrategy; +import org.springframework.kafka.retrytopic.ExceptionBasedDltDestination; import org.springframework.kafka.retrytopic.RetryTopicConstants; import org.springframework.kafka.retrytopic.SameIntervalTopicReuseStrategy; import org.springframework.kafka.retrytopic.TopicSuffixingStrategy; @@ -39,6 +40,7 @@ * @author Gary Russell * @author Fabio da Silva Jr. * @author João Lima + * @author Adrian Chlebosz * @since 2.7 * * @see org.springframework.kafka.retrytopic.RetryTopicConfigurer @@ -171,6 +173,14 @@ */ String dltTopicSuffix() default RetryTopicConstants.DEFAULT_DLT_SUFFIX; + /** + * The DLT routing allowing to redirect the message to the custom DLT based on the + * exception thrown during the processing. + * @return the exception based DLT routing + * @since 3.2.0 + */ + ExceptionBasedDltDestination[] exceptionBasedDltRouting() default {}; + /** * Whether the retry topics will be suffixed with the delay value for that topic or a * simple index. diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopicAnnotationProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopicAnnotationProcessor.java index 5b716bf719..b19ed2962c 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopicAnnotationProcessor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopicAnnotationProcessor.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 the original author or authors. + * Copyright 2018-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,7 +21,10 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; import org.springframework.beans.factory.BeanFactory; import org.springframework.beans.factory.BeanInitializationException; @@ -34,6 +37,7 @@ import org.springframework.core.annotation.AnnotationUtils; import org.springframework.expression.spel.support.StandardEvaluationContext; import org.springframework.kafka.core.KafkaOperations; +import org.springframework.kafka.retrytopic.ExceptionBasedDltDestination; import org.springframework.kafka.retrytopic.RetryTopicBeanNames; import org.springframework.kafka.retrytopic.RetryTopicConfiguration; import org.springframework.kafka.retrytopic.RetryTopicConfigurationBuilder; @@ -58,6 +62,7 @@ * * @author Tomaz Fernandes * @author Gary Russell + * @author Adrian Chlebosz * @since 2.7 * */ @@ -148,7 +153,8 @@ public RetryTopicConfiguration processAnnotation(String[] topics, Method method, .autoStartDltHandler(autoStartDlt) .setTopicSuffixingStrategy(annotation.topicSuffixingStrategy()) .sameIntervalTopicReuseStrategy(annotation.sameIntervalTopicReuseStrategy()) - .timeoutAfter(timeout); + .timeoutAfter(timeout) + .dltRoutingRules(createDltRoutingSpecFromAnnotation(annotation.exceptionBasedDltRouting())); Integer attempts = resolveExpressionAsInteger(annotation.attempts(), "attempts", true); if (attempts != null) { @@ -207,6 +213,11 @@ private SleepingBackOffPolicy createBackoffFromAnnotation(Backoff backoff, @N return policy; } + private Map>> createDltRoutingSpecFromAnnotation(ExceptionBasedDltDestination[] routingRules) { + return Arrays.stream(routingRules) + .collect(Collectors.toMap(ExceptionBasedDltDestination::suffix, excBasedDestDlt -> Set.of(excBasedDestDlt.exceptions()))); + } + private EndpointHandlerMethod getDltProcessor(Method listenerMethod, Object bean) { Class declaringClass = listenerMethod.getDeclaringClass(); return Arrays.stream(ReflectionUtils.getDeclaredMethods(declaringClass)) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolver.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolver.java index 522256b436..bd137714be 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolver.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolver.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 the original author or authors. + * Copyright 2018-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -49,6 +49,7 @@ * @author Tomaz Fernandes * @author Gary Russell * @author Yvette Quinby + * @author Adrian Chlebosz * @since 2.7 * */ @@ -97,8 +98,8 @@ public DestinationTopic resolveDestinationTopic(String mainListenerId, String to : destinationTopicHolder.getSourceDestination().shouldRetryOn(attempt, maybeUnwrapException(e)) && isNotFatalException(e) && !isPastTimout(originalTimestamp, destinationTopicHolder) - ? resolveRetryDestination(destinationTopicHolder) - : getDltOrNoOpsDestination(mainListenerId, topic); + ? resolveRetryDestination(mainListenerId, destinationTopicHolder, e) + : getDltOrNoOpsDestination(mainListenerId, topic, e); } private Boolean isNotFatalException(Exception e) { @@ -128,10 +129,20 @@ && isNotFatalException(e) } @SuppressWarnings("deprecation") - private DestinationTopic resolveRetryDestination(DestinationTopicHolder destinationTopicHolder) { - return (destinationTopicHolder.getSourceDestination().isReusableRetryTopic()) - ? destinationTopicHolder.getSourceDestination() - : destinationTopicHolder.getNextDestination(); + private DestinationTopic resolveRetryDestination(String mainListenerId, DestinationTopicHolder destinationTopicHolder, Exception e) { + if (destinationTopicHolder.getSourceDestination().isReusableRetryTopic()) { + return destinationTopicHolder.getSourceDestination(); + } + + if (isAlreadyDltDestination(destinationTopicHolder)) { + return getDltOrNoOpsDestination(mainListenerId, destinationTopicHolder.getSourceDestination().getDestinationName(), e); + } + + return destinationTopicHolder.getNextDestination(); + } + + private static boolean isAlreadyDltDestination(DestinationTopicHolder destinationTopicHolder) { + return destinationTopicHolder.getNextDestination().isDltTopic(); } @Override @@ -142,20 +153,55 @@ public DestinationTopic getDestinationTopicByName(String mainListenerId, String () -> "No DestinationTopic found for " + mainListenerId + ":" + topic).getSourceDestination(); } - @Nullable @Override public DestinationTopic getDltFor(String mainListenerId, String topicName) { - DestinationTopic destination = getDltOrNoOpsDestination(mainListenerId, topicName); + return getDltFor(mainListenerId, topicName, null); + } + + @Nullable + @Override + public DestinationTopic getDltFor(String mainListenerId, String topicName, Exception e) { + DestinationTopic destination = getDltOrNoOpsDestination(mainListenerId, topicName, e); return destination.isNoOpsTopic() ? null : destination; } - private DestinationTopic getDltOrNoOpsDestination(String mainListenerId, String topic) { + private DestinationTopic getDltOrNoOpsDestination(String mainListenerId, String topic, Exception e) { DestinationTopic destination = getNextDestinationTopicFor(mainListenerId, topic); - return destination.isDltTopic() || destination.isNoOpsTopic() - ? destination - : getDltOrNoOpsDestination(mainListenerId, destination.getDestinationName()); + return isMatchingDltTopic(destination, e) || destination.isNoOpsTopic() ? + destination : + getDltOrNoOpsDestination(mainListenerId, destination.getDestinationName(), e); + } + + private static boolean isMatchingDltTopic(DestinationTopic destination, Exception e) { + if (!destination.isDltTopic()) { + return false; + } + + boolean isDltIntendedForCurrentExc = destination.usedForExceptions().stream() + .anyMatch(excType -> isDirectExcOrCause(e, excType)); + boolean isGenericPurposeDlt = destination.usedForExceptions().isEmpty(); + return isDltIntendedForCurrentExc || isGenericPurposeDlt; + } + + private static boolean isDirectExcOrCause(Exception e, Class excType) { + if (e == null) { + return false; + } + + Throwable toMatch = e; + + boolean isMatched = excType.isInstance(toMatch); + while (!isMatched) { + toMatch = toMatch.getCause(); + if (toMatch == null) { + return false; + } + isMatched = excType.isInstance(toMatch); + } + + return isMatched; } @Override diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopic.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopic.java index c6895be55a..936a1c4dbf 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopic.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopic.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 the original author or authors. + * Copyright 2018-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,7 +16,9 @@ package org.springframework.kafka.retrytopic; +import java.util.Collections; import java.util.Objects; +import java.util.Set; import java.util.function.BiPredicate; import org.springframework.kafka.core.KafkaOperations; @@ -28,6 +30,7 @@ * * @author Tomaz Fernandes * @author Gary Russell + * @author Adrian Chlebosz * @since 2.7 * */ @@ -88,6 +91,10 @@ public boolean shouldRetryOn(Integer attempt, Throwable e) { return this.properties.shouldRetryOn.test(attempt, e); } + public Set> usedForExceptions() { + return Collections.unmodifiableSet(this.properties.usedForExceptions); + } + @Override public String toString() { return "DestinationTopic{" + @@ -137,9 +144,10 @@ public static class Properties { private final long timeout; + private final Set> usedForExceptions; + @Nullable private final Boolean autoStartDltHandler; - /** * Create an instance with the provided properties with the DLT container starting * automatically (if the container factory is so configured). @@ -160,7 +168,7 @@ public Properties(long delayMs, String suffix, Type type, BiPredicate shouldRetryOn, long timeout) { this(delayMs, suffix, type, maxAttempts, numPartitions, dltStrategy, kafkaOperations, shouldRetryOn, - timeout, null); + timeout, null, Collections.emptySet()); } /** @@ -173,7 +181,7 @@ public Properties(long delayMs, String suffix, Type type, public Properties(Properties sourceProperties, String suffix, Type type) { this(sourceProperties.delayMs, suffix, type, sourceProperties.maxAttempts, sourceProperties.numPartitions, sourceProperties.dltStrategy, sourceProperties.kafkaOperations, sourceProperties.shouldRetryOn, - sourceProperties.timeout, null); + sourceProperties.timeout, null, Collections.emptySet()); } /** @@ -195,6 +203,31 @@ public Properties(long delayMs, String suffix, Type type, DltStrategy dltStrategy, KafkaOperations kafkaOperations, BiPredicate shouldRetryOn, long timeout, @Nullable Boolean autoStartDltHandler) { + this(delayMs, suffix, type, maxAttempts, numPartitions, dltStrategy, kafkaOperations, shouldRetryOn, + timeout, autoStartDltHandler, Collections.emptySet()); + } + + /** + * Create an instance with the provided properties. + * @param delayMs the delay in ms. + * @param suffix the suffix. + * @param type the type. + * @param maxAttempts the max attempts. + * @param numPartitions the number of partitions. + * @param dltStrategy the DLT strategy. + * @param kafkaOperations the {@link KafkaOperations}. + * @param shouldRetryOn the exception classifications. + * @param timeout the timeout. + * @param autoStartDltHandler whether or not to start the DLT handler. + * @param usedForExceptions the exceptions which destination is intended for + * @since 3.2 + */ + public Properties(long delayMs, String suffix, Type type, + int maxAttempts, int numPartitions, + DltStrategy dltStrategy, + KafkaOperations kafkaOperations, + BiPredicate shouldRetryOn, long timeout, @Nullable Boolean autoStartDltHandler, + Set> usedForExceptions) { this.delayMs = delayMs; this.suffix = suffix; @@ -206,6 +239,7 @@ public Properties(long delayMs, String suffix, Type type, this.shouldRetryOn = shouldRetryOn; this.timeout = timeout; this.autoStartDltHandler = autoStartDltHandler; + this.usedForExceptions = usedForExceptions; } public boolean isDltTopic() { @@ -239,6 +273,10 @@ public Boolean autoStartDltHandler() { return this.autoStartDltHandler; } + public Set> usedForExceptions() { + return this.usedForExceptions; + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicContainer.java index 25dc22705b..45c9e566f7 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicContainer.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2022 the original author or authors. + * Copyright 2017-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,6 +26,7 @@ * * @author Tomaz Fernandes * @author Gary Russell + * @author Adrian Chlebosz * @since 2.7 */ public interface DestinationTopicContainer { @@ -66,8 +67,23 @@ public interface DestinationTopicContainer { * @param mainListenerId the listener id. * @param topicName the topic name for which to look the DLT for * @return The {@link DestinationTopic} instance corresponding to the DLT. + * @deprecated Replaced by {@link #getDltFor(String, String, Exception)} */ @Nullable + @Deprecated(since = "3.2", forRemoval = true) DestinationTopic getDltFor(String mainListenerId, String topicName); + /** + * Returns the {@link DestinationTopic} instance registered as + * DLT for the given topic taking into consideration the exception + * thrown, or null if none is found. + * @param mainListenerId the listener id. + * @param topicName the topic name for which to look the DLT for + * @param exc the exception which is being handled + * @return The {@link DestinationTopic} instance corresponding to the DLT. + */ + @Nullable + default DestinationTopic getDltFor(String mainListenerId, String topicName, Exception exc) { + return null; + } } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactory.java index 1119bf39e9..d33ea3fa66 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactory.java @@ -18,7 +18,11 @@ import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.function.BiPredicate; import org.springframework.classify.BinaryExceptionClassifier; @@ -36,6 +40,7 @@ * @author Gary Russell * @author João Lima * @author Wang Zhiyang + * @author Adrian Chlebosz * @since 2.7 * */ @@ -65,6 +70,8 @@ public class DestinationTopicPropertiesFactory { private final long timeout; + private final Map>> dltRoutingRules; + @Nullable private Boolean autoStartDltHandler; @@ -90,11 +97,40 @@ public DestinationTopicPropertiesFactory(String retryTopicSuffix, String dltSuff SameIntervalTopicReuseStrategy sameIntervalTopicReuseStrategy, long timeout) { + this(retryTopicSuffix, dltSuffix, backOffValues, exceptionClassifier, numPartitions, kafkaOperations, + dltStrategy, topicSuffixingStrategy, sameIntervalTopicReuseStrategy, timeout, Collections.emptyMap()); + } + + /** + * Construct an instance with the provided properties. + * @param retryTopicSuffix the suffix. + * @param dltSuffix the dlt suffix. + * @param backOffValues the back off values. + * @param exceptionClassifier the exception classifier. + * @param numPartitions the number of partitions. + * @param kafkaOperations the operations. + * @param dltStrategy the dlt strategy. + * @param topicSuffixingStrategy the topic suffixing strategy. + * @param sameIntervalTopicReuseStrategy the same interval reuse strategy. + * @param timeout the timeout. + * @param dltRoutingRules the specification of which DLT should be used for the particular exception type + * @since 3.2.0 + */ + public DestinationTopicPropertiesFactory(String retryTopicSuffix, String dltSuffix, List backOffValues, + BinaryExceptionClassifier exceptionClassifier, + int numPartitions, KafkaOperations kafkaOperations, + DltStrategy dltStrategy, + TopicSuffixingStrategy topicSuffixingStrategy, + SameIntervalTopicReuseStrategy sameIntervalTopicReuseStrategy, + long timeout, + Map>> dltRoutingRules) { + this.dltStrategy = dltStrategy; this.kafkaOperations = kafkaOperations; this.numPartitions = numPartitions; this.timeout = timeout; this.destinationTopicSuffixes = new DestinationTopicSuffixes(retryTopicSuffix, dltSuffix); + this.dltRoutingRules = copyDltRoutingRules(dltRoutingRules); this.backOffValues = backOffValues; int backOffValuesSize = this.backOffValues.size(); this.isSameIntervalReuse = SameIntervalTopicReuseStrategy.SINGLE_TOPIC.equals(sameIntervalTopicReuseStrategy); @@ -107,6 +143,12 @@ public DestinationTopicPropertiesFactory(String retryTopicSuffix, String dltSuff this.retryTopicsAmount = backOffValuesSize - reusableTopicAttempts(); } + private Map>> copyDltRoutingRules(Map>> dltRoutingRules) { + Map>> copyOfDltRoutingRules = new HashMap<>(); + dltRoutingRules.forEach((topicSuffix, exceptions) -> copyOfDltRoutingRules.put(topicSuffix, new HashSet<>(exceptions))); + return copyOfDltRoutingRules; + } + /** * Set to false to not start the DLT handler. * @param autoStart false to not start. @@ -125,6 +167,7 @@ public List createProperties() { list.add(createRetryProperties(backOffIndex)); } if (!DltStrategy.NO_DLT.equals(this.dltStrategy)) { + list.addAll(createCustomDltProperties()); list.add(createDltProperties()); } return Collections.unmodifiableList(list); @@ -138,7 +181,15 @@ private DestinationTopic.Properties createMainTopicProperties() { private DestinationTopic.Properties createDltProperties() { return new DestinationTopic.Properties(0, this.destinationTopicSuffixes.getDltSuffix(), DestinationTopic.Type.DLT, this.maxAttempts, this.numPartitions, this.dltStrategy, - this.kafkaOperations, (a, e) -> false, this.timeout, this.autoStartDltHandler); + this.kafkaOperations, (a, e) -> false, this.timeout, this.autoStartDltHandler, Collections.emptySet()); + } + + private List createCustomDltProperties() { + return this.dltRoutingRules.entrySet().stream() + .map(entry -> new DestinationTopic.Properties(0, entry.getKey() + this.destinationTopicSuffixes.getDltSuffix(), + DestinationTopic.Type.DLT, this.maxAttempts, this.numPartitions, this.dltStrategy, + this.kafkaOperations, (a, e) -> false, this.timeout, this.autoStartDltHandler, entry.getValue())) + .toList(); } private DestinationTopic.Properties createRetryProperties(int backOffIndex) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ExceptionBasedDltDestination.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ExceptionBasedDltDestination.java new file mode 100644 index 0000000000..06443a93f5 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ExceptionBasedDltDestination.java @@ -0,0 +1,46 @@ +/* + * Copyright 2016-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.retrytopic; + +/** + * Annotation allowing to specify additional DLT which will be chosen when message + * processing caused the configured exception to be thrown. + * + * @author Adrian Chlebosz + * + * @since 3.2.0 + */ +public @interface ExceptionBasedDltDestination { + + /** + * Suffix extension used when constructing the name for the new DLT. It is placed + * before the main suffix configured through the + * ${@link org.springframework.kafka.annotation.RetryableTopic#dltTopicSuffix()}, so the + * final name is the product of these two. + * @return the configured suffix extension + */ + String suffix(); + + /** + * When message processing throws one of the exceptions configured here, then + * it should be eventually redirected to the DLT with name containing the extension + * configured through {@link #suffix()}. The causes of the thrown exception will be + * traversed to match with any of configured ones. + * @return configured exceptions + */ + Class[] exceptions(); +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationBuilder.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationBuilder.java index 4e24b1deb7..c178dfb155 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationBuilder.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationBuilder.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 the original author or authors. + * Copyright 2018-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,7 +17,10 @@ package org.springframework.kafka.retrytopic; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.Set; import org.springframework.classify.BinaryExceptionClassifier; import org.springframework.classify.BinaryExceptionClassifierBuilder; @@ -41,6 +44,7 @@ * * @author Tomaz Fernandes * @author Gary Russell + * @author Adrian Chlebosz * @since 2.7 * */ @@ -66,6 +70,7 @@ public class RetryTopicConfigurationBuilder { private RetryTopicConfiguration.TopicCreation topicCreationConfiguration = new RetryTopicConfiguration.TopicCreation(); + private ConcurrentKafkaListenerContainerFactory listenerContainerFactory; @Nullable @@ -74,6 +79,8 @@ public class RetryTopicConfigurationBuilder { @Nullable private BinaryExceptionClassifierBuilder classifierBuilder; + private final Map>> dltRoutingRules = new HashMap<>(); + private DltStrategy dltStrategy = DltStrategy.ALWAYS_RETRY_ON_ERROR; private long timeout = RetryTopicConstants.NOT_SET; @@ -522,6 +529,20 @@ private BinaryExceptionClassifierBuilder classifierBuilder() { return this.classifierBuilder; } + /** + * Configure to set DLT routing rules causing the message to be redirected to the custom + * DLT when the configured exception has been thrown during message processing. + * The cause of the originally thrown exception will be traversed in order to find the + * match with the configured exceptions. + * @param dltRoutingRules specification of custom DLT name extensions and exceptions which should be matched for them + * @return the builder + * @since 3.2.0 + */ + public RetryTopicConfigurationBuilder dltRoutingRules(Map>> dltRoutingRules) { + this.dltRoutingRules.putAll(dltRoutingRules); + return this; + } + /* ---------------- Configure KafkaListenerContainerFactory -------------- */ /** * Configure the container factory to use. @@ -567,7 +588,7 @@ public RetryTopicConfiguration create(KafkaOperations sendToTopicKafkaTemp new DestinationTopicPropertiesFactory(this.retryTopicSuffix, this.dltSuffix, backOffValues, buildClassifier(), this.topicCreationConfiguration.getNumPartitions(), sendToTopicKafkaTemplate, this.dltStrategy, - this.topicSuffixingStrategy, this.sameIntervalTopicReuseStrategy, this.timeout) + this.topicSuffixingStrategy, this.sameIntervalTopicReuseStrategy, this.timeout, this.dltRoutingRules) .autoStartDltHandler(this.autoStartDltHandler) .createProperties(); return new RetryTopicConfiguration(destinationTopicProperties, diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicProcessorTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicProcessorTests.java index 8f2943d501..8efea11c1a 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicProcessorTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicProcessorTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2022 the original author or authors. + * Copyright 2018-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -35,6 +35,7 @@ /** * @author Tomaz Fernandes * @author Gary Russell + * @author Adrian Chlebosz * @since 2.7 */ @ExtendWith(MockitoExtension.class) @@ -75,11 +76,12 @@ void shouldRegisterTopicDestinations() { // then assertThat(context.destinationsByTopicMap.containsKey(FIRST_TOPIC)).isTrue(); List destinationTopicsForFirstTopic = context.destinationsByTopicMap.get(FIRST_TOPIC); - assertThat(destinationTopicsForFirstTopic.size()).isEqualTo(4); + assertThat(destinationTopicsForFirstTopic.size()).isEqualTo(5); assertThat(destinationTopicsForFirstTopic.get(0)).isEqualTo(mainDestinationTopic); assertThat(destinationTopicsForFirstTopic.get(1)).isEqualTo(firstRetryDestinationTopic); assertThat(destinationTopicsForFirstTopic.get(2)).isEqualTo(secondRetryDestinationTopic); - assertThat(destinationTopicsForFirstTopic.get(3)).isEqualTo(dltDestinationTopic); + assertThat(destinationTopicsForFirstTopic.get(3)).isEqualTo(deserializationExcDltDestinationTopic); + assertThat(destinationTopicsForFirstTopic.get(4)).isEqualTo(dltDestinationTopic); assertThat(context.destinationsByTopicMap.containsKey(SECOND_TOPIC)).isTrue(); List destinationTopicsForSecondTopic = context.destinationsByTopicMap.get(SECOND_TOPIC); @@ -143,7 +145,7 @@ void shouldCreateDestinationMapWhenProcessDestinations() { .flatMap(list -> list.stream()) .collect(Collectors.toList()); - assertThat(destinationList.size()).isEqualTo(11); + assertThat(destinationList.size()).isEqualTo(12); assertThat(destinationList.contains(mainDestinationTopic)).isTrue(); assertThat(destinationList.contains(firstRetryDestinationTopic)).isTrue(); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolverTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolverTests.java index ea1fa3334b..67ebc3dada 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolverTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolverTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2022 the original author or authors. + * Copyright 2018-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -39,11 +39,13 @@ import org.springframework.kafka.listener.TimestampedException; import org.springframework.kafka.retrytopic.DestinationTopic.Type; import org.springframework.kafka.support.converter.ConversionException; +import org.springframework.kafka.support.serializer.DeserializationException; /** * @author Tomaz Fernandes * @author Yvette Quinby * @author Gary Russell + * @author Adrian Chlebosz * @since 2.7 */ @ExtendWith(MockitoExtension.class) @@ -78,16 +80,16 @@ public void setup() { void shouldResolveRetryDestination() { assertThat(defaultDestinationTopicContainer .resolveDestinationTopic("id", mainDestinationTopic.getDestinationName(), 1, - new RuntimeException(), this.originalTimestamp)).isEqualTo(firstRetryDestinationTopic); + new IllegalStateException(), this.originalTimestamp)).isEqualTo(firstRetryDestinationTopic); assertThat(defaultDestinationTopicContainer .resolveDestinationTopic("id", firstRetryDestinationTopic.getDestinationName(), 1, - new RuntimeException(), this.originalTimestamp)).isEqualTo(secondRetryDestinationTopic); + new IllegalStateException(), this.originalTimestamp)).isEqualTo(secondRetryDestinationTopic); assertThat(defaultDestinationTopicContainer .resolveDestinationTopic("id", secondRetryDestinationTopic.getDestinationName(), 1, - new RuntimeException(), this.originalTimestamp)).isEqualTo(dltDestinationTopic); + new IllegalStateException(), this.originalTimestamp)).isEqualTo(dltDestinationTopic); assertThat(defaultDestinationTopicContainer .resolveDestinationTopic("id", dltDestinationTopic.getDestinationName(), 1, - new RuntimeException(), this.originalTimestamp)).isEqualTo(noOpsDestinationTopic); + new IllegalStateException(), this.originalTimestamp)).isEqualTo(noOpsDestinationTopic); assertThat(defaultDestinationTopicContainer .resolveDestinationTopic("id", mainDestinationTopic2.getDestinationName(), 1, @@ -142,6 +144,19 @@ void shouldResolveDltDestinationForFatalDefaultException() { .isEqualTo(dltDestinationTopic); } + @Test + void shouldResolveDeserializationDltDestinationForDeserializationException() { + DeserializationException exc = new DeserializationException("", new byte[] {}, false, new IllegalStateException()); + TimestampedException timestampedExc = new TimestampedException(exc); + + assertThat(defaultDestinationTopicContainer + .resolveDestinationTopic("id", secondRetryDestinationTopic.getDestinationName(), + 1, timestampedExc, originalTimestamp)).isEqualTo(deserializationExcDltDestinationTopic); + assertThat(defaultDestinationTopicContainer + .resolveDestinationTopic("id", deserializationExcDltDestinationTopic.getDestinationName(), + 1, timestampedExc, originalTimestamp)).isEqualTo(dltDestinationTopic); + } + @Test void shouldResolveNoOpsForFatalDefaultExceptionInDlt() { assertThat(defaultDestinationTopicContainer @@ -205,12 +220,26 @@ void shouldGetNextDestinationTopic() { } @Test - void shouldGetDlt() { + void shouldGetGeneralPurposeDltWhenExceptionIsNotKnown() { assertThat(defaultDestinationTopicContainer .getDltFor("id", mainDestinationTopic.getDestinationName())) .isEqualTo(dltDestinationTopic); } + @Test + void shouldGetGeneralPurposeDltWhenThereIsNoCustomDltRegisteredForExceptionType() { + assertThat(defaultDestinationTopicContainer + .getDltFor("id", mainDestinationTopic.getDestinationName(), new RuntimeException())) + .isEqualTo(dltDestinationTopic); + } + + @Test + void shouldGetCustomDltWhenThereIsCustomDltRegisteredForExceptionType() { + assertThat(defaultDestinationTopicContainer + .getDltFor("id", mainDestinationTopic.getDestinationName(), new DeserializationException(null, null, false, null))) + .isEqualTo(deserializationExcDltDestinationTopic); + } + @Test void shouldThrowIfNoDestinationFound() { assertThatNullPointerException().isThrownBy( diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactoryTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactoryTests.java index 82ea3fc5f6..a93322460c 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactoryTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactoryTests.java @@ -18,7 +18,10 @@ import static org.assertj.core.api.Assertions.assertThat; +import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.stream.IntStream; import org.junit.jupiter.api.BeforeEach; @@ -30,14 +33,15 @@ import org.springframework.classify.BinaryExceptionClassifier; import org.springframework.classify.BinaryExceptionClassifierBuilder; import org.springframework.kafka.core.KafkaOperations; +import org.springframework.kafka.support.serializer.DeserializationException; import org.springframework.retry.backoff.BackOffPolicy; import org.springframework.retry.backoff.ExponentialBackOffPolicy; import org.springframework.retry.backoff.FixedBackOffPolicy; - /** * @author Tomaz Fernandes * @author Wang Zhiyang + * @author Adrian Chlebosz * @since 2.7 */ @ExtendWith(MockitoExtension.class) @@ -89,10 +93,10 @@ void shouldCreateMainAndDltProperties() { List backOffValues = new BackOffValuesGenerator(1, backOffPolicy).generateValues(); List propertiesList = - new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, + new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, classifier, numPartitions, kafkaOperations, dltStrategy, suffixWithDelayValueSuffixingStrategy, multipleTopicsSameIntervalReuseStrategy, - RetryTopicConstants.NOT_SET).createProperties(); + RetryTopicConstants.NOT_SET, Collections.emptyMap()).createProperties(); // then assertThat(propertiesList.size() == 2).isTrue(); @@ -112,6 +116,10 @@ void shouldCreateMainAndDltProperties() { } private void assertDltTopic(DestinationTopic.Properties dltProperties) { + assertDltTopic(dltProperties, this.dltSuffix); + } + + private void assertDltTopic(DestinationTopic.Properties dltProperties, String dltSuffix) { assertThat(dltProperties.suffix()).isEqualTo(dltSuffix); assertThat(dltProperties.isDltTopic()).isTrue(); assertThat(dltProperties.isRetryTopic()).isFalse(); @@ -134,10 +142,10 @@ void shouldCreateTwoRetryPropertiesForMultipleBackoffValues() { List backOffValues = new BackOffValuesGenerator(maxAttempts, backOffPolicy).generateValues(); List propertiesList = - new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, + new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, classifier, numPartitions, kafkaOperations, dltStrategy, TopicSuffixingStrategy.SUFFIX_WITH_DELAY_VALUE, - multipleTopicsSameIntervalReuseStrategy, RetryTopicConstants.NOT_SET).createProperties(); + multipleTopicsSameIntervalReuseStrategy, RetryTopicConstants.NOT_SET, Collections.emptyMap()).createProperties(); List destinationTopicList = propertiesList .stream() @@ -185,16 +193,35 @@ void shouldNotCreateDltProperties() { List backOffValues = new BackOffValuesGenerator(maxAttempts, backOffPolicy).generateValues(); List propertiesList = - new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, classifier, + new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, classifier, numPartitions, kafkaOperations, noDltStrategy, TopicSuffixingStrategy.SUFFIX_WITH_DELAY_VALUE, multipleTopicsSameIntervalReuseStrategy, - RetryTopicConstants.NOT_SET).createProperties(); + RetryTopicConstants.NOT_SET, Collections.emptyMap()).createProperties(); // then assertThat(propertiesList.size() == 3).isTrue(); assertThat(propertiesList.get(2).isDltTopic()).isFalse(); } + @Test + void shouldCreateDltPropertiesForCustomExceptionBasedRouting() { + // when + List backOffValues = new BackOffValuesGenerator(1, backOffPolicy).generateValues(); + + String desExcDltSuffix = "deserialization"; + List propertiesList = + new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, + classifier, numPartitions, kafkaOperations, + dltStrategy, suffixWithDelayValueSuffixingStrategy, multipleTopicsSameIntervalReuseStrategy, + RetryTopicConstants.NOT_SET, Map.of(desExcDltSuffix, Set.of(DeserializationException.class))).createProperties(); + + // then + assertThat(propertiesList.size()).isSameAs(3); + + assertDltTopic(propertiesList.get(1), desExcDltSuffix + this.dltSuffix); + assertDltTopic(propertiesList.get(2)); + } + @Test void shouldCreateOneRetryPropertyForFixedBackoffWithSingleTopicSameIntervalReuseStrategy() { @@ -206,10 +233,10 @@ void shouldCreateOneRetryPropertyForFixedBackoffWithSingleTopicSameIntervalReuse List backOffValues = new BackOffValuesGenerator(maxAttempts, backOffPolicy).generateValues(); List propertiesList = - new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, + new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, classifier, numPartitions, kafkaOperations, dltStrategy, suffixWithDelayValueSuffixingStrategy, singleTopicSameIntervalReuseStrategy, - -1).createProperties(); + -1, Collections.emptyMap()).createProperties(); List destinationTopicList = propertiesList .stream() @@ -247,10 +274,10 @@ void shouldCreateRetryPropertiesForFixedBackoffWithMultiTopicStrategy() { List backOffValues = new BackOffValuesGenerator(maxAttempts, backOffPolicy).generateValues(); List propertiesList = - new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, + new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, classifier, numPartitions, kafkaOperations, dltStrategy, suffixWithDelayValueSuffixingStrategy, multipleTopicsSameIntervalReuseStrategy, - -1).createProperties(); + -1, Collections.emptyMap()).createProperties(); List destinationTopicList = propertiesList .stream() @@ -294,10 +321,10 @@ void shouldSuffixRetryTopicsWithIndexIfSuffixWithIndexStrategy() { // when List propertiesList = - new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, + new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, classifier, numPartitions, kafkaOperations, dltStrategy, suffixWithIndexTopicSuffixingStrategy, - multipleTopicsSameIntervalReuseStrategy, -1).createProperties(); + multipleTopicsSameIntervalReuseStrategy, -1, Collections.emptyMap()).createProperties(); // then IntStream.range(1, maxAttempts).forEach(index -> assertThat(propertiesList.get(index).suffix()) @@ -315,10 +342,10 @@ void shouldSuffixRetryTopicsWithIndexIfFixedDelayWithMultipleTopics() { // when List propertiesList = - new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, + new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, classifier, numPartitions, kafkaOperations, dltStrategy, suffixWithIndexTopicSuffixingStrategy, multipleTopicsSameIntervalReuseStrategy, - -1).createProperties(); + -1, Collections.emptyMap()).createProperties(); // then IntStream.range(1, maxAttempts) @@ -340,7 +367,7 @@ void shouldSuffixRetryTopicsWithMixedIfMaxDelayReached() { // when DestinationTopicPropertiesFactory factory = new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, classifier, numPartitions, kafkaOperations, - dltStrategy, suffixWithDelayValueSuffixingStrategy, multipleTopicsSameIntervalReuseStrategy, -1); + dltStrategy, suffixWithDelayValueSuffixingStrategy, multipleTopicsSameIntervalReuseStrategy, -1, Collections.emptyMap()); List propertiesList = factory.createProperties(); @@ -368,7 +395,7 @@ void shouldReuseRetryTopicsIfMaxDelayReachedWithDelayValueSuffixingStrategy() { // when DestinationTopicPropertiesFactory factory = new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, classifier, numPartitions, kafkaOperations, - dltStrategy, suffixWithDelayValueSuffixingStrategy, singleTopicSameIntervalReuseStrategy, -1); + dltStrategy, suffixWithDelayValueSuffixingStrategy, singleTopicSameIntervalReuseStrategy, -1, Collections.emptyMap()); List propertiesList = factory.createProperties(); @@ -395,7 +422,7 @@ void shouldReuseRetryTopicsIfMaxDelayReachedWithIndexValueSuffixingStrategy() { // when DestinationTopicPropertiesFactory factory = new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, classifier, numPartitions, kafkaOperations, - dltStrategy, suffixWithIndexTopicSuffixingStrategy, singleTopicSameIntervalReuseStrategy, -1); + dltStrategy, suffixWithIndexTopicSuffixingStrategy, singleTopicSameIntervalReuseStrategy, -1, Collections.emptyMap()); List propertiesList = factory.createProperties(); @@ -418,7 +445,7 @@ void shouldNotReuseRetryTopicsIfRepeatedIntervalsAreInTheMiddleOfChain() { // when DestinationTopicPropertiesFactory factory = new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, classifier, numPartitions, kafkaOperations, - dltStrategy, suffixWithDelayValueSuffixingStrategy, multipleTopicsSameIntervalReuseStrategy, -1); + dltStrategy, suffixWithDelayValueSuffixingStrategy, multipleTopicsSameIntervalReuseStrategy, -1, Collections.emptyMap()); List propertiesList = factory.createProperties(); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicTests.java index 1b381df46f..d90786900b 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicTests.java @@ -19,6 +19,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Set; import java.util.function.BiPredicate; import org.springframework.classify.BinaryExceptionClassifier; @@ -26,9 +27,11 @@ import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaOperations; import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.serializer.DeserializationException; /** * @author Tomaz Fernandes + * @author Adrian Chlebosz * @since 2.7 */ public class DestinationTopicTests { @@ -72,12 +75,16 @@ public class DestinationTopicTests { new DestinationTopic.Properties(2000, retrySuffix + "-2000", DestinationTopic.Type.RETRY, 4, 1, DltStrategy.FAIL_ON_ERROR, kafkaOperations1, getShouldRetryOnDenyList(), noTimeout); + protected DestinationTopic.Properties deserializationDltTopicProps = + new DestinationTopic.Properties(0, "-deserialization" + dltSuffix, DestinationTopic.Type.DLT, 4, 1, + DltStrategy.FAIL_ON_ERROR, kafkaOperations1, (a, e) -> false, noTimeout, null, Set.of(DeserializationException.class)); + protected DestinationTopic.Properties dltTopicProps = new DestinationTopic.Properties(0, dltSuffix, DestinationTopic.Type.DLT, 4, 1, - DltStrategy.FAIL_ON_ERROR, kafkaOperations1, (a, e) -> false, noTimeout, null); + DltStrategy.FAIL_ON_ERROR, kafkaOperations1, (a, e) -> false, noTimeout, null, Collections.emptySet()); protected List allProps = Arrays - .asList(mainTopicProps, firstRetryProps, secondRetryProps, dltTopicProps); + .asList(mainTopicProps, firstRetryProps, secondRetryProps, deserializationDltTopicProps, dltTopicProps); protected DestinationTopic.Properties mainTopicProps2 = new DestinationTopic.Properties(0, "", DestinationTopic.Type.MAIN, 4, 1, @@ -93,7 +100,7 @@ public class DestinationTopicTests { protected DestinationTopic.Properties dltTopicProps2 = new DestinationTopic.Properties(0, dltSuffix, DestinationTopic.Type.DLT, 4, 1, - DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, (a, e) -> false, timeout, null); + DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, (a, e) -> false, timeout, null, Collections.emptySet()); protected List allProps2 = Arrays .asList(mainTopicProps2, firstRetryProps2, secondRetryProps2, dltTopicProps2); @@ -124,7 +131,7 @@ public class DestinationTopicTests { protected DestinationTopic.Properties dltTopicProps4 = new DestinationTopic.Properties(0, dltSuffix, DestinationTopic.Type.DLT, 4, 1, - DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, (a, e) -> false, timeout, null); + DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, (a, e) -> false, timeout, null, Collections.emptySet()); protected DestinationTopic.Properties mainTopicProps5 = new DestinationTopic.Properties(0, "", DestinationTopic.Type.MAIN, 4, 1, @@ -136,7 +143,7 @@ public class DestinationTopicTests { protected DestinationTopic.Properties dltTopicProps5 = new DestinationTopic.Properties(0, dltSuffix, DestinationTopic.Type.DLT, 4, 1, - DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, (a, e) -> false, timeout, null); + DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, (a, e) -> false, timeout, null, Collections.emptySet()); // Holders @@ -148,10 +155,12 @@ public class DestinationTopicTests { protected PropsHolder secondRetryDestinationHolder = new PropsHolder(FIRST_TOPIC, secondRetryProps); + protected PropsHolder deserializationDltDestinationHolder = new PropsHolder(FIRST_TOPIC, deserializationDltTopicProps); + protected PropsHolder dltDestinationHolder = new PropsHolder(FIRST_TOPIC, dltTopicProps); protected List allFirstDestinationsHolders = Arrays - .asList(mainDestinationHolder, firstRetryDestinationHolder, secondRetryDestinationHolder, dltDestinationHolder); + .asList(mainDestinationHolder, firstRetryDestinationHolder, secondRetryDestinationHolder, deserializationDltDestinationHolder, dltDestinationHolder); protected final static String SECOND_TOPIC = "secondTopic"; @@ -206,12 +215,15 @@ public class DestinationTopicTests { protected DestinationTopic dltDestinationTopic = new DestinationTopic(FIRST_TOPIC + dltTopicProps.suffix(), dltTopicProps); + protected DestinationTopic deserializationExcDltDestinationTopic = + new DestinationTopic(FIRST_TOPIC + "-deserialization" + dltTopicProps.suffix(), deserializationDltTopicProps); + protected DestinationTopic noOpsDestinationTopic = new DestinationTopic(dltDestinationTopic.getDestinationName() + "-noOps", new DestinationTopic.Properties(dltTopicProps, "-noOps", DestinationTopic.Type.NO_OPS)); protected List allFirstDestinationsTopics = Arrays - .asList(mainDestinationTopic, firstRetryDestinationTopic, secondRetryDestinationTopic, dltDestinationTopic); + .asList(mainDestinationTopic, firstRetryDestinationTopic, secondRetryDestinationTopic, deserializationExcDltDestinationTopic, dltDestinationTopic); protected DestinationTopic mainDestinationTopic2 = new DestinationTopic(SECOND_TOPIC + mainTopicProps2.suffix(), mainTopicProps2); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationBuilderTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationBuilderTests.java index 5da6ba70f6..990fe99e34 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationBuilderTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationBuilderTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2021 the original author or authors. + * Copyright 2018-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,6 +20,8 @@ import java.util.Arrays; import java.util.List; +import java.util.Map; +import java.util.Set; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -28,11 +30,13 @@ import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.KafkaOperations; +import org.springframework.kafka.support.serializer.DeserializationException; import org.springframework.kafka.test.utils.KafkaTestUtils; import org.springframework.test.util.ReflectionTestUtils; /** * @author Tomaz Fernandes + * @author Adrian Chlebosz * @since 2.7 */ @ExtendWith(MockitoExtension.class) @@ -187,4 +191,20 @@ void shouldSetNotAutoCreateTopics() { RetryTopicConfiguration.TopicCreation config = configuration.forKafkaTopicAutoCreation(); assertThat(config.shouldCreateTopics()).isFalse(); } + + @Test + void shouldSetDltRoutingRules() { + // setup + RetryTopicConfigurationBuilder builder = new RetryTopicConfigurationBuilder(); + + //when + RetryTopicConfiguration configuration = builder + .dltRoutingRules(Map.of("-deserialization", Set.of(DeserializationException.class))) + .create(kafkaOperations); + + // then + DestinationTopic.Properties desExcDltProps = configuration.getDestinationTopicProperties().get(3); + assertThat(desExcDltProps.suffix()).isEqualTo("-deserialization-dlt"); + assertThat(desExcDltProps.usedForExceptions()).containsExactly(DeserializationException.class); + } } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryableTopicAnnotationProcessorTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryableTopicAnnotationProcessorTests.java index 5daa04fcd2..2c887cf8d1 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryableTopicAnnotationProcessorTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryableTopicAnnotationProcessorTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2022 the original author or authors. + * Copyright 2018-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -43,6 +43,7 @@ import org.springframework.kafka.annotation.RetryableTopicAnnotationProcessor; import org.springframework.kafka.core.KafkaOperations; import org.springframework.kafka.support.EndpointHandlerMethod; +import org.springframework.kafka.support.serializer.DeserializationException; import org.springframework.retry.annotation.Backoff; import org.springframework.test.util.ReflectionTestUtils; import org.springframework.util.ReflectionUtils; @@ -50,6 +51,7 @@ /** * @author Tomaz Fernandes * @author Gary Russell + * @author Adrian Chlebosz * @since 2.7 */ @SuppressWarnings("deprecation") @@ -98,6 +100,15 @@ class RetryableTopicAnnotationProcessorTests { private final Object bean = createBean(); + // Retry with custom DLT routing + private final Method listenWithCustomDltRouting = ReflectionUtils + .findMethod(RetryableTopicAnnotationFactoryWithCustomDltRouting.class, listenerMethodName); + + private final RetryableTopic annotationWithCustomDltRouting = AnnotationUtils.findAnnotation( + listenWithCustomDltRouting, RetryableTopic.class); + + private final Object beanWithCustomDltRouting = createBean(); + private Object createBean() { try { return RetryableTopicAnnotationFactory.class.getDeclaredConstructor().newInstance(); @@ -107,17 +118,18 @@ private Object createBean() { } } + @Test void shouldGetDltHandlerMethod() { // setup given(beanFactory.getBean(RetryTopicBeanNames.DEFAULT_KAFKA_TEMPLATE_BEAN_NAME, KafkaOperations.class)) - .willReturn(kafkaOperationsFromDefaultName); + .willReturn(kafkaOperationsFromDefaultName); RetryableTopicAnnotationProcessor processor = new RetryableTopicAnnotationProcessor(beanFactory); // given RetryTopicConfiguration configuration = processor - .processAnnotation(topics, listenWithRetryAndDlt, annotationWithDlt, beanWithDlt); + .processAnnotation(topics, listenWithRetryAndDlt, annotationWithDlt, beanWithDlt); // then EndpointHandlerMethod dltHandlerMethod = configuration.getDltHandlerMethod(); @@ -125,7 +137,7 @@ void shouldGetDltHandlerMethod() { assertThat(method.getName()).isEqualTo("handleDlt"); assertThat(new DestinationTopic("", - configuration.getDestinationTopicProperties().get(0)).isAlwaysRetryOnDltFailure()).isFalse(); + configuration.getDestinationTopicProperties().get(0)).isAlwaysRetryOnDltFailure()).isFalse(); } @Test @@ -313,6 +325,27 @@ void shouldCreateFixedBackoff() { } + @Test + void shouldCreateExceptionBasedDltRoutingSpec() { + // setup + given(this.beanFactory.getBean(RetryTopicBeanNames.DEFAULT_KAFKA_TEMPLATE_BEAN_NAME, KafkaOperations.class)) + .willReturn(kafkaOperationsFromDefaultName); + RetryableTopicAnnotationProcessor processor = new RetryableTopicAnnotationProcessor(beanFactory); + + // given + RetryTopicConfiguration configuration = processor + .processAnnotation( + topics, listenWithCustomDltRouting, annotationWithCustomDltRouting, beanWithCustomDltRouting); + + // then + List destinationTopicProperties = configuration.getDestinationTopicProperties(); + + assertThat(destinationTopicProperties).hasSize(3); + assertThat(destinationTopicProperties.get(0).suffix()).isEmpty(); + assertThat(destinationTopicProperties.get(1).suffix()).isEqualTo("-deserialization-dlt"); + assertThat(destinationTopicProperties.get(2).suffix()).isEqualTo("-dlt"); + } + static class RetryableTopicAnnotationFactory { @KafkaListener @@ -336,4 +369,19 @@ void handleDlt() { // NoOps } } + + static class RetryableTopicAnnotationFactoryWithCustomDltRouting { + @KafkaListener + @RetryableTopic( + attempts = "1", + exceptionBasedDltRouting = { + @ExceptionBasedDltDestination( + suffix = "-deserialization", exceptions = {DeserializationException.class} + ) + } + ) + void listenWithRetry() { + // NoOps + } + } }