Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-2800: Make it possible to route messages to custom DLT based on exceptions thrown #2929

Merged
merged 9 commits into from
Feb 1, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -261,3 +261,41 @@ protected Consumer<DeadLetterPublishingRecovererFactory>

It is recommended that you use the provided resolvers when constructing the custom instance.

[[exc-based-custom-dlt-routing]]
== Routing of messages to custom DLTs based on thrown exceptions

Starting with version 3.2.0, it's possible to route messages to custom DLTs based on the type of the exception, which has been thrown during their processing.
In order to do that, there's a need to specify the routing.
Routing customization consists of the specification of the additional destinations.
Destinations in turn consist of two settings: the `suffix` and `exceptions`.
When the exception type specified in `exceptions` has been thrown, the DLT containing the `suffix` will be considered as the target topic for the message before the general purpose DLT is considered.
Examples of configuration using either annotations or `RetryTopicConfiguration` beans:

[source, java]
----
@RetryableTopic(exceptionBasedDltRouting = {
@ExceptionBasedDltDestination(
suffix = "-deserialization", exceptions = {DeserializationException.class}
)}
)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
----

[source, java]
----
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.dltRoutingRules(Map.of("-deserialization", Set.of(DeserializationException.class)))
.create(kafkaOperations)
.create(template);
}
----

`suffix` takes place before the general `dltTopicSuffix` in the custom DLT name.
Considering presented examples, the message, which caused the `DeserializationException` will be routed to the `my-annotated-topic-deserialization-dlt` instead of the `my-annotated-topic-dlt`.
Custom DLTs will be created following the same rules as stated in the xref:retrytopic/features.adoc#topics-autocreation[Topics AutoCreation].
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,12 @@ See xref:kafka/transactions.adoc#transaction-id-suffix-fixed[Fixed TransactionId
=== Async @KafkaListener Return

`@KafkaListener` (and `@KafkaHandler`) methods can now return asynchronous return types include `CompletableFuture<?>`, `Mono<?>` and Kotlin `suspend` functions.
See xref:kafka/receiving-messages/async-returns.adoc[Async Returns] for more information.
See xref:kafka/receiving-messages/async-returns.adoc[Async Returns] for more information.

[[x32-customizable-dlt-routing]]
=== Routing of messages to custom DLTs based on thrown exceptions

It's now possible to redirect messages to the custom DLTs based on the type of the exception, which has been thrown during the message processing.
Rules for the redirection are set either via the `RetryableTopic.exceptionBasedDltRouting` or the `RetryTopicConfigurationBuilder.dltRoutingRules`.
Custom DLTs are created automatically as well as other retry and dead-letter topics.
See xref:retrytopic/features.adoc#exc-based-custom-dlt-routing[Routing of messages to custom DLTs based on thrown exceptions] for more information.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2023 the original author or authors.
* Copyright 2018-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -23,6 +23,7 @@
import java.lang.annotation.Target;

import org.springframework.kafka.retrytopic.DltStrategy;
import org.springframework.kafka.retrytopic.ExceptionBasedDltDestination;
import org.springframework.kafka.retrytopic.RetryTopicConstants;
import org.springframework.kafka.retrytopic.SameIntervalTopicReuseStrategy;
import org.springframework.kafka.retrytopic.TopicSuffixingStrategy;
Expand All @@ -39,6 +40,7 @@
* @author Gary Russell
* @author Fabio da Silva Jr.
* @author João Lima
* @author Adrian Chlebosz
* @since 2.7
*
* @see org.springframework.kafka.retrytopic.RetryTopicConfigurer
Expand Down Expand Up @@ -171,6 +173,14 @@
*/
String dltTopicSuffix() default RetryTopicConstants.DEFAULT_DLT_SUFFIX;

/**
* The DLT routing allowing to redirect the message to the custom DLT based on the
* exception thrown during the processing.
* @return the exception based DLT routing
sobychacko marked this conversation as resolved.
Show resolved Hide resolved
* @since 3.2.0
*/
ExceptionBasedDltDestination[] exceptionBasedDltRouting() default {};

