Skip to content

Commit

Permalink
GH-2800: code review amendments
Browse files Browse the repository at this point in the history
  • Loading branch information
breader124 committed Jan 26, 2024
1 parent c5dd055 commit 474229f
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -265,11 +265,11 @@ It is recommended that you use the provided resolvers when constructing the cust
== Routing of messages to custom DLTs based on thrown exceptions

Starting with version 3.2.0, it's possible to route messages to custom DLTs based on the type of the exception, which has been thrown during their processing.
In order to do that, you need to specify the routing.
In order to do that, there's a need to specify the routing.
Routing customization consists of the specification of the additional destinations.
Destinations in turn consist of two settings: the `suffix` and `exceptions`.
When the exception type specified in `exceptions` has been thrown, the DLT containing the `suffix` will be considered as the target topic for the message before the general purpose DLT is considered.
Take a look at the examples:
Examples of configuration using either annotations or `RetryTopicConfiguration` beans:

[source, java]
----
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,12 @@ For changes in earlier version, see xref:appendix/change-history.adoc[Change His

A new `TransactionIdSuffixStrategy` interface was introduced to manage `transactional.id` suffix.
The default implementation is `DefaultTransactionIdSuffixStrategy` when setting `maxCache` greater than zero can reuse `transactional.id` within a specific range, otherwise suffixes will be generated on the fly by incrementing a counter.
See xref:kafka/transactions.adoc#transaction-id-suffix-fixed[Fixed TransactionIdSuffix] for more information.
See xref:kafka/transactions.adoc#transaction-id-suffix-fixed[Fixed TransactionIdSuffix] for more information.

[[x32-customizable-dlt-routing]]
=== Routing of messages to custom DLTs based on thrown exceptions

It's now possible to redirect messages to the custom DLTs based on the type of the exception, which has been thrown during the message processing.
Rules for the redirection are set either via the `RetryableTopic.exceptionBasedDltRouting` or the `RetryTopicConfigurationBuilder.dltRoutingRules`.
Custom DLTs are created automatically as well as other retry and dead-letter topics.
See xref:retrytopic/features.adoc#exc-based-custom-dlt-routing[Routing of messages to custom DLTs based on thrown exceptions] for more information.
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@
* The DLT routing allowing to redirect the message to the custom DLT based on the
* exception thrown during the processing.
* @return the exception based DLT routing
* @since 3.2.0
*/
ExceptionBasedDltDestination[] exceptionBasedDltRouting() default {};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,7 @@ public RetryTopicConfiguration processAnnotation(String[] topics, Method method,
.setTopicSuffixingStrategy(annotation.topicSuffixingStrategy())
.sameIntervalTopicReuseStrategy(annotation.sameIntervalTopicReuseStrategy())
.timeoutAfter(timeout)
.dltRoutingRules(createDltRoutingSpecFromAnnotation(annotation.exceptionBasedDltRouting()))
.timeoutAfter(timeout);
.dltRoutingRules(createDltRoutingSpecFromAnnotation(annotation.exceptionBasedDltRouting()));

Integer attempts = resolveExpressionAsInteger(annotation.attempts(), "attempts", true);
if (attempts != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -131,12 +133,18 @@ public DestinationTopicPropertiesFactory(String retryTopicSuffix, String dltSuff
this.sameIntervalTopicReuseStrategy = sameIntervalTopicReuseStrategy;
this.timeout = timeout;
this.destinationTopicSuffixes = new DestinationTopicSuffixes(retryTopicSuffix, dltSuffix);
this.dltRoutingRules = dltRoutingRules;
this.dltRoutingRules = copyDltRoutingRules(dltRoutingRules);
this.backOffValues = backOffValues;
// Max Attempts include the initial try.
this.maxAttempts = this.backOffValues.size() + 1;
}

private Map<String, Set<Class<? extends Throwable>>> copyDltRoutingRules(Map<String, Set<Class<? extends Throwable>>> dltRoutingRules) {
Map<String, Set<Class<? extends Throwable>>> copyOfDltRoutingRules = new HashMap<>();
dltRoutingRules.forEach((topicSuffix, exceptions) -> copyOfDltRoutingRules.put(topicSuffix, new HashSet<>(exceptions)));
return copyOfDltRoutingRules;
}

/**
* Set to false to not start the DLT handler.
* @param autoStart false to not start.
Expand Down

0 comments on commit 474229f

Please sign in to comment.