Skip to content

Commit

Permalink
GH-2800: amendments after the code review
Browse files Browse the repository at this point in the history
  • Loading branch information
breader124 authored and Adrian Chlebosz committed Jan 25, 2024
1 parent f0d76f6 commit 797c010
Show file tree
Hide file tree
Showing 11 changed files with 109 additions and 62 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2023 the original author or authors.
* Copyright 2018-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -23,7 +23,7 @@
import java.lang.annotation.Target;

import org.springframework.kafka.retrytopic.DltStrategy;
import org.springframework.kafka.retrytopic.ExceptionBasedDltRouting;
import org.springframework.kafka.retrytopic.ExceptionBasedDltDestination;
import org.springframework.kafka.retrytopic.RetryTopicConstants;
import org.springframework.kafka.retrytopic.SameIntervalTopicReuseStrategy;
import org.springframework.kafka.retrytopic.TopicSuffixingStrategy;
Expand Down Expand Up @@ -178,7 +178,7 @@
* exception thrown during the processing.
* @return the exception based DLT routing
*/
ExceptionBasedDltRouting exceptionBasedDltRouting() default @ExceptionBasedDltRouting;
ExceptionBasedDltDestination[] exceptionBasedDltRouting() default {};

/**
* Whether the retry topics will be suffixed with the delay value for that topic or a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.springframework.expression.spel.support.StandardEvaluationContext;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.retrytopic.ExceptionBasedDltDestination;
import org.springframework.kafka.retrytopic.ExceptionBasedDltRouting;
import org.springframework.kafka.retrytopic.RetryTopicBeanNames;
import org.springframework.kafka.retrytopic.RetryTopicConfiguration;
import org.springframework.kafka.retrytopic.RetryTopicConfigurationBuilder;
Expand Down Expand Up @@ -215,8 +214,8 @@ private SleepingBackOffPolicy<?> createBackoffFromAnnotation(Backoff backoff, @N
return policy;
}

private Map<String, Set<Class<? extends Throwable>>> createDltRoutingSpecFromAnnotation(ExceptionBasedDltRouting routingSpec) {
return Arrays.stream(routingSpec.routingRules())
private Map<String, Set<Class<? extends Throwable>>> createDltRoutingSpecFromAnnotation(ExceptionBasedDltDestination[] routingRules) {
return Arrays.stream(routingRules)
.collect(Collectors.toMap(ExceptionBasedDltDestination::suffix, excBasedDestDlt -> Set.of(excBasedDestDlt.exceptions())));
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2023 the original author or authors.
* Copyright 2018-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -153,6 +153,11 @@ public DestinationTopic getDestinationTopicByName(String mainListenerId, String
() -> "No DestinationTopic found for " + mainListenerId + ":" + topic).getSourceDestination();
}

@Override
public DestinationTopic getDltFor(String mainListenerId, String topicName) {
return getDltFor(mainListenerId, topicName, null);
}

@Nullable
@Override
public DestinationTopic getDltFor(String mainListenerId, String topicName, Exception e) {
Expand Down Expand Up @@ -181,6 +186,10 @@ private static boolean isMatchingDltTopic(DestinationTopic destination, Exceptio
}

private static boolean isDirectExcOrCause(Exception e, Class<? extends Throwable> excType) {
if (e == null) {
return false;
}

Throwable toMatch = e;

boolean isMatched = excType.isInstance(toMatch);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2023 the original author or authors.
* Copyright 2018-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -196,9 +196,32 @@ public Properties(Properties sourceProperties, String suffix, Type type) {
* @param shouldRetryOn the exception classifications.
* @param timeout the timeout.
* @param autoStartDltHandler whether or not to start the DLT handler.
* @param usedForExceptions the exceptions which destination is intended for
* @since 2.8
*/
public Properties(long delayMs, String suffix, Type type,
int maxAttempts, int numPartitions,
DltStrategy dltStrategy,
KafkaOperations<?, ?> kafkaOperations,
BiPredicate<Integer, Throwable> shouldRetryOn, long timeout, @Nullable Boolean autoStartDltHandler) {
this(delayMs, suffix, type, maxAttempts, numPartitions, dltStrategy, kafkaOperations, shouldRetryOn,
timeout, autoStartDltHandler, Collections.emptySet());
}

