diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactory.java index 462e63b40d..1119bf39e9 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 the original author or authors. + * Copyright 2018-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,11 +16,10 @@ package org.springframework.kafka.retrytopic; -import java.util.Arrays; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.function.BiPredicate; -import java.util.stream.Collectors; -import java.util.stream.IntStream; import org.springframework.classify.BinaryExceptionClassifier; import org.springframework.kafka.core.KafkaOperations; @@ -36,6 +35,7 @@ * @author Tomaz Fernandes * @author Gary Russell * @author João Lima + * @author Wang Zhiyang * @since 2.7 * */ @@ -47,19 +47,21 @@ public class DestinationTopicPropertiesFactory { private final List backOffValues; - private final BinaryExceptionClassifier exceptionClassifier; - private final int numPartitions; private final int maxAttempts; - private final KafkaOperations kafkaOperations; + private final boolean isSameIntervalReuse; - private final DltStrategy dltStrategy; + private final boolean isFixedDelay; - private final TopicSuffixingStrategy topicSuffixingStrategy; + private final int retryTopicsAmount; - private final SameIntervalTopicReuseStrategy sameIntervalTopicReuseStrategy; + private final BiPredicate shouldRetryOn; + + private final KafkaOperations kafkaOperations; + + private final DltStrategy dltStrategy; private final long timeout; @@ -90,15 +92,19 @@ public DestinationTopicPropertiesFactory(String retryTopicSuffix, String dltSuff this.dltStrategy = dltStrategy; this.kafkaOperations = kafkaOperations; - this.exceptionClassifier = exceptionClassifier; this.numPartitions = numPartitions; - this.topicSuffixingStrategy = topicSuffixingStrategy; - this.sameIntervalTopicReuseStrategy = sameIntervalTopicReuseStrategy; this.timeout = timeout; this.destinationTopicSuffixes = new DestinationTopicSuffixes(retryTopicSuffix, dltSuffix); this.backOffValues = backOffValues; - // Max Attempts include the initial try. - this.maxAttempts = this.backOffValues.size() + 1; + int backOffValuesSize = this.backOffValues.size(); + this.isSameIntervalReuse = SameIntervalTopicReuseStrategy.SINGLE_TOPIC.equals(sameIntervalTopicReuseStrategy); + this.isFixedDelay = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE.equals(topicSuffixingStrategy) + || backOffValuesSize > 1 && backOffValues.stream().distinct().count() == 1; + // Max Attempts to include the initial try. + this.maxAttempts = backOffValuesSize + 1; + this.shouldRetryOn = (attempt, throwable) -> attempt < this.maxAttempts + && exceptionClassifier.classify(throwable); + this.retryTopicsAmount = backOffValuesSize - reusableTopicAttempts(); } /** @@ -113,71 +119,20 @@ public DestinationTopicPropertiesFactory autoStartDltHandler(@Nullable Boolean a } public List createProperties() { - return isSingleTopicFixedDelay() - ? createPropertiesForFixedDelaySingleTopic() - : createPropertiesForDefaultTopicStrategy(); - } - - private List createPropertiesForFixedDelaySingleTopic() { - return isNoDltStrategy() - ? Arrays.asList(createMainTopicProperties(), - createRetryProperties(1, getShouldRetryOn())) - : Arrays.asList(createMainTopicProperties(), - createRetryProperties(1, getShouldRetryOn()), - createDltProperties()); - } - - private boolean isSingleTopicFixedDelay() { - return (this.backOffValues.size() == 1 || isFixedDelay()) && isSingleTopicSameIntervalTopicReuseStrategy(); - } - - private boolean isSingleTopicSameIntervalTopicReuseStrategy() { - return SameIntervalTopicReuseStrategy.SINGLE_TOPIC.equals(this.sameIntervalTopicReuseStrategy); - } - - private List createPropertiesForDefaultTopicStrategy() { - - int retryTopicsAmount = retryTopicsAmount(); - - return IntStream.rangeClosed(0, isNoDltStrategy() - ? retryTopicsAmount - : retryTopicsAmount + 1) - .mapToObj(this::createTopicProperties) - .collect(Collectors.toList()); - } - - int retryTopicsAmount() { - return this.backOffValues.size() - reusableTopicAttempts(); - } - - private int reusableTopicAttempts() { - return this.backOffValues.size() > 0 - ? !isFixedDelay() - ? isSingleTopicSameIntervalTopicReuseStrategy() - // Assuming that duplicates are always in - // the end of the list. - ? amountOfDuplicates(this.backOffValues.get(this.backOffValues.size() - 1)) - 1 - : 0 - : 0 - : 0; - } - - private boolean isNoDltStrategy() { - return DltStrategy.NO_DLT.equals(this.dltStrategy); - } - - private DestinationTopic.Properties createTopicProperties(int index) { - BiPredicate shouldRetryOn = getShouldRetryOn(); - return index == 0 - ? createMainTopicProperties() - : (index <= this.retryTopicsAmount()) - ? createRetryProperties(index, shouldRetryOn) - : createDltProperties(); + List list = new ArrayList<>(this.retryTopicsAmount + 2); + list.add(createMainTopicProperties()); + for (int backOffIndex = 0; backOffIndex < this.retryTopicsAmount; backOffIndex++) { + list.add(createRetryProperties(backOffIndex)); + } + if (!DltStrategy.NO_DLT.equals(this.dltStrategy)) { + list.add(createDltProperties()); + } + return Collections.unmodifiableList(list); } private DestinationTopic.Properties createMainTopicProperties() { return new DestinationTopic.Properties(0, MAIN_TOPIC_SUFFIX, DestinationTopic.Type.MAIN, this.maxAttempts, - this.numPartitions, this.dltStrategy, this.kafkaOperations, getShouldRetryOn(), this.timeout); + this.numPartitions, this.dltStrategy, this.kafkaOperations, this.shouldRetryOn, this.timeout); } private DestinationTopic.Properties createDltProperties() { @@ -186,49 +141,37 @@ private DestinationTopic.Properties createDltProperties() { this.kafkaOperations, (a, e) -> false, this.timeout, this.autoStartDltHandler); } - private BiPredicate getShouldRetryOn() { - return (attempt, throwable) -> attempt < this.maxAttempts && this.exceptionClassifier.classify(throwable); + private DestinationTopic.Properties createRetryProperties(int backOffIndex) { + long thisBackOffValue = this.backOffValues.get(backOffIndex); + return createProperties(thisBackOffValue, getTopicSuffix(backOffIndex, thisBackOffValue)); } - private DestinationTopic.Properties createRetryProperties(int index, - BiPredicate shouldRetryOn) { - - int indexInBackoffValues = index - 1; - Long thisBackOffValue = this.backOffValues.get(indexInBackoffValues); - DestinationTopic.Type topicTypeToUse = isDelayWithReusedTopic(thisBackOffValue) - ? Type.REUSABLE_RETRY_TOPIC - : Type.RETRY; - return createProperties(topicTypeToUse, shouldRetryOn, indexInBackoffValues, - getTopicSuffix(indexInBackoffValues, thisBackOffValue)); - } - - private String getTopicSuffix(int indexInBackoffValues, Long thisBackOffValue) { - return isSingleTopicFixedDelay() - ? this.destinationTopicSuffixes.getRetrySuffix() - : isSuffixWithIndexStrategy() || isFixedDelay() - ? joinWithRetrySuffix(indexInBackoffValues) - : hasDuplicates(thisBackOffValue) - ? joinWithRetrySuffix(thisBackOffValue) - .concat(suffixForRepeatedInterval(indexInBackoffValues, thisBackOffValue)) - : joinWithRetrySuffix(thisBackOffValue); - } - - private String suffixForRepeatedInterval(int indexInBackoffValues, Long thisBackOffValue) { - return isSingleTopicSameIntervalTopicReuseStrategy() - ? "" - : "-" + getIndexInBackoffValues(indexInBackoffValues, thisBackOffValue); - } - - private boolean isDelayWithReusedTopic(Long backoffValue) { - return hasDuplicates(backoffValue) && isSingleTopicSameIntervalTopicReuseStrategy(); + private String getTopicSuffix(int backOffIndex, long thisBackOffValue) { + if (this.isSameIntervalReuse && this.retryTopicsAmount == 1) { + return this.destinationTopicSuffixes.getRetrySuffix(); + } + else if (this.isFixedDelay) { + return joinWithRetrySuffix(backOffIndex); + } + else { + String retrySuffix = joinWithRetrySuffix(thisBackOffValue); + if (!this.isSameIntervalReuse && hasDuplicates(thisBackOffValue)) { + return retrySuffix.concat("-" + (backOffIndex - this.backOffValues.indexOf(thisBackOffValue))); + } + return retrySuffix; + } } - private int getIndexInBackoffValues(int indexInBackoffValues, Long thisBackOffValue) { - return indexInBackoffValues - this.backOffValues.indexOf(thisBackOffValue); + private DestinationTopic.Type getDestinationTopicType(Long backOffValue) { + return this.isSameIntervalReuse && hasDuplicates(backOffValue) ? Type.REUSABLE_RETRY_TOPIC : Type.RETRY; } - private boolean isSuffixWithIndexStrategy() { - return TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE.equals(this.topicSuffixingStrategy); + private int reusableTopicAttempts() { + if (this.isSameIntervalReuse && this.backOffValues.size() > 1) { + // Assuming that duplicates are always at the end of the list. + return amountOfDuplicates(this.backOffValues.get(this.backOffValues.size() - 1)) - 1; + } + return 0; } private boolean hasDuplicates(Long thisBackOffValue) { @@ -238,22 +181,14 @@ private boolean hasDuplicates(Long thisBackOffValue) { private int amountOfDuplicates(Long thisBackOffValue) { return Long.valueOf(this.backOffValues .stream() - .filter(value -> value.equals(thisBackOffValue)) - .count()).intValue(); - } - - private DestinationTopic.Properties createProperties(DestinationTopic.Type topicType, - BiPredicate shouldRetryOn, - int indexInBackoffValues, - String suffix) { - return new DestinationTopic.Properties(this.backOffValues.get(indexInBackoffValues), suffix, - topicType, this.maxAttempts, this.numPartitions, this.dltStrategy, - this.kafkaOperations, shouldRetryOn, this.timeout); + .filter(thisBackOffValue::equals) + .count()) + .intValue(); } - private boolean isFixedDelay() { - // If all values are the same, such as in NoBackOffPolicy and FixedBackoffPolicy - return this.backOffValues.size() > 1 && this.backOffValues.stream().distinct().count() == 1; + private DestinationTopic.Properties createProperties(long delayMs, String suffix) { + return new DestinationTopic.Properties(delayMs, suffix, getDestinationTopicType(delayMs), this.maxAttempts, + this.numPartitions, this.dltStrategy, this.kafkaOperations, this.shouldRetryOn, this.timeout); } private String joinWithRetrySuffix(long parameter) { diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactoryTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactoryTests.java index e83bcec19f..82ea3fc5f6 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactoryTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactoryTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 the original author or authors. + * Copyright 2018-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,7 +19,6 @@ import static org.assertj.core.api.Assertions.assertThat; import java.util.List; -import java.util.stream.Collectors; import java.util.stream.IntStream; import org.junit.jupiter.api.BeforeEach; @@ -38,6 +37,7 @@ /** * @author Tomaz Fernandes + * @author Wang Zhiyang * @since 2.7 */ @ExtendWith(MockitoExtension.class) @@ -124,7 +124,6 @@ private void assertDltTopic(DestinationTopic.Properties dltProperties) { } @Test - @SuppressWarnings("deprecation") void shouldCreateTwoRetryPropertiesForMultipleBackoffValues() { // when ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy(); @@ -143,7 +142,7 @@ void shouldCreateTwoRetryPropertiesForMultipleBackoffValues() { List destinationTopicList = propertiesList .stream() .map(properties -> new DestinationTopic("mainTopic" + properties.suffix(), properties)) - .collect(Collectors.toList()); + .toList(); // then assertThat(propertiesList.size() == 4).isTrue(); @@ -197,7 +196,6 @@ void shouldNotCreateDltProperties() { } @Test - @SuppressWarnings("deprecation") void shouldCreateOneRetryPropertyForFixedBackoffWithSingleTopicSameIntervalReuseStrategy() { // when @@ -216,7 +214,7 @@ void shouldCreateOneRetryPropertyForFixedBackoffWithSingleTopicSameIntervalReuse List destinationTopicList = propertiesList .stream() .map(properties -> new DestinationTopic("mainTopic" + properties.suffix(), properties)) - .collect(Collectors.toList()); + .toList(); // then assertThat(propertiesList.size() == 3).isTrue(); @@ -239,7 +237,6 @@ void shouldCreateOneRetryPropertyForFixedBackoffWithSingleTopicSameIntervalReuse } @Test - @SuppressWarnings("deprecation") void shouldCreateRetryPropertiesForFixedBackoffWithMultiTopicStrategy() { // when @@ -258,7 +255,7 @@ void shouldCreateRetryPropertiesForFixedBackoffWithMultiTopicStrategy() { List destinationTopicList = propertiesList .stream() .map(properties -> new DestinationTopic("mainTopic" + properties.suffix(), properties)) - .collect(Collectors.toList()); + .toList(); // then assertThat(propertiesList.size() == 4).isTrue(); @@ -288,7 +285,6 @@ void shouldCreateRetryPropertiesForFixedBackoffWithMultiTopicStrategy() { } @Test - @SuppressWarnings("deprecation") void shouldSuffixRetryTopicsWithIndexIfSuffixWithIndexStrategy() { // setup @@ -305,11 +301,10 @@ void shouldSuffixRetryTopicsWithIndexIfSuffixWithIndexStrategy() { // then IntStream.range(1, maxAttempts).forEach(index -> assertThat(propertiesList.get(index).suffix()) - .isEqualTo(retryTopicSuffix + "-" + String.valueOf(index - 1))); + .isEqualTo(retryTopicSuffix + "-" + (index - 1))); } @Test - @SuppressWarnings("deprecation") void shouldSuffixRetryTopicsWithIndexIfFixedDelayWithMultipleTopics() { // setup @@ -328,11 +323,10 @@ void shouldSuffixRetryTopicsWithIndexIfFixedDelayWithMultipleTopics() { // then IntStream.range(1, maxAttempts) .forEach(index -> assertThat(propertiesList.get(index).suffix()).isEqualTo(retryTopicSuffix + - "-" + String.valueOf(index - 1))); + "-" + (index - 1))); } @Test - @SuppressWarnings("deprecation") void shouldSuffixRetryTopicsWithMixedIfMaxDelayReached() { // setup @@ -351,7 +345,6 @@ void shouldSuffixRetryTopicsWithMixedIfMaxDelayReached() { List propertiesList = factory.createProperties(); // then - assertThat(factory.retryTopicsAmount() == 4).isTrue(); assertThat(propertiesList.size() == 6).isTrue(); assertThat(propertiesList.get(0).suffix()).isEqualTo(""); assertThat(propertiesList.get(1).suffix()).isEqualTo(retryTopicSuffix + "-1000"); @@ -362,7 +355,6 @@ void shouldSuffixRetryTopicsWithMixedIfMaxDelayReached() { } @Test - @SuppressWarnings("deprecation") void shouldReuseRetryTopicsIfMaxDelayReachedWithDelayValueSuffixingStrategy() { // setup @@ -381,17 +373,15 @@ void shouldReuseRetryTopicsIfMaxDelayReachedWithDelayValueSuffixingStrategy() { List propertiesList = factory.createProperties(); // then - assertThat(factory.retryTopicsAmount()).isEqualTo(3); assertThat(propertiesList.size()).isEqualTo(5); assertThat(propertiesList.get(0).suffix()).isEqualTo(""); - assertRetryTopic(propertiesList.get(1), maxAttempts, 1000L, retryTopicSuffix + "-1000", false, false); - assertRetryTopic(propertiesList.get(2), maxAttempts, 2000L, retryTopicSuffix + "-2000", false, false); - assertRetryTopic(propertiesList.get(3), maxAttempts, 3000L, retryTopicSuffix + "-3000", true, false); + assertRetryTopic(propertiesList.get(1), maxAttempts, 1000L, retryTopicSuffix + "-1000", false); + assertRetryTopic(propertiesList.get(2), maxAttempts, 2000L, retryTopicSuffix + "-2000", false); + assertRetryTopic(propertiesList.get(3), maxAttempts, 3000L, retryTopicSuffix + "-3000", true); assertThat(propertiesList.get(4).suffix()).isEqualTo(dltSuffix); } @Test - @SuppressWarnings("deprecation") void shouldReuseRetryTopicsIfMaxDelayReachedWithIndexValueSuffixingStrategy() { // setup @@ -410,17 +400,15 @@ void shouldReuseRetryTopicsIfMaxDelayReachedWithIndexValueSuffixingStrategy() { List propertiesList = factory.createProperties(); // then - assertThat(factory.retryTopicsAmount()).isEqualTo(3); assertThat(propertiesList.size()).isEqualTo(5); assertThat(propertiesList.get(0).suffix()).isEqualTo(""); - assertRetryTopic(propertiesList.get(1), maxAttempts, 1000L, retryTopicSuffix + "-0", false, false); - assertRetryTopic(propertiesList.get(2), maxAttempts, 2000L, retryTopicSuffix + "-1", false, false); - assertRetryTopic(propertiesList.get(3), maxAttempts, 3000L, retryTopicSuffix + "-2", true, false); + assertRetryTopic(propertiesList.get(1), maxAttempts, 1000L, retryTopicSuffix + "-0", false); + assertRetryTopic(propertiesList.get(2), maxAttempts, 2000L, retryTopicSuffix + "-1", false); + assertRetryTopic(propertiesList.get(3), maxAttempts, 3000L, retryTopicSuffix + "-2", true); assertThat(propertiesList.get(4).suffix()).isEqualTo(dltSuffix); } @Test - @SuppressWarnings("deprecation") void shouldNotReuseRetryTopicsIfRepeatedIntervalsAreInTheMiddleOfChain() { // setup @@ -435,21 +423,18 @@ void shouldNotReuseRetryTopicsIfRepeatedIntervalsAreInTheMiddleOfChain() { List propertiesList = factory.createProperties(); // then - assertThat(factory.retryTopicsAmount()).isEqualTo(5); assertThat(propertiesList.size()).isEqualTo(7); assertThat(propertiesList.get(0).suffix()).isEqualTo(""); - assertRetryTopic(propertiesList.get(1), maxAttempts, 1000L, retryTopicSuffix + "-1000", false, false); - assertRetryTopic(propertiesList.get(2), maxAttempts, 2000L, retryTopicSuffix + "-2000-0", false, false); - assertRetryTopic(propertiesList.get(3), maxAttempts, 2000L, retryTopicSuffix + "-2000-1", false, false); - assertRetryTopic(propertiesList.get(4), maxAttempts, 2000L, retryTopicSuffix + "-2000-2", false, false); - assertRetryTopic(propertiesList.get(5), maxAttempts, 3000L, retryTopicSuffix + "-3000", false, false); + assertRetryTopic(propertiesList.get(1), maxAttempts, 1000L, retryTopicSuffix + "-1000", false); + assertRetryTopic(propertiesList.get(2), maxAttempts, 2000L, retryTopicSuffix + "-2000-0", false); + assertRetryTopic(propertiesList.get(3), maxAttempts, 2000L, retryTopicSuffix + "-2000-1", false); + assertRetryTopic(propertiesList.get(4), maxAttempts, 2000L, retryTopicSuffix + "-2000-2", false); + assertRetryTopic(propertiesList.get(5), maxAttempts, 3000L, retryTopicSuffix + "-3000", false); assertThat(propertiesList.get(6).suffix()).isEqualTo(dltSuffix); } - @SuppressWarnings("deprecation") private void assertRetryTopic(DestinationTopic.Properties topicProperties, int maxAttempts, - Long expectedDelay, String expectedSuffix, boolean expectedReusableTopic, - boolean expectedIsSingleTopicRetry) { + Long expectedDelay, String expectedSuffix, boolean expectedReusableTopic) { assertThat(topicProperties.suffix()).isEqualTo(expectedSuffix); assertThat(topicProperties.isRetryTopic()).isTrue(); DestinationTopic topic = new DestinationTopic("irrelevant" + topicProperties.suffix(), topicProperties);