/**
* Whether the retry topics will be suffixed with the delay value for that topic or a
* simple index.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2023 the original author or authors.
* Copyright 2018-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -21,7 +21,10 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanInitializationException;
Expand All @@ -34,6 +37,7 @@
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.retrytopic.ExceptionBasedDltDestination;
import org.springframework.kafka.retrytopic.RetryTopicBeanNames;
import org.springframework.kafka.retrytopic.RetryTopicConfiguration;
import org.springframework.kafka.retrytopic.RetryTopicConfigurationBuilder;
Expand All @@ -58,6 +62,7 @@
*
* @author Tomaz Fernandes
* @author Gary Russell
* @author Adrian Chlebosz
* @since 2.7
*
*/
Expand Down Expand Up @@ -148,7 +153,8 @@ public RetryTopicConfiguration processAnnotation(String[] topics, Method method,
.autoStartDltHandler(autoStartDlt)
.setTopicSuffixingStrategy(annotation.topicSuffixingStrategy())
.sameIntervalTopicReuseStrategy(annotation.sameIntervalTopicReuseStrategy())
.timeoutAfter(timeout);
.timeoutAfter(timeout)
artembilan marked this conversation as resolved.
Show resolved Hide resolved
.dltRoutingRules(createDltRoutingSpecFromAnnotation(annotation.exceptionBasedDltRouting()));

Integer attempts = resolveExpressionAsInteger(annotation.attempts(), "attempts", true);
if (attempts != null) {
Expand Down Expand Up @@ -207,6 +213,11 @@ private SleepingBackOffPolicy<?> createBackoffFromAnnotation(Backoff backoff, @N
return policy;
}

private Map<String, Set<Class<? extends Throwable>>> createDltRoutingSpecFromAnnotation(ExceptionBasedDltDestination[] routingRules) {
return Arrays.stream(routingRules)
.collect(Collectors.toMap(ExceptionBasedDltDestination::suffix, excBasedDestDlt -> Set.of(excBasedDestDlt.exceptions())));
}

private EndpointHandlerMethod getDltProcessor(Method listenerMethod, Object bean) {
Class<?> declaringClass = listenerMethod.getDeclaringClass();
return Arrays.stream(ReflectionUtils.getDeclaredMethods(declaringClass))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2023 the original author or authors.
* Copyright 2018-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -49,6 +49,7 @@
* @author Tomaz Fernandes
* @author Gary Russell
* @author Yvette Quinby
* @author Adrian Chlebosz
* @since 2.7
*
*/
Expand Down Expand Up @@ -97,8 +98,8 @@ public DestinationTopic resolveDestinationTopic(String mainListenerId, String to
: destinationTopicHolder.getSourceDestination().shouldRetryOn(attempt, maybeUnwrapException(e))
&& isNotFatalException(e)
&& !isPastTimout(originalTimestamp, destinationTopicHolder)
? resolveRetryDestination(destinationTopicHolder)
: getDltOrNoOpsDestination(mainListenerId, topic);
? resolveRetryDestination(mainListenerId, destinationTopicHolder, e)
: getDltOrNoOpsDestination(mainListenerId, topic, e);
}

private Boolean isNotFatalException(Exception e) {
Expand Down Expand Up @@ -128,10 +129,20 @@ && isNotFatalException(e)
}

@SuppressWarnings("deprecation")
private DestinationTopic resolveRetryDestination(DestinationTopicHolder destinationTopicHolder) {
return (destinationTopicHolder.getSourceDestination().isReusableRetryTopic())
? destinationTopicHolder.getSourceDestination()
: destinationTopicHolder.getNextDestination();
private DestinationTopic resolveRetryDestination(String mainListenerId, DestinationTopicHolder destinationTopicHolder, Exception e) {
if (destinationTopicHolder.getSourceDestination().isReusableRetryTopic()) {
return destinationTopicHolder.getSourceDestination();
}

if (isAlreadyDltDestination(destinationTopicHolder)) {
return getDltOrNoOpsDestination(mainListenerId, destinationTopicHolder.getSourceDestination().getDestinationName(), e);
}

return destinationTopicHolder.getNextDestination();
}

private static boolean isAlreadyDltDestination(DestinationTopicHolder destinationTopicHolder) {
return destinationTopicHolder.getNextDestination().isDltTopic();
}

@Override
Expand All @@ -142,20 +153,55 @@ public DestinationTopic getDestinationTopicByName(String mainListenerId, String
() -> "No DestinationTopic found for " + mainListenerId + ":" + topic).getSourceDestination();
}

@Nullable
@Override
public DestinationTopic getDltFor(String mainListenerId, String topicName) {
DestinationTopic destination = getDltOrNoOpsDestination(mainListenerId, topicName);
return getDltFor(mainListenerId, topicName, null);
}

@Nullable
@Override
public DestinationTopic getDltFor(String mainListenerId, String topicName, Exception e) {
DestinationTopic destination = getDltOrNoOpsDestination(mainListenerId, topicName, e);
return destination.isNoOpsTopic()
? null
: destination;
}

private DestinationTopic getDltOrNoOpsDestination(String mainListenerId, String topic) {
private DestinationTopic getDltOrNoOpsDestination(String mainListenerId, String topic, Exception e) {
DestinationTopic destination = getNextDestinationTopicFor(mainListenerId, topic);
return destination.isDltTopic() || destination.isNoOpsTopic()
? destination
: getDltOrNoOpsDestination(mainListenerId, destination.getDestinationName());
return isMatchingDltTopic(destination, e) || destination.isNoOpsTopic() ?
destination :
getDltOrNoOpsDestination(mainListenerId, destination.getDestinationName(), e);
}

private static boolean isMatchingDltTopic(DestinationTopic destination, Exception e) {
if (!destination.isDltTopic()) {
return false;
}

boolean isDltIntendedForCurrentExc = destination.usedForExceptions().stream()
.anyMatch(excType -> isDirectExcOrCause(e, excType));
boolean isGenericPurposeDlt = destination.usedForExceptions().isEmpty();
return isDltIntendedForCurrentExc || isGenericPurposeDlt;
}

private static boolean isDirectExcOrCause(Exception e, Class<? extends Throwable> excType) {
artembilan marked this conversation as resolved.
Show resolved Hide resolved
if (e == null) {
return false;
}

Throwable toMatch = e;

boolean isMatched = excType.isInstance(toMatch);
while (!isMatched) {
toMatch = toMatch.getCause();
if (toMatch == null) {
return false;
}
isMatched = excType.isInstance(toMatch);
}

return isMatched;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2023 the original author or authors.
* Copyright 2018-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,7 +16,9 @@

package org.springframework.kafka.retrytopic;

import java.util.Collections;
import java.util.Objects;
import java.util.Set;
import java.util.function.BiPredicate;

import org.springframework.kafka.core.KafkaOperations;
Expand All @@ -28,6 +30,7 @@
*
* @author Tomaz Fernandes
* @author Gary Russell
* @author Adrian Chlebosz
* @since 2.7
*
*/
Expand Down Expand Up @@ -88,6 +91,10 @@ public boolean shouldRetryOn(Integer attempt, Throwable e) {
return this.properties.shouldRetryOn.test(attempt, e);
}

public Set<Class<? extends Throwable>> usedForExceptions() {
return Collections.unmodifiableSet(this.properties.usedForExceptions);
}

@Override
public String toString() {
return "DestinationTopic{" +
Expand Down Expand Up @@ -137,9 +144,10 @@ public static class Properties {

private final long timeout;

private final Set<Class<? extends Throwable>> usedForExceptions;

@Nullable
private final Boolean autoStartDltHandler;

/**
* Create an instance with the provided properties with the DLT container starting
* automatically (if the container factory is so configured).
Expand All @@ -160,7 +168,7 @@ public Properties(long delayMs, String suffix, Type type,
BiPredicate<Integer, Throwable> shouldRetryOn, long timeout) {

this(delayMs, suffix, type, maxAttempts, numPartitions, dltStrategy, kafkaOperations, shouldRetryOn,
timeout, null);
timeout, null, Collections.emptySet());
}

/**
Expand All @@ -173,7 +181,7 @@ public Properties(long delayMs, String suffix, Type type,
public Properties(Properties sourceProperties, String suffix, Type type) {
this(sourceProperties.delayMs, suffix, type, sourceProperties.maxAttempts, sourceProperties.numPartitions,
sourceProperties.dltStrategy, sourceProperties.kafkaOperations, sourceProperties.shouldRetryOn,
sourceProperties.timeout, null);
sourceProperties.timeout, null, Collections.emptySet());
}

/**
Expand All @@ -195,6 +203,31 @@ public Properties(long delayMs, String suffix, Type type,
DltStrategy dltStrategy,
KafkaOperations<?, ?> kafkaOperations,
BiPredicate<Integer, Throwable> shouldRetryOn, long timeout, @Nullable Boolean autoStartDltHandler) {
this(delayMs, suffix, type, maxAttempts, numPartitions, dltStrategy, kafkaOperations, shouldRetryOn,
timeout, autoStartDltHandler, Collections.emptySet());
}

/**
* Create an instance with the provided properties.
* @param delayMs the delay in ms.
* @param suffix the suffix.
* @param type the type.
* @param maxAttempts the max attempts.
* @param numPartitions the number of partitions.
* @param dltStrategy the DLT strategy.
* @param kafkaOperations the {@link KafkaOperations}.
* @param shouldRetryOn the exception classifications.
* @param timeout the timeout.
* @param autoStartDltHandler whether or not to start the DLT handler.
* @param usedForExceptions the exceptions which destination is intended for
* @since 3.2
*/
public Properties(long delayMs, String suffix, Type type,
int maxAttempts, int numPartitions,
DltStrategy dltStrategy,
KafkaOperations<?, ?> kafkaOperations,
BiPredicate<Integer, Throwable> shouldRetryOn, long timeout, @Nullable Boolean autoStartDltHandler,
Set<Class<? extends Throwable>> usedForExceptions) {

this.delayMs = delayMs;
this.suffix = suffix;
Expand All @@ -206,6 +239,7 @@ public Properties(long delayMs, String suffix, Type type,
this.shouldRetryOn = shouldRetryOn;
this.timeout = timeout;
this.autoStartDltHandler = autoStartDltHandler;
this.usedForExceptions = usedForExceptions;
}

public boolean isDltTopic() {
Expand Down Expand Up @@ -239,6 +273,10 @@ public Boolean autoStartDltHandler() {
return this.autoStartDltHandler;
}

public Set<Class<? extends Throwable>> usedForExceptions() {
return this.usedForExceptions;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Loading
Loading