/**
* Create an instance with the provided properties.
* @param delayMs the delay in ms.
* @param suffix the suffix.
* @param type the type.
* @param maxAttempts the max attempts.
* @param numPartitions the number of partitions.
* @param dltStrategy the DLT strategy.
* @param kafkaOperations the {@link KafkaOperations}.
* @param shouldRetryOn the exception classifications.
* @param timeout the timeout.
* @param autoStartDltHandler whether or not to start the DLT handler.
* @param usedForExceptions the exceptions which destination is intended for
* @since 3.2
*/
public Properties(long delayMs, String suffix, Type type,
int maxAttempts, int numPartitions,
DltStrategy dltStrategy,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2023 the original author or authors.
* Copyright 2017-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -66,10 +66,24 @@ public interface DestinationTopicContainer {
* DLT for the given topic, or null if none is found.
* @param mainListenerId the listener id.
* @param topicName the topic name for which to look the DLT for
* @param exc the exception which is being handled
* @return The {@link DestinationTopic} instance corresponding to the DLT.
* @deprecated Replaced by {@link #getDltFor(String, String, Exception)}
*/
@Nullable
DestinationTopic getDltFor(String mainListenerId, String topicName, Exception exc);
@Deprecated(since = "3.2", forRemoval = true)
DestinationTopic getDltFor(String mainListenerId, String topicName);

/**
* Returns the {@link DestinationTopic} instance registered as
* DLT for the given topic taking into consideration the exception
* thrown, or null if none is found.
* @param mainListenerId the listener id.
* @param topicName the topic name for which to look the DLT for
* @param exc the exception which is being handled
* @return The {@link DestinationTopic} instance corresponding to the DLT.
*/
@Nullable
default DestinationTopic getDltFor(String mainListenerId, String topicName, Exception exc) {
return null;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2023 the original author or authors.
* Copyright 2018-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -85,9 +85,35 @@ public class DestinationTopicPropertiesFactory {
* @param topicSuffixingStrategy the topic suffixing strategy.
* @param sameIntervalTopicReuseStrategy the same interval reuse strategy.
* @param timeout the timeout.
* @param dltRoutingRules the specification of which DLT should be used for the particular exception type
* @since 3.0.12
*/
public DestinationTopicPropertiesFactory(String retryTopicSuffix, String dltSuffix, List<Long> backOffValues,
BinaryExceptionClassifier exceptionClassifier,
int numPartitions, KafkaOperations<?, ?> kafkaOperations,
DltStrategy dltStrategy,
TopicSuffixingStrategy topicSuffixingStrategy,
SameIntervalTopicReuseStrategy sameIntervalTopicReuseStrategy,
long timeout) {

this(retryTopicSuffix, dltSuffix, backOffValues, exceptionClassifier, numPartitions, kafkaOperations,
dltStrategy, topicSuffixingStrategy, sameIntervalTopicReuseStrategy, timeout, Collections.emptyMap());
}

/**
* Construct an instance with the provided properties.
* @param retryTopicSuffix the suffix.
* @param dltSuffix the dlt suffix.
* @param backOffValues the back off values.
* @param exceptionClassifier the exception classifier.
* @param numPartitions the number of partitions.
* @param kafkaOperations the operations.
* @param dltStrategy the dlt strategy.
* @param topicSuffixingStrategy the topic suffixing strategy.
* @param sameIntervalTopicReuseStrategy the same interval reuse strategy.
* @param timeout the timeout.
* @param dltRoutingRules the specification of which DLT should be used for the particular exception type
* @since 3.2.0
*/
public DestinationTopicPropertiesFactory(String retryTopicSuffix, String dltSuffix, List<Long> backOffValues,
BinaryExceptionClassifier exceptionClassifier,
int numPartitions, KafkaOperations<?, ?> kafkaOperations,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2023 the original author or authors.
* Copyright 2016-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -21,7 +21,7 @@
* processing caused the configured exception to be thrown.
*
* @author Adrian Chlebosz
* @see org.springframework.kafka.retrytopic.ExceptionBasedDltRouting
*
* @since 3.2.0
*/
public @interface ExceptionBasedDltDestination {
Expand All @@ -31,7 +31,6 @@
* before the main suffix configured through the
* ${@link org.springframework.kafka.annotation.RetryableTopic#dltTopicSuffix()}, so the
* final name is the product of these two.
*
* @return the configured suffix extension
*/
String suffix();
Expand All @@ -41,7 +40,6 @@
* it should be eventually redirected to the DLT with name containing the extension
* configured through {@link #suffix()}. The causes of the thrown exception will be
* traversed to match with any of configured ones.
*
* @return configured exceptions
*/
Class<? extends Throwable>[] exceptions();
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2023 the original author or authors.
* Copyright 2018-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -79,7 +79,7 @@ public class RetryTopicConfigurationBuilder {
@Nullable
private BinaryExceptionClassifierBuilder classifierBuilder;

private Map<String, Set<Class<? extends Throwable>>> dltRoutingRules = new HashMap<>();
private final Map<String, Set<Class<? extends Throwable>>> dltRoutingRules = new HashMap<>();

private DltStrategy dltStrategy = DltStrategy.ALWAYS_RETRY_ON_ERROR;

Expand Down Expand Up @@ -539,7 +539,7 @@ private BinaryExceptionClassifierBuilder classifierBuilder() {
* @since 3.2.0
*/
public RetryTopicConfigurationBuilder dltRoutingRules(Map<String, Set<Class<? extends Throwable>>> dltRoutingRules) {
this.dltRoutingRules = dltRoutingRules;
this.dltRoutingRules.putAll(dltRoutingRules);
return this;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2023 the original author or authors.
* Copyright 2018-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -220,12 +220,26 @@ void shouldGetNextDestinationTopic() {
}

@Test
void shouldGetDlt() {
void shouldGetGeneralPurposeDltWhenExceptionIsNotKnown() {
assertThat(defaultDestinationTopicContainer
.getDltFor("id", mainDestinationTopic.getDestinationName()))
.isEqualTo(dltDestinationTopic);
}

@Test
void shouldGetGeneralPurposeDltWhenThereIsNoCustomDltRegisteredForExceptionType() {
assertThat(defaultDestinationTopicContainer
.getDltFor("id", mainDestinationTopic.getDestinationName(), new RuntimeException()))
.isEqualTo(dltDestinationTopic);
}

@Test
void shouldGetCustomDltWhenThereIsCustomDltRegisteredForExceptionType() {
assertThat(defaultDestinationTopicContainer
.getDltFor("id", mainDestinationTopic.getDestinationName(), new DeserializationException(null, null, false, null)))
.isEqualTo(deserializationExcDltDestinationTopic);
}

@Test
void shouldThrowIfNoDestinationFound() {
assertThatNullPointerException().isThrownBy(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2023 the original author or authors.
* Copyright 2018-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -374,11 +374,11 @@ static class RetryableTopicAnnotationFactoryWithCustomDltRouting {
@KafkaListener
@RetryableTopic(
attempts = "1",
exceptionBasedDltRouting = @ExceptionBasedDltRouting(routingRules = {
exceptionBasedDltRouting = {
@ExceptionBasedDltDestination(
suffix = "-deserialization", exceptions = {DeserializationException.class}
)
})
}
)
void listenWithRetry() {
// NoOps
Expand Down

0 comments on commit 797c010

Please sign in to comment.