Skip to content

Commit aa0b354

Browse files
committed
GH-2800: amendments after the code review
1 parent 2c1240e commit aa0b354

11 files changed

+109
-62
lines changed

spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopic.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2023 the original author or authors.
2+
* Copyright 2018-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -23,7 +23,7 @@
2323
import java.lang.annotation.Target;
2424

2525
import org.springframework.kafka.retrytopic.DltStrategy;
26-
import org.springframework.kafka.retrytopic.ExceptionBasedDltRouting;
26+
import org.springframework.kafka.retrytopic.ExceptionBasedDltDestination;
2727
import org.springframework.kafka.retrytopic.RetryTopicConstants;
2828
import org.springframework.kafka.retrytopic.SameIntervalTopicReuseStrategy;
2929
import org.springframework.kafka.retrytopic.TopicSuffixingStrategy;
@@ -178,7 +178,7 @@
178178
* exception thrown during the processing.
179179
* @return the exception based DLT routing
180180
*/
181-
ExceptionBasedDltRouting exceptionBasedDltRouting() default @ExceptionBasedDltRouting;
181+
ExceptionBasedDltDestination[] exceptionBasedDltRouting() default {};
182182

183183
/**
184184
* Whether the retry topics will be suffixed with the delay value for that topic or a

spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopicAnnotationProcessor.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@
3838
import org.springframework.expression.spel.support.StandardEvaluationContext;
3939
import org.springframework.kafka.core.KafkaOperations;
4040
import org.springframework.kafka.retrytopic.ExceptionBasedDltDestination;
41-
import org.springframework.kafka.retrytopic.ExceptionBasedDltRouting;
4241
import org.springframework.kafka.retrytopic.RetryTopicBeanNames;
4342
import org.springframework.kafka.retrytopic.RetryTopicConfiguration;
4443
import org.springframework.kafka.retrytopic.RetryTopicConfigurationBuilder;
@@ -215,8 +214,8 @@ private SleepingBackOffPolicy<?> createBackoffFromAnnotation(Backoff backoff, @N
215214
return policy;
216215
}
217216

218-
private Map<String, Set<Class<? extends Throwable>>> createDltRoutingSpecFromAnnotation(ExceptionBasedDltRouting routingSpec) {
219-
return Arrays.stream(routingSpec.routingRules())
217+
private Map<String, Set<Class<? extends Throwable>>> createDltRoutingSpecFromAnnotation(ExceptionBasedDltDestination[] routingRules) {
218+
return Arrays.stream(routingRules)
220219
.collect(Collectors.toMap(ExceptionBasedDltDestination::suffix, excBasedDestDlt -> Set.of(excBasedDestDlt.exceptions())));
221220
}
222221

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolver.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2023 the original author or authors.
2+
* Copyright 2018-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -153,6 +153,11 @@ public DestinationTopic getDestinationTopicByName(String mainListenerId, String
153153
() -> "No DestinationTopic found for " + mainListenerId + ":" + topic).getSourceDestination();
154154
}
155155

156+
@Override
157+
public DestinationTopic getDltFor(String mainListenerId, String topicName) {
158+
return getDltFor(mainListenerId, topicName, null);
159+
}
160+
156161
@Nullable
157162
@Override
158163
public DestinationTopic getDltFor(String mainListenerId, String topicName, Exception e) {
@@ -181,6 +186,10 @@ private static boolean isMatchingDltTopic(DestinationTopic destination, Exceptio
181186
}
182187

183188
private static boolean isDirectExcOrCause(Exception e, Class<? extends Throwable> excType) {
189+
if (e == null) {
190+
return false;
191+
}
192+
184193
Throwable toMatch = e;
185194

186195
boolean isMatched = excType.isInstance(toMatch);

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopic.java

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2023 the original author or authors.
2+
* Copyright 2018-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -196,9 +196,32 @@ public Properties(Properties sourceProperties, String suffix, Type type) {
196196
* @param shouldRetryOn the exception classifications.
197197
* @param timeout the timeout.
198198
* @param autoStartDltHandler whether or not to start the DLT handler.
199-
* @param usedForExceptions the exceptions which destination is intended for
200199
* @since 2.8
201200
*/
201+
public Properties(long delayMs, String suffix, Type type,
202+
int maxAttempts, int numPartitions,
203+
DltStrategy dltStrategy,
204+
KafkaOperations<?, ?> kafkaOperations,
205+
BiPredicate<Integer, Throwable> shouldRetryOn, long timeout, @Nullable Boolean autoStartDltHandler) {
206+
this(delayMs, suffix, type, maxAttempts, numPartitions, dltStrategy, kafkaOperations, shouldRetryOn,
207+
timeout, autoStartDltHandler, Collections.emptySet());
208+
}
209+
210+
/**
211+
* Create an instance with the provided properties.
212+
* @param delayMs the delay in ms.
213+
* @param suffix the suffix.
214+
* @param type the type.
215+
* @param maxAttempts the max attempts.
216+
* @param numPartitions the number of partitions.
217+
* @param dltStrategy the DLT strategy.
218+
* @param kafkaOperations the {@link KafkaOperations}.
219+
* @param shouldRetryOn the exception classifications.
220+
* @param timeout the timeout.
221+
* @param autoStartDltHandler whether or not to start the DLT handler.
222+
* @param usedForExceptions the exceptions which destination is intended for
223+
* @since 3.2
224+
*/
202225
public Properties(long delayMs, String suffix, Type type,
203226
int maxAttempts, int numPartitions,
204227
DltStrategy dltStrategy,

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicContainer.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2023 the original author or authors.
2+
* Copyright 2017-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -66,10 +66,24 @@ public interface DestinationTopicContainer {
6666
* DLT for the given topic, or null if none is found.
6767
* @param mainListenerId the listener id.
6868
* @param topicName the topic name for which to look the DLT for
69-
* @param exc the exception which is being handled
7069
* @return The {@link DestinationTopic} instance corresponding to the DLT.
70+
* @deprecated Replaced by {@link #getDltFor(String, String, Exception)}
7171
*/
7272
@Nullable
73-
DestinationTopic getDltFor(String mainListenerId, String topicName, Exception exc);
73+
@Deprecated(since = "3.2", forRemoval = true)
74+
DestinationTopic getDltFor(String mainListenerId, String topicName);
7475

76+
/**
77+
* Returns the {@link DestinationTopic} instance registered as
78+
* DLT for the given topic taking into consideration the exception
79+
* thrown, or null if none is found.
80+
* @param mainListenerId the listener id.
81+
* @param topicName the topic name for which to look the DLT for
82+
* @param exc the exception which is being handled
83+
* @return The {@link DestinationTopic} instance corresponding to the DLT.
84+
*/
85+
@Nullable
86+
default DestinationTopic getDltFor(String mainListenerId, String topicName, Exception exc) {
87+
return null;
88+
}
7589
}

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactory.java

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2023 the original author or authors.
2+
* Copyright 2018-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -85,9 +85,35 @@ public class DestinationTopicPropertiesFactory {
8585
* @param topicSuffixingStrategy the topic suffixing strategy.
8686
* @param sameIntervalTopicReuseStrategy the same interval reuse strategy.
8787
* @param timeout the timeout.
88-
* @param dltRoutingRules the specification of which DLT should be used for the particular exception type
8988
* @since 3.0.12
9089
*/
90+
public DestinationTopicPropertiesFactory(String retryTopicSuffix, String dltSuffix, List<Long> backOffValues,
91+
BinaryExceptionClassifier exceptionClassifier,
92+
int numPartitions, KafkaOperations<?, ?> kafkaOperations,
93+
DltStrategy dltStrategy,
94+
TopicSuffixingStrategy topicSuffixingStrategy,
95+
SameIntervalTopicReuseStrategy sameIntervalTopicReuseStrategy,
96+
long timeout) {
97+
98+
this(retryTopicSuffix, dltSuffix, backOffValues, exceptionClassifier, numPartitions, kafkaOperations,
99+
dltStrategy, topicSuffixingStrategy, sameIntervalTopicReuseStrategy, timeout, Collections.emptyMap());
100+
}
101+
102+
/**
103+
* Construct an instance with the provided properties.
104+
* @param retryTopicSuffix the suffix.
105+
* @param dltSuffix the dlt suffix.
106+
* @param backOffValues the back off values.
107+
* @param exceptionClassifier the exception classifier.
108+
* @param numPartitions the number of partitions.
109+
* @param kafkaOperations the operations.
110+
* @param dltStrategy the dlt strategy.
111+
* @param topicSuffixingStrategy the topic suffixing strategy.
112+
* @param sameIntervalTopicReuseStrategy the same interval reuse strategy.
113+
* @param timeout the timeout.
114+
* @param dltRoutingRules the specification of which DLT should be used for the particular exception type
115+
* @since 3.2.0
116+
*/
91117
public DestinationTopicPropertiesFactory(String retryTopicSuffix, String dltSuffix, List<Long> backOffValues,
92118
BinaryExceptionClassifier exceptionClassifier,
93119
int numPartitions, KafkaOperations<?, ?> kafkaOperations,

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ExceptionBasedDltDestination.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2023 the original author or authors.
2+
* Copyright 2016-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -21,7 +21,7 @@
2121
* processing caused the configured exception to be thrown.
2222
*
2323
* @author Adrian Chlebosz
24-
* @see org.springframework.kafka.retrytopic.ExceptionBasedDltRouting
24+
*
2525
* @since 3.2.0
2626
*/
2727
public @interface ExceptionBasedDltDestination {
@@ -31,7 +31,6 @@
3131
* before the main suffix configured through the
3232
* ${@link org.springframework.kafka.annotation.RetryableTopic#dltTopicSuffix()}, so the
3333
* final name is the product of these two.
34-
*
3534
* @return the configured suffix extension
3635
*/
3736
String suffix();
@@ -41,7 +40,6 @@
4140
* it should be eventually redirected to the DLT with name containing the extension
4241
* configured through {@link #suffix()}. The causes of the thrown exception will be
4342
* traversed to match with any of configured ones.
44-
*
4543
* @return configured exceptions
4644
*/
4745
Class<? extends Throwable>[] exceptions();

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ExceptionBasedDltRouting.java

Lines changed: 0 additions & 36 deletions
This file was deleted.

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationBuilder.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2023 the original author or authors.
2+
* Copyright 2018-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -79,7 +79,7 @@ public class RetryTopicConfigurationBuilder {
7979
@Nullable
8080
private BinaryExceptionClassifierBuilder classifierBuilder;
8181

82-
private Map<String, Set<Class<? extends Throwable>>> dltRoutingRules = new HashMap<>();
82+
private final Map<String, Set<Class<? extends Throwable>>> dltRoutingRules = new HashMap<>();
8383

8484
private DltStrategy dltStrategy = DltStrategy.ALWAYS_RETRY_ON_ERROR;
8585

@@ -539,7 +539,7 @@ private BinaryExceptionClassifierBuilder classifierBuilder() {
539539
* @since 3.2.0
540540
*/
541541
public RetryTopicConfigurationBuilder dltRoutingRules(Map<String, Set<Class<? extends Throwable>>> dltRoutingRules) {
542-
this.dltRoutingRules = dltRoutingRules;
542+
this.dltRoutingRules.putAll(dltRoutingRules);
543543
return this;
544544
}
545545

spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolverTests.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2023 the original author or authors.
2+
* Copyright 2018-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -220,12 +220,26 @@ void shouldGetNextDestinationTopic() {
220220
}
221221

222222
@Test
223-
void shouldGetDlt() {
223+
void shouldGetGeneralPurposeDltWhenExceptionIsNotKnown() {
224+
assertThat(defaultDestinationTopicContainer
225+
.getDltFor("id", mainDestinationTopic.getDestinationName()))
226+
.isEqualTo(dltDestinationTopic);
227+
}
228+
229+
@Test
230+
void shouldGetGeneralPurposeDltWhenThereIsNoCustomDltRegisteredForExceptionType() {
224231
assertThat(defaultDestinationTopicContainer
225232
.getDltFor("id", mainDestinationTopic.getDestinationName(), new RuntimeException()))
226233
.isEqualTo(dltDestinationTopic);
227234
}
228235

236+
@Test
237+
void shouldGetCustomDltWhenThereIsCustomDltRegisteredForExceptionType() {
238+
assertThat(defaultDestinationTopicContainer
239+
.getDltFor("id", mainDestinationTopic.getDestinationName(), new DeserializationException(null, null, false, null)))
240+
.isEqualTo(deserializationExcDltDestinationTopic);
241+
}
242+
229243
@Test
230244
void shouldThrowIfNoDestinationFound() {
231245
assertThatNullPointerException().isThrownBy(

spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryableTopicAnnotationProcessorTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2023 the original author or authors.
2+
* Copyright 2018-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -374,11 +374,11 @@ static class RetryableTopicAnnotationFactoryWithCustomDltRouting {
374374
@KafkaListener
375375
@RetryableTopic(
376376
attempts = "1",
377-
exceptionBasedDltRouting = @ExceptionBasedDltRouting(routingRules = {
377+
exceptionBasedDltRouting = {
378378
@ExceptionBasedDltDestination(
379379
suffix = "-deserialization", exceptions = {DeserializationException.class}
380380
)
381-
})
381+
}
382382
)
383383
void listenWithRetry() {
384384
// NoOps

0 commit comments

Comments
 (0)