Skip to content

Commit

Permalink
GH-2800: route messages to specialized dlt based on exception type
Browse files Browse the repository at this point in the history
  • Loading branch information
breader124 committed Dec 10, 2023
1 parent 36bf3e2 commit 95c5e5d
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ public DestinationTopic resolveDestinationTopic(String mainListenerId, String to
: destinationTopicHolder.getSourceDestination().shouldRetryOn(attempt, maybeUnwrapException(e))
&& isNotFatalException(e)
&& !isPastTimout(originalTimestamp, destinationTopicHolder)
? resolveRetryDestination(destinationTopicHolder)
: getDltOrNoOpsDestination(mainListenerId, topic);
? resolveRetryDestination(mainListenerId, destinationTopicHolder, e)
: getDltOrNoOpsDestination(mainListenerId, topic, e);
}

private Boolean isNotFatalException(Exception e) {
Expand Down Expand Up @@ -128,10 +128,20 @@ && isNotFatalException(e)
}

@SuppressWarnings("deprecation")
private DestinationTopic resolveRetryDestination(DestinationTopicHolder destinationTopicHolder) {
return (destinationTopicHolder.getSourceDestination().isReusableRetryTopic())
? destinationTopicHolder.getSourceDestination()
: destinationTopicHolder.getNextDestination();
private DestinationTopic resolveRetryDestination(String mainListenerId, DestinationTopicHolder destinationTopicHolder, Exception e) {
if (destinationTopicHolder.getSourceDestination().isReusableRetryTopic()) {
return destinationTopicHolder.getSourceDestination();
}

if (isAlreadyDltDestination(destinationTopicHolder)) {
return getDltOrNoOpsDestination(mainListenerId, destinationTopicHolder.getSourceDestination().getDestinationName(), e);
}

return destinationTopicHolder.getNextDestination();
}

private static boolean isAlreadyDltDestination(DestinationTopicHolder destinationTopicHolder) {
return destinationTopicHolder.getNextDestination().isDltTopic();
}

@Override
Expand All @@ -144,18 +154,29 @@ public DestinationTopic getDestinationTopicByName(String mainListenerId, String

@Nullable
@Override
public DestinationTopic getDltFor(String mainListenerId, String topicName) {
DestinationTopic destination = getDltOrNoOpsDestination(mainListenerId, topicName);
public DestinationTopic getDltFor(String mainListenerId, String topicName, Exception e) {
DestinationTopic destination = getDltOrNoOpsDestination(mainListenerId, topicName, e);
return destination.isNoOpsTopic()
? null
: destination;
}

private DestinationTopic getDltOrNoOpsDestination(String mainListenerId, String topic) {
private DestinationTopic getDltOrNoOpsDestination(String mainListenerId, String topic, Exception e) {
DestinationTopic destination = getNextDestinationTopicFor(mainListenerId, topic);
return destination.isDltTopic() || destination.isNoOpsTopic()
? destination
: getDltOrNoOpsDestination(mainListenerId, destination.getDestinationName());
return isMatchingDltTopic(destination, e) || destination.isNoOpsTopic() ?
destination :
getDltOrNoOpsDestination(mainListenerId, destination.getDestinationName(), e);
}

private static boolean isMatchingDltTopic(DestinationTopic destination, Exception e) {
if (!destination.isDltTopic()) {
return false;
}

boolean isDltIntendedForCurrentExc = destination.usedForExceptions().stream()
.anyMatch(excType -> excType.isInstance(e));
boolean isGenericPurposeDlt = destination.usedForExceptions().isEmpty();
return isDltIntendedForCurrentExc || isGenericPurposeDlt;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ public boolean shouldRetryOn(Integer attempt, Throwable e) {
return this.properties.shouldRetryOn.test(attempt, e);
}

public Set<Class<? extends Throwable>> usedForExceptions() {
return Collections.unmodifiableSet(this.properties.usedForExceptions);
}

@Override
public String toString() {
return "DestinationTopic{" +
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2022 the original author or authors.
* Copyright 2017-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -68,6 +68,6 @@ public interface DestinationTopicContainer {
* @return The {@link DestinationTopic} instance corresponding to the DLT.
*/
@Nullable
DestinationTopic getDltFor(String mainListenerId, String topicName);
DestinationTopic getDltFor(String mainListenerId, String topicName, Exception e);

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2022 the original author or authors.
* Copyright 2018-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -75,11 +75,12 @@ void shouldRegisterTopicDestinations() {
// then
assertThat(context.destinationsByTopicMap.containsKey(FIRST_TOPIC)).isTrue();
List<DestinationTopic> destinationTopicsForFirstTopic = context.destinationsByTopicMap.get(FIRST_TOPIC);
assertThat(destinationTopicsForFirstTopic.size()).isEqualTo(4);
assertThat(destinationTopicsForFirstTopic.size()).isEqualTo(5);
assertThat(destinationTopicsForFirstTopic.get(0)).isEqualTo(mainDestinationTopic);
assertThat(destinationTopicsForFirstTopic.get(1)).isEqualTo(firstRetryDestinationTopic);
assertThat(destinationTopicsForFirstTopic.get(2)).isEqualTo(secondRetryDestinationTopic);
assertThat(destinationTopicsForFirstTopic.get(3)).isEqualTo(dltDestinationTopic);
assertThat(destinationTopicsForFirstTopic.get(3)).isEqualTo(deserializationExcDltDestinationTopic);
assertThat(destinationTopicsForFirstTopic.get(4)).isEqualTo(dltDestinationTopic);

assertThat(context.destinationsByTopicMap.containsKey(SECOND_TOPIC)).isTrue();
List<DestinationTopic> destinationTopicsForSecondTopic = context.destinationsByTopicMap.get(SECOND_TOPIC);
Expand Down Expand Up @@ -143,7 +144,7 @@ void shouldCreateDestinationMapWhenProcessDestinations() {
.flatMap(list -> list.stream())
.collect(Collectors.toList());

assertThat(destinationList.size()).isEqualTo(11);
assertThat(destinationList.size()).isEqualTo(12);

assertThat(destinationList.contains(mainDestinationTopic)).isTrue();
assertThat(destinationList.contains(firstRetryDestinationTopic)).isTrue();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2022 the original author or authors.
* Copyright 2018-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -39,6 +39,7 @@
import org.springframework.kafka.listener.TimestampedException;
import org.springframework.kafka.retrytopic.DestinationTopic.Type;
import org.springframework.kafka.support.converter.ConversionException;
import org.springframework.kafka.support.serializer.DeserializationException;

/**
* @author Tomaz Fernandes
Expand Down Expand Up @@ -78,16 +79,16 @@ public void setup() {
void shouldResolveRetryDestination() {
assertThat(defaultDestinationTopicContainer
.resolveDestinationTopic("id", mainDestinationTopic.getDestinationName(), 1,
new RuntimeException(), this.originalTimestamp)).isEqualTo(firstRetryDestinationTopic);
new IllegalStateException(), this.originalTimestamp)).isEqualTo(firstRetryDestinationTopic);
assertThat(defaultDestinationTopicContainer
.resolveDestinationTopic("id", firstRetryDestinationTopic.getDestinationName(), 1,
new RuntimeException(), this.originalTimestamp)).isEqualTo(secondRetryDestinationTopic);
new IllegalStateException(), this.originalTimestamp)).isEqualTo(secondRetryDestinationTopic);
assertThat(defaultDestinationTopicContainer
.resolveDestinationTopic("id", secondRetryDestinationTopic.getDestinationName(), 1,
new RuntimeException(), this.originalTimestamp)).isEqualTo(dltDestinationTopic);
new IllegalStateException(), this.originalTimestamp)).isEqualTo(dltDestinationTopic);
assertThat(defaultDestinationTopicContainer
.resolveDestinationTopic("id", dltDestinationTopic.getDestinationName(), 1,
new RuntimeException(), this.originalTimestamp)).isEqualTo(noOpsDestinationTopic);
new IllegalStateException(), this.originalTimestamp)).isEqualTo(noOpsDestinationTopic);

assertThat(defaultDestinationTopicContainer
.resolveDestinationTopic("id", mainDestinationTopic2.getDestinationName(), 1,
Expand Down Expand Up @@ -142,6 +143,18 @@ void shouldResolveDltDestinationForFatalDefaultException() {
.isEqualTo(dltDestinationTopic);
}

@Test
void shouldResolveDeserializationDltDestinationForDeserializationException() {
DeserializationException exc = new DeserializationException("", new byte[] {}, false, new IllegalStateException());

assertThat(defaultDestinationTopicContainer
.resolveDestinationTopic("id", secondRetryDestinationTopic.getDestinationName(),
1, exc, originalTimestamp)).isEqualTo(deserializationExcDltDestinationTopic);
assertThat(defaultDestinationTopicContainer
.resolveDestinationTopic("id", deserializationExcDltDestinationTopic.getDestinationName(),
1, exc, originalTimestamp)).isEqualTo(dltDestinationTopic);
}

@Test
void shouldResolveNoOpsForFatalDefaultExceptionInDlt() {
assertThat(defaultDestinationTopicContainer
Expand Down Expand Up @@ -207,7 +220,7 @@ void shouldGetNextDestinationTopic() {
@Test
void shouldGetDlt() {
assertThat(defaultDestinationTopicContainer
.getDltFor("id", mainDestinationTopic.getDestinationName()))
.getDltFor("id", mainDestinationTopic.getDestinationName(), new RuntimeException()))
.isEqualTo(dltDestinationTopic);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.function.BiPredicate;

import org.springframework.classify.BinaryExceptionClassifier;
import org.springframework.classify.BinaryExceptionClassifierBuilder;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.serializer.DeserializationException;

/**
* @author Tomaz Fernandes
Expand Down Expand Up @@ -72,12 +74,16 @@ public class DestinationTopicTests {
new DestinationTopic.Properties(2000, retrySuffix + "-2000", DestinationTopic.Type.RETRY, 4, 1,
DltStrategy.FAIL_ON_ERROR, kafkaOperations1, getShouldRetryOnDenyList(), noTimeout);

protected DestinationTopic.Properties deserializationDltTopicProps =
new DestinationTopic.Properties(0, "-deserialization" + dltSuffix, DestinationTopic.Type.DLT, 4, 1,
DltStrategy.FAIL_ON_ERROR, kafkaOperations1, (a, e) -> false, noTimeout, null, Set.of(DeserializationException.class));

protected DestinationTopic.Properties dltTopicProps =
new DestinationTopic.Properties(0, dltSuffix, DestinationTopic.Type.DLT, 4, 1,
DltStrategy.FAIL_ON_ERROR, kafkaOperations1, (a, e) -> false, noTimeout, null, Collections.emptySet());

protected List<DestinationTopic.Properties> allProps = Arrays
.asList(mainTopicProps, firstRetryProps, secondRetryProps, dltTopicProps);
.asList(mainTopicProps, firstRetryProps, secondRetryProps, deserializationDltTopicProps, dltTopicProps);

protected DestinationTopic.Properties mainTopicProps2 =
new DestinationTopic.Properties(0, "", DestinationTopic.Type.MAIN, 4, 1,
Expand Down Expand Up @@ -148,10 +154,12 @@ public class DestinationTopicTests {

protected PropsHolder secondRetryDestinationHolder = new PropsHolder(FIRST_TOPIC, secondRetryProps);

protected PropsHolder deserializationDltDestinationHolder = new PropsHolder(FIRST_TOPIC, deserializationDltTopicProps);

protected PropsHolder dltDestinationHolder = new PropsHolder(FIRST_TOPIC, dltTopicProps);

protected List<PropsHolder> allFirstDestinationsHolders = Arrays
.asList(mainDestinationHolder, firstRetryDestinationHolder, secondRetryDestinationHolder, dltDestinationHolder);
.asList(mainDestinationHolder, firstRetryDestinationHolder, secondRetryDestinationHolder, deserializationDltDestinationHolder, dltDestinationHolder);

protected final static String SECOND_TOPIC = "secondTopic";

Expand Down Expand Up @@ -206,12 +214,15 @@ public class DestinationTopicTests {
protected DestinationTopic dltDestinationTopic =
new DestinationTopic(FIRST_TOPIC + dltTopicProps.suffix(), dltTopicProps);

protected DestinationTopic deserializationExcDltDestinationTopic =
new DestinationTopic(FIRST_TOPIC + "-deserialization" + dltTopicProps.suffix(), deserializationDltTopicProps);

protected DestinationTopic noOpsDestinationTopic =
new DestinationTopic(dltDestinationTopic.getDestinationName() + "-noOps",
new DestinationTopic.Properties(dltTopicProps, "-noOps", DestinationTopic.Type.NO_OPS));

protected List<DestinationTopic> allFirstDestinationsTopics = Arrays
.asList(mainDestinationTopic, firstRetryDestinationTopic, secondRetryDestinationTopic, dltDestinationTopic);
.asList(mainDestinationTopic, firstRetryDestinationTopic, secondRetryDestinationTopic, deserializationExcDltDestinationTopic, dltDestinationTopic);

protected DestinationTopic mainDestinationTopic2 =
new DestinationTopic(SECOND_TOPIC + mainTopicProps2.suffix(), mainTopicProps2);
Expand Down

0 comments on commit 95c5e5d

Please sign in to comment.