Skip to content

Commit

Permalink
GH-2800: code review amendments
Browse files Browse the repository at this point in the history
  • Loading branch information
breader124 committed Feb 1, 2024
1 parent e46d481 commit 18220c5
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -265,11 +265,11 @@ It is recommended that you use the provided resolvers when constructing the cust
== Routing of messages to custom DLTs based on thrown exceptions

Starting with version 3.2.0, it's possible to route messages to custom DLTs based on the type of the exception, which has been thrown during their processing.
In order to do that, you need to specify the routing.
In order to do that, there's a need to specify the routing.
Routing customization consists of the specification of the additional destinations.
Destinations in turn consist of two settings: the `suffix` and `exceptions`.
When the exception type specified in `exceptions` has been thrown, the DLT containing the `suffix` will be considered as the target topic for the message before the general purpose DLT is considered.
Take a look at the examples:
Examples of configuration using either annotations or `RetryTopicConfiguration` beans:

[source, java]
----
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,12 @@ See xref:kafka/transactions.adoc#transaction-id-suffix-fixed[Fixed TransactionId
=== Async @KafkaListener Return

`@KafkaListener` (and `@KafkaHandler`) methods can now return asynchronous return types include `CompletableFuture<?>`, `Mono<?>` and Kotlin `suspend` functions.
See xref:kafka/receiving-messages/async-returns.adoc[Async Returns] for more information.
See xref:kafka/receiving-messages/async-returns.adoc[Async Returns] for more information.

[[x32-customizable-dlt-routing]]
=== Routing of messages to custom DLTs based on thrown exceptions

It's now possible to redirect messages to the custom DLTs based on the type of the exception, which has been thrown during the message processing.
Rules for the redirection are set either via the `RetryableTopic.exceptionBasedDltRouting` or the `RetryTopicConfigurationBuilder.dltRoutingRules`.
Custom DLTs are created automatically as well as other retry and dead-letter topics.
See xref:retrytopic/features.adoc#exc-based-custom-dlt-routing[Routing of messages to custom DLTs based on thrown exceptions] for more information.
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@
* The DLT routing allowing to redirect the message to the custom DLT based on the
* exception thrown during the processing.
* @return the exception based DLT routing
* @since 3.2.0
*/
ExceptionBasedDltDestination[] exceptionBasedDltRouting() default {};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,7 @@ public RetryTopicConfiguration processAnnotation(String[] topics, Method method,
.setTopicSuffixingStrategy(annotation.topicSuffixingStrategy())
.sameIntervalTopicReuseStrategy(annotation.sameIntervalTopicReuseStrategy())
.timeoutAfter(timeout)
.dltRoutingRules(createDltRoutingSpecFromAnnotation(annotation.exceptionBasedDltRouting()))
.timeoutAfter(timeout);
.dltRoutingRules(createDltRoutingSpecFromAnnotation(annotation.exceptionBasedDltRouting()));

Integer attempts = resolveExpressionAsInteger(annotation.attempts(), "attempts", true);
if (attempts != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -128,7 +130,7 @@ public DestinationTopicPropertiesFactory(String retryTopicSuffix, String dltSuff
this.numPartitions = numPartitions;
this.timeout = timeout;
this.destinationTopicSuffixes = new DestinationTopicSuffixes(retryTopicSuffix, dltSuffix);
this.dltRoutingRules = dltRoutingRules;
this.dltRoutingRules = copyDltRoutingRules(dltRoutingRules);
this.backOffValues = backOffValues;
int backOffValuesSize = this.backOffValues.size();
this.isSameIntervalReuse = SameIntervalTopicReuseStrategy.SINGLE_TOPIC.equals(sameIntervalTopicReuseStrategy);
Expand All @@ -141,6 +143,12 @@ public DestinationTopicPropertiesFactory(String retryTopicSuffix, String dltSuff
this.retryTopicsAmount = backOffValuesSize - reusableTopicAttempts();
}

private Map<String, Set<Class<? extends Throwable>>> copyDltRoutingRules(Map<String, Set<Class<? extends Throwable>>> dltRoutingRules) {
Map<String, Set<Class<? extends Throwable>>> copyOfDltRoutingRules = new HashMap<>();
dltRoutingRules.forEach((topicSuffix, exceptions) -> copyOfDltRoutingRules.put(topicSuffix, new HashSet<>(exceptions)));
return copyOfDltRoutingRules;
}

/**
* Set to false to not start the DLT handler.
* @param autoStart false to not start.
Expand All @@ -159,8 +167,8 @@ public List<DestinationTopic.Properties> createProperties() {
list.add(createRetryProperties(backOffIndex));
}
if (!DltStrategy.NO_DLT.equals(this.dltStrategy)) {
list.add(createDltProperties());
list.addAll(createCustomDltProperties());
list.add(createDltProperties());
}
return Collections.unmodifiableList(list);
}
Expand Down

0 comments on commit 18220c5

Please sign in to comment.