diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/AsyncCompletableFutureRetryTopicScenarioTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/AsyncCompletableFutureRetryTopicScenarioTests.java index 6f5c321931..a4db2790c6 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/AsyncCompletableFutureRetryTopicScenarioTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/AsyncCompletableFutureRetryTopicScenarioTests.java @@ -23,16 +23,13 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Random; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.condition.DisabledIfEnvironmentVariable; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; @@ -76,8 +73,6 @@ @DirtiesContext @EmbeddedKafka @TestPropertySource(properties = { "five.attempts=5", "kafka.template=customKafkaTemplate"}) -@DisabledIfEnvironmentVariable(named = "GITHUB_ACTION", matches = ".*?", - disabledReason = "Fails sporadically. Perhaps uses too much Apache Kafka resources") public class AsyncCompletableFutureRetryTopicScenarioTests { private final static String MAIN_TOPIC_CONTAINER_FACTORY = "kafkaListenerContainerFactory"; @@ -107,7 +102,7 @@ public class AsyncCompletableFutureRetryTopicScenarioTests { @Test void allFailCaseTest( - @Autowired TestTopicListener0 zeroTopicListener, + @Autowired TestTopicListener0 testTopicListener, @Autowired MyCustomDltProcessor myCustomDltProcessor0) { // All Fail case. String shortFailedMsg1 = "0"; @@ -126,12 +121,6 @@ void allFailCaseTest( shortFailedMsg1, shortFailedMsg2, shortFailedMsg3, - shortFailedMsg1, - shortFailedMsg2, - shortFailedMsg3, - shortFailedMsg1, - shortFailedMsg2, - shortFailedMsg3 }; String[] expectedReceivedTopics = { TEST_TOPIC0, @@ -143,12 +132,6 @@ void allFailCaseTest( expectedRetryTopic, expectedRetryTopic, expectedRetryTopic, - expectedRetryTopic, - expectedRetryTopic, - expectedRetryTopic, - expectedRetryTopic, - expectedRetryTopic, - expectedRetryTopic }; String[] expectedDltMsgs = { shortFailedMsg1, @@ -167,8 +150,8 @@ void allFailCaseTest( assertThat(destinationTopic.getDestinationName()).isEqualTo(TEST_TOPIC0 + "-retry"); - assertThat(zeroTopicListener.receivedMsgs).containsExactlyInAnyOrder(expectedReceivedMsgs); - assertThat(zeroTopicListener.receivedTopics).containsExactlyInAnyOrder(expectedReceivedTopics); + assertThat(testTopicListener.receivedMsgs).containsExactlyInAnyOrder(expectedReceivedMsgs); + assertThat(testTopicListener.receivedTopics).containsExactlyInAnyOrder(expectedReceivedTopics); assertThat(myCustomDltProcessor0.receivedMsg).containsExactlyInAnyOrder(expectedDltMsgs); } @@ -177,9 +160,18 @@ void allFailCaseTest( void firstShortFailAndLastLongSuccessRetryTest( @Autowired TestTopicListener1 testTopicListener1, @Autowired MyCustomDltProcessor myCustomDltProcessor1) { + // Scenario. + // 1. Short Fail msg (offset 0) + // 2. Long success msg (offset 1) -> -ing (latch wait) + // 3. Short fail msg (Retry1 offset 0) -> (latch down) + // 4. Long success msg (offset 1) -> Success! + // 5. Short fail msg (Retry2 offset 0) + // 6. Short fail msg (Retry3 offset 0) + // 7. Short fail msg (Retry4 offset 0) + // Given - String longSuccessMsg = "3"; - String shortFailedMsg = "1"; + String longSuccessMsg = testTopicListener1.LONG_SUCCESS_MSG; + String shortFailedMsg = testTopicListener1.SHORT_FAIL_MSG; DestinationTopic destinationTopic = topicContainer.getNextDestinationTopicFor("1-topicId", TEST_TOPIC1); String expectedRetryTopic = TEST_TOPIC1 + "-retry"; @@ -205,7 +197,6 @@ void firstShortFailAndLastLongSuccessRetryTest( shortFailedMsg }; - // When kafkaTemplate.send(TEST_TOPIC1, shortFailedMsg); kafkaTemplate.send(TEST_TOPIC1, longSuccessMsg); @@ -217,17 +208,28 @@ void firstShortFailAndLastLongSuccessRetryTest( assertThat(destinationTopic.getDestinationName()).isEqualTo(expectedRetryTopic); assertThat(testTopicListener1.receivedMsgs).containsExactlyInAnyOrder(expectedReceivedMsgs); assertThat(testTopicListener1.receivedTopics).containsExactlyInAnyOrder(expectedReceivedTopics); + assertThat(testTopicListener1.latchWaitFailCount).isEqualTo(0); assertThat(myCustomDltProcessor1.receivedMsg).containsExactlyInAnyOrder(expectedDltMsgs); } @Test void firstLongSuccessAndLastShortFailed( - @Autowired TestTopicListener2 zero2TopicListener, + @Autowired TestTopicListener2 testTopicListener2, @Autowired MyCustomDltProcessor myCustomDltProcessor2) { + // Scenario. + // 1. Long success msg (offset 0) -> going on... (latch await) + // 2. Short fail msg (offset 1) -> done. + // 3. Short fail msg (Retry1 offset 1) -> done (latch down) + // 4. Long success msg (offset 0) -> succeed. + // 5. Short fail msg (Retry2 offset 1) + // 6. Short fail msg (Retry3 offset 1) + // 7. Short fail msg (Retry4 offset 1) + // 8. Short fail msg (dlt offset 1) + // Given - String shortFailedMsg = "1"; - String longSuccessMsg = "3"; + String shortFailedMsg = testTopicListener2.SHORT_FAIL_MSG; + String longSuccessMsg = testTopicListener2.LONG_SUCCESS_MSG; DestinationTopic destinationTopic = topicContainer.getNextDestinationTopicFor("2-topicId", TEST_TOPIC2); String expectedRetryTopic = TEST_TOPIC2 + "-retry"; @@ -263,34 +265,59 @@ void firstLongSuccessAndLastShortFailed( assertThat(awaitLatch(latchContainer.dltCountdownLatch2)).isTrue(); assertThat(destinationTopic.getDestinationName()).isEqualTo(expectedRetryTopic); - assertThat(zero2TopicListener.receivedMsgs).containsExactlyInAnyOrder(expectedReceivedMsgs); - assertThat(zero2TopicListener.receivedTopics).containsExactlyInAnyOrder(expectedReceivedTopics); + assertThat(testTopicListener2.receivedMsgs).containsExactlyInAnyOrder(expectedReceivedMsgs); + assertThat(testTopicListener2.receivedTopics).containsExactlyInAnyOrder(expectedReceivedTopics); + assertThat(testTopicListener2.latchWaitFailCount).isEqualTo(0); assertThat(myCustomDltProcessor2.receivedMsg).containsExactlyInAnyOrder(expectedDltMsgs); } @Test - void longFailMsgTwiceThenShortSucessMsgThird( + void longFailMsgTwiceThenShortSuccessMsgThird( @Autowired TestTopicListener3 testTopicListener3, @Autowired MyCustomDltProcessor myCustomDltProcessor3) { + // Scenario + // 1. Long fail msg arrived (offset 0) -> -ing (wait latch offset 4) + // 2. Long fail msg arrived (offset 1) -> -ing (wait latch offset 1) + // 3. Short success msg arrived (offset 2) -> done + // 4. Short success msg arrived (offset 3) -> done + // 5. Short success msg arrived (offset 4) -> done (latch offset 4 count down) + // 6. Long fail msg throws error (offset 0) -> done + // 7. Long fail msg throws error (offset 1) -> done + // 8. Long fail msg (retry 1 with offset 0) -> done + // 9. Long fail msg (retry 1 with offset 1) -> done + // 10. Long fail msg (retry 2 with offset 0) -> done + // 11. Long fail msg (retry 2 with offset 1) -> done + // 12. Long fail msg (retry 3 with offset 0) -> done + // 13. Long fail msg (retry 3 with offset 1) -> done + // 14. Long fail msg (retry 4 with offset 0) -> done + // 15. Long fail msg (retry 4 with offset 1) -> done + // Given DestinationTopic destinationTopic = topicContainer.getNextDestinationTopicFor("3-topicId", TEST_TOPIC3); + String firstMsg = TestTopicListener3.FAIL_PREFIX + "0"; + String secondMsg = TestTopicListener3.FAIL_PREFIX + "1"; + String thirdMsg = TestTopicListener3.SUCCESS_PREFIX + "2"; + String fourthMsg = TestTopicListener3.SUCCESS_PREFIX + "3"; + String fifthMsg = TestTopicListener3.SUCCESS_PREFIX + "4"; + String expectedRetryTopic = TEST_TOPIC3 + "-retry"; + String[] expectedReceivedMsgs = { - TestTopicListener3.LONG_FAIL_MSG, - TestTopicListener3.LONG_FAIL_MSG, - TestTopicListener3.SHORT_SUCCESS_MSG, - TestTopicListener3.SHORT_SUCCESS_MSG, - TestTopicListener3.SHORT_SUCCESS_MSG, - TestTopicListener3.LONG_FAIL_MSG, - TestTopicListener3.LONG_FAIL_MSG, - TestTopicListener3.LONG_FAIL_MSG, - TestTopicListener3.LONG_FAIL_MSG, - TestTopicListener3.LONG_FAIL_MSG, - TestTopicListener3.LONG_FAIL_MSG, - TestTopicListener3.LONG_FAIL_MSG, - TestTopicListener3.LONG_FAIL_MSG, + firstMsg, + secondMsg, + thirdMsg, + fourthMsg, + fifthMsg, + firstMsg, + secondMsg, + firstMsg, + secondMsg, + firstMsg, + secondMsg, + firstMsg, + secondMsg, }; String[] expectedReceivedTopics = { @@ -310,16 +337,16 @@ void longFailMsgTwiceThenShortSucessMsgThird( }; String[] expectedDltMsgs = { - TestTopicListener3.LONG_FAIL_MSG, - TestTopicListener3.LONG_FAIL_MSG, + firstMsg, + secondMsg, }; // When - kafkaTemplate.send(TEST_TOPIC3, TestTopicListener3.LONG_FAIL_MSG); - kafkaTemplate.send(TEST_TOPIC3, TestTopicListener3.LONG_FAIL_MSG); - kafkaTemplate.send(TEST_TOPIC3, TestTopicListener3.SHORT_SUCCESS_MSG); - kafkaTemplate.send(TEST_TOPIC3, TestTopicListener3.SHORT_SUCCESS_MSG); - kafkaTemplate.send(TEST_TOPIC3, TestTopicListener3.SHORT_SUCCESS_MSG); + kafkaTemplate.send(TEST_TOPIC3, firstMsg); + kafkaTemplate.send(TEST_TOPIC3, secondMsg); + kafkaTemplate.send(TEST_TOPIC3, thirdMsg); + kafkaTemplate.send(TEST_TOPIC3, fourthMsg); + kafkaTemplate.send(TEST_TOPIC3, fifthMsg); // Then assertThat(awaitLatch(latchContainer.countDownLatch3)).isTrue(); @@ -328,6 +355,7 @@ void longFailMsgTwiceThenShortSucessMsgThird( assertThat(destinationTopic.getDestinationName()).isEqualTo(expectedRetryTopic); assertThat(testTopicListener3.receivedMsgs).containsExactlyInAnyOrder(expectedReceivedMsgs); assertThat(testTopicListener3.receivedTopics).containsExactlyInAnyOrder(expectedReceivedTopics); + assertThat(testTopicListener3.latchWaitFailCount).isEqualTo(0); assertThat(myCustomDltProcessor3.receivedMsg).containsExactlyInAnyOrder(expectedDltMsgs); } @@ -336,6 +364,22 @@ void longFailMsgTwiceThenShortSucessMsgThird( void longSuccessMsgTwiceThenShortFailMsgTwice( @Autowired TestTopicListener4 topicListener4, @Autowired MyCustomDltProcessor myCustomDltProcessor4) { + // Scenario + // 1. Msg arrived (offset 0) -> -ing + // 2. Msg arrived (offset 1) -> -ing + // 3. Msg arrived (offset 2) throws error -> done + // 4. Msg arrived (offset 3) throws error -> done + // 5. Msg arrived (offset 0) succeed -> done + // 6. Msg arrived (offset 1) succeed -> done + // 7. Msg arrived (retry 1, offset 2) -> done + // 8. Msg arrived (retry 1, offset 3) -> done + // 9. Msg arrived (retry 2, offset 2) -> done + // 10. Msg arrived (retry 2, offset 3) -> done + // 11. Msg arrived (retry 3, offset 2) -> done + // 12. Msg arrived (retry 3, offset 3) -> done + // 13. Msg arrived (retry 4, offset 2) -> done + // 14. Msg arrived (retry 4, offset 3) -> done + // Given DestinationTopic destinationTopic = topicContainer.getNextDestinationTopicFor("4-TopicId", TEST_TOPIC4); @@ -388,45 +432,58 @@ void longSuccessMsgTwiceThenShortFailMsgTwice( assertThat(destinationTopic.getDestinationName()).isEqualTo(expectedRetryTopic); assertThat(topicListener4.receivedMsgs).containsExactlyInAnyOrder(expectedReceivedMsgs); assertThat(topicListener4.receivedTopics).containsExactlyInAnyOrder(expectedReceivedTopics); + assertThat(topicListener4.latchWaitFailCount).isEqualTo(0); assertThat(myCustomDltProcessor4.receivedMsg).containsExactlyInAnyOrder(expectedDltMsgs); } @Test - void oneLongSuccessMsgBetween100ShortFailMsg( + void oneLongSuccessMsgBetween49ShortFailMsg( @Autowired TestTopicListener5 topicListener5, @Autowired MyCustomDltProcessor myCustomDltProcessor5) { + // Scenario. + // 1. msgs received (offsets 0 ~ 24) -> failed. + // 2. msgs received (offset 25) -> -ing + // 3. msgs received (offset 26 ~ 49) -> failed. + // 4. msgs succeed (offset 50) -> done + // 5. msgs received (Retry1 offset 0 ~ 49 except 25) -> failed. + // 6. msgs received (Retry2 offset 0 ~ 49 except 25) -> failed. + // 7. msgs received (Retry3 offset 0 ~ 49 except 25) -> failed. + // 8. msgs received (Retry4 offset 0 ~ 49 except 25) -> failed. + // Given DestinationTopic destinationTopic = topicContainer.getNextDestinationTopicFor("5-TopicId", TEST_TOPIC5); String expectedRetryTopic = TEST_TOPIC5 + "-retry"; - String[] expectedReceivedMsgs = new String[501]; - for (int i = 0; i < 500; i++) { + String[] expectedReceivedMsgs = new String[148]; + for (int i = 0; i < 147; i++) { expectedReceivedMsgs[i] = TestTopicListener5.SHORT_FAIL_MSG; } - expectedReceivedMsgs[500] = TestTopicListener5.LONG_SUCCESS_MSG; + expectedReceivedMsgs[147] = TestTopicListener5.LONG_SUCCESS_MSG; - String[] expectedReceivedTopics = new String[501]; - for (int i = 0; i < 100; i++) { + String[] expectedReceivedTopics = new String[148]; + for (int i = 0; i < 49; i++) { expectedReceivedTopics[i] = TEST_TOPIC5; } - for (int i = 100; i < 500; i++) { + for (int i = 49; i < 147; i++) { expectedReceivedTopics[i] = expectedRetryTopic; } - expectedReceivedTopics[500] = TEST_TOPIC5; + expectedReceivedTopics[147] = TEST_TOPIC5; - String[] expectedDltMsgs = new String[100]; - for (int i = 0; i < 100; i++) { + String[] expectedDltMsgs = new String[49]; + for (int i = 0; i < 49; i++) { expectedDltMsgs[i] = TestTopicListener5.SHORT_FAIL_MSG; } // When - for (int i = 0; i < 100; i++) { - kafkaTemplate.send(TEST_TOPIC5, TestTopicListener5.SHORT_FAIL_MSG); - if (i == 50) { + for (int i = 0; i < 50; i++) { + if (i != 25) { + kafkaTemplate.send(TEST_TOPIC5, TestTopicListener5.SHORT_FAIL_MSG); + } + else { kafkaTemplate.send(TEST_TOPIC5, TestTopicListener5.LONG_SUCCESS_MSG); } } @@ -443,76 +500,80 @@ void oneLongSuccessMsgBetween100ShortFailMsg( } @Test - void halfSuccessMsgAndHalfFailedMsgWithRandomSleepTime( + void moreComplexAsyncScenarioTest( @Autowired TestTopicListener6 topicListener6, @Autowired @Qualifier("myCustomDltProcessor6") MyCustomDltProcessor myCustomDltProcessor6) { + // Scenario. + // 1. Fail Msg (offset 0) -> -ing + // 2. Success Msg (offset 1) -> -ing + // 3. Success Msg (offset 2) -> -ing + // 4. Fail Msg (offset 3) -> done + // 5. Success Msg (offset 4) -> -ing + // 6. Success msg succeed (offset 2) - done + // 7. Success msg succeed (offset 4) -> done + // 8. Fail Msg (Retry1 offset 3) -> done + // 9. Fail Msg (Retry2 offset 3) -> done + // 10. Success msg succeed (offset 1) -> done + // 11. Fail Msg (offset 0) -> done + // 12. Fail Msg (Retry 1 offset 0) -> done + // 13. Fail Msg (Retry 2 offset 0) -> done // Given - DestinationTopic destinationTopic = topicContainer.getNextDestinationTopicFor("6-TopicId", TEST_TOPIC6); + String firstMsg = TestTopicListener6.FAIL_PREFIX + "0"; + String secondMsg = TestTopicListener6.SUCCESS_PREFIX + "1"; + String thirdMsg = TestTopicListener6.SUCCESS_PREFIX + "2"; + String fourthMsg = TestTopicListener6.FAIL_PREFIX + "3"; + String fifthMsg = TestTopicListener6.SUCCESS_PREFIX + "4"; + DestinationTopic destinationTopic = topicContainer.getNextDestinationTopicFor("6-TopicId", TEST_TOPIC6); String expectedRetryTopic = TEST_TOPIC6 + "-retry"; - Random random = new Random(); - ConcurrentLinkedQueue q = new ConcurrentLinkedQueue<>(); - - for (int i = 0; i < 50; i++) { - int randomSleepAWhile = random.nextInt(1, 100); - String msg = randomSleepAWhile + TestTopicListener6.SUCCESS_SUFFIX; - q.add(msg); - } - - for (int i = 0; i < 50; i++) { - int randomSleepAWhile = random.nextInt(1, 100); - String msg = randomSleepAWhile + TestTopicListener6.FAIL_SUFFIX; - q.add(msg); - } - - int expectedSuccessMsgCount = 50; - int expectedFailedMsgCount = 250; + String[] expectedReceivedMsgs = { + firstMsg, + secondMsg, + thirdMsg, + fourthMsg, + fifthMsg, + fourthMsg, + fourthMsg, + firstMsg, + firstMsg + }; - int expectedReceivedOriginalTopicCount = 100; - int expectedReceivedRetryTopicCount = 200; - int expectedReceivedDltMsgCount = 50; + String[] expectedReceivedTopics = { + TEST_TOPIC6, + TEST_TOPIC6, + TEST_TOPIC6, + TEST_TOPIC6, + TEST_TOPIC6, + expectedRetryTopic, + expectedRetryTopic, + expectedRetryTopic, + expectedRetryTopic + }; + String[] expectedDltMsgs = { + TestTopicListener6.FAIL_PREFIX + "3", + TestTopicListener6.FAIL_PREFIX + "0" + }; // When - while (!q.isEmpty()) { - String successOrFailMsg = q.poll(); - kafkaTemplate.send(TEST_TOPIC6, successOrFailMsg); - } + kafkaTemplate.send(TEST_TOPIC6, firstMsg); + kafkaTemplate.send(TEST_TOPIC6, secondMsg); + kafkaTemplate.send(TEST_TOPIC6, thirdMsg); + kafkaTemplate.send(TEST_TOPIC6, fourthMsg); + kafkaTemplate.send(TEST_TOPIC6, fifthMsg); // Then assertThat(awaitLatch(latchContainer.countDownLatch6)).isTrue(); assertThat(awaitLatch(latchContainer.dltCountdownLatch6)).isTrue(); assertThat(destinationTopic.getDestinationName()).isEqualTo(expectedRetryTopic); + assertThat(topicListener6.receivedMsgs).containsExactlyInAnyOrder(expectedReceivedMsgs); + assertThat(topicListener6.receivedTopics).containsExactlyInAnyOrder(expectedReceivedTopics); + assertThat(topicListener6.latchWaitFailCount).isEqualTo(0); - long actualReceivedSuccessMsgCount = topicListener6.receivedMsgs.stream() - .map(s -> s.split(",")[1]) - .filter(m -> (',' + m).equals(TestTopicListener6.SUCCESS_SUFFIX)) - .count(); - - long actualReceivedFailedMsgCount = topicListener6.receivedMsgs.stream() - .map(s -> s.split(",")[1]) - .filter(m -> (',' + m).equals( - TestTopicListener6.FAIL_SUFFIX)) - .count(); - - - long actualReceivedOriginalTopicMsgCount = topicListener6.receivedTopics.stream() - .filter(topic -> topic.equals(TEST_TOPIC6)) - .count(); - - long actualReceivedRetryTopicMsgCount = topicListener6.receivedTopics.stream() - .filter(topic -> topic.equals(expectedRetryTopic)) - .count(); - - assertThat(actualReceivedSuccessMsgCount).isEqualTo(expectedSuccessMsgCount); - assertThat(actualReceivedFailedMsgCount).isEqualTo(expectedFailedMsgCount); - assertThat(actualReceivedOriginalTopicMsgCount).isEqualTo(expectedReceivedOriginalTopicCount); - assertThat(actualReceivedRetryTopicMsgCount).isEqualTo(expectedReceivedRetryTopicCount); - - assertThat(myCustomDltProcessor6.receivedMsg.size()).isEqualTo(expectedReceivedDltMsgCount); + assertThat(myCustomDltProcessor6.receivedMsg).containsExactlyInAnyOrder(expectedDltMsgs); } private boolean awaitLatch(CountDownLatch latch) { @@ -553,7 +614,7 @@ public CompletableFuture listen(String message, @Header(KafkaHeaders.RECEI finally { container.countDownLatch0.countDown(); } - }, CompletableFuture.delayedExecutor(Integer.parseInt(message), TimeUnit.MILLISECONDS)); + }); } } @@ -574,22 +635,43 @@ static class TestTopicListener1 { private final List receivedTopics = new ArrayList<>(); + private CountDownLatch firstRetryFailMsgLatch = new CountDownLatch(1); + + protected final String LONG_SUCCESS_MSG = "success"; + + protected final String SHORT_FAIL_MSG = "fail"; + + protected int latchWaitFailCount = 0; + @KafkaHandler - public CompletableFuture listen(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) { - this.receivedMsgs.add(message); + public CompletableFuture listen( + String message, + @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic, + @Header(KafkaHeaders.OFFSET) String offset) { this.receivedTopics.add(receivedTopic); + this.receivedMsgs.add(message); return CompletableFuture.supplyAsync(() -> { try { - if (message.equals("1")) { + if (message.equals(SHORT_FAIL_MSG)) { throw new RuntimeException("Woooops... in topic " + receivedTopic); } + else { + firstRetryFailMsgLatch.await(10, TimeUnit.SECONDS); + } + } + catch (InterruptedException e) { + latchWaitFailCount += 1; + throw new RuntimeException(e); } finally { + if (receivedTopic.equals(TEST_TOPIC1 + "-retry") && + offset.equals("0")) { + firstRetryFailMsgLatch.countDown(); + } container.countDownLatch1.countDown(); } - return "Task Completed"; - }, CompletableFuture.delayedExecutor(Integer.parseInt(message), TimeUnit.MILLISECONDS)); + }); } } @@ -610,24 +692,47 @@ static class TestTopicListener2 { private final List receivedTopics = new ArrayList<>(); + private CountDownLatch firstRetryFailMsgLatch = new CountDownLatch(1); + + protected final String LONG_SUCCESS_MSG = "success"; + + protected final String SHORT_FAIL_MSG = "fail"; + + protected int latchWaitFailCount = 0; + @KafkaHandler - public CompletableFuture listen(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) { + public CompletableFuture listen( + String message, + @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic, + @Header(KafkaHeaders.OFFSET) String offset) { this.receivedMsgs.add(message); this.receivedTopics.add(receivedTopic); return CompletableFuture.supplyAsync(() -> { try { - if (message.equals("1")) { + if (message.equals(SHORT_FAIL_MSG)) { throw new RuntimeException("Woooops... in topic " + receivedTopic); } + else { + firstRetryFailMsgLatch.await(10, TimeUnit.SECONDS); + } + } + catch (InterruptedException e) { + latchWaitFailCount += 1; + throw new RuntimeException(e); } finally { + if (receivedTopic.equals(TEST_TOPIC2 + "-retry") && + offset.equals("1")) { + firstRetryFailMsgLatch.countDown(); + } container.countDownLatch2.countDown(); } return "Task Completed"; - }, CompletableFuture.delayedExecutor(Integer.parseInt(message), TimeUnit.MILLISECONDS)); + }); } + } @KafkaListener( @@ -646,28 +751,53 @@ static class TestTopicListener3 { private final List receivedTopics = new ArrayList<>(); - public static final String LONG_FAIL_MSG = "100"; + public static final String FAIL_PREFIX = "fail"; + + public static final String SUCCESS_PREFIX = "success"; + + private CountDownLatch successLatchCount = new CountDownLatch(3); + + private CountDownLatch offset0Latch = new CountDownLatch(1); - public static final String SHORT_SUCCESS_MSG = "1"; + protected int latchWaitFailCount = 0; @KafkaHandler - public CompletableFuture listen(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) { + public CompletableFuture listen( + String message, + @Header(KafkaHeaders.RECEIVED_TOPIC)String receivedTopic, + @Header(KafkaHeaders.OFFSET) String offset) { this.receivedMsgs.add(message); this.receivedTopics.add(receivedTopic); return CompletableFuture.supplyAsync(() -> { try { - if (message.equals(LONG_FAIL_MSG)) { + if (message.startsWith(FAIL_PREFIX)) { + if (receivedTopic.equals(TEST_TOPIC3)) { + if (offset.equals("0")) { + successLatchCount.await(10, TimeUnit.SECONDS); + offset0Latch.countDown(); + } + if (offset.equals("1")) { + offset0Latch.await(10, TimeUnit.SECONDS); + } + } throw new RuntimeException("Woooops... in topic " + receivedTopic); } + else { + successLatchCount.countDown(); + } + } + catch (InterruptedException e) { + latchWaitFailCount += 1; + throw new RuntimeException(e); } finally { container.countDownLatch3.countDown(); } - return "Task Completed"; - }, CompletableFuture.delayedExecutor(Integer.parseInt(message), TimeUnit.MILLISECONDS)); + }); } + } @KafkaListener( @@ -686,13 +816,20 @@ static class TestTopicListener4 { private final List receivedTopics = new ArrayList<>(); - public static final String LONG_SUCCESS_MSG = "100"; + public static final String LONG_SUCCESS_MSG = "success"; - public static final String SHORT_FAIL_MSG = "1"; + public static final String SHORT_FAIL_MSG = "fail"; + + private CountDownLatch failLatchCount = new CountDownLatch(2); + + private CountDownLatch offset0Latch = new CountDownLatch(1); + + protected int latchWaitFailCount = 0; @KafkaHandler public CompletableFuture listen(String message, - @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) { + @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic, + @Header(KafkaHeaders.OFFSET) String offset) { this.receivedMsgs.add(message); this.receivedTopics.add(receivedTopic); @@ -701,13 +838,32 @@ public CompletableFuture listen(String message, if (message.equals(SHORT_FAIL_MSG)) { throw new RuntimeException("Woooops... in topic " + receivedTopic); } + else { + failLatchCount.await(10, TimeUnit.SECONDS); + if (offset.equals("1")) { + offset0Latch.await(10, TimeUnit.SECONDS); + } + } + } + catch (InterruptedException e) { + latchWaitFailCount += 1; + throw new RuntimeException(e); } finally { + if (message.equals(SHORT_FAIL_MSG) || + receivedTopic.equals(TEST_TOPIC4)) { + failLatchCount.countDown(); + } + if (offset.equals("0") && + receivedTopic.equals(TEST_TOPIC4)) { + offset0Latch.countDown(); + } container.countDownLatch4.countDown(); } return "Task Completed"; - }, CompletableFuture.delayedExecutor(Integer.parseInt(message), TimeUnit.MILLISECONDS)); + }); } + } @KafkaListener( @@ -726,12 +882,19 @@ static class TestTopicListener5 { private final List receivedTopics = new ArrayList<>(); - public static final String LONG_SUCCESS_MSG = "100"; + public static final String LONG_SUCCESS_MSG = "success"; + + public static final String SHORT_FAIL_MSG = "fail"; - public static final String SHORT_FAIL_MSG = "1"; + private CountDownLatch failLatchCount = new CountDownLatch(24 + 49); + + protected int latchWaitFailCount = 0; @KafkaHandler - public CompletableFuture listen(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) { + public CompletableFuture listen( + String message, + @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic, + @Header(KafkaHeaders.OFFSET) String offset) { this.receivedMsgs.add(message); this.receivedTopics.add(receivedTopic); @@ -740,14 +903,33 @@ public CompletableFuture listen(String message, @Header(KafkaHeaders.REC if (message.equals(SHORT_FAIL_MSG)) { throw new RuntimeException("Woooops... in topic " + receivedTopic); } + else { + failLatchCount.await(10, TimeUnit.SECONDS); + } + } + catch (InterruptedException e) { + latchWaitFailCount += 1; + throw new RuntimeException(e); } finally { + if (message.equals(SHORT_FAIL_MSG)) { + if (receivedTopic.equals(TEST_TOPIC5) && + Integer.valueOf(offset) > 25) { + failLatchCount.countDown(); + } + else { + if (failLatchCount.getCount() > 0) { + failLatchCount.countDown(); + } + } + } container.countDownLatch5.countDown(); } return "Task Completed"; - }, CompletableFuture.delayedExecutor(Integer.parseInt(message), TimeUnit.MILLISECONDS)); + }); } + } @KafkaListener( @@ -766,37 +948,76 @@ static class TestTopicListener6 { private final List receivedTopics = new ArrayList<>(); - public static final String SUCCESS_SUFFIX = ",s"; + public static final String SUCCESS_PREFIX = "success"; + + public static final String FAIL_PREFIX = "fail"; + + protected CountDownLatch offset1CompletedLatch = new CountDownLatch(1); + + protected CountDownLatch offset2CompletedLatch = new CountDownLatch(1); - public static final String FAIL_SUFFIX = ",f"; + protected CountDownLatch offset3RetryCompletedLatch = new CountDownLatch(3); + + protected CountDownLatch offset4ReceivedLatch = new CountDownLatch(1); + + protected int latchWaitFailCount = 0; @KafkaHandler - public CompletableFuture listen(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) { + public CompletableFuture listen( + String message, + @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic, + @Header(KafkaHeaders.OFFSET) String offset) { this.receivedMsgs.add(message); this.receivedTopics.add(receivedTopic); - String[] split = message.split(","); - String sleepAWhile = split[0]; - String failOrSuccess = split[1]; - return CompletableFuture.supplyAsync(() -> { try { - if (failOrSuccess.equals("f")) { - throw new RuntimeException("Woooops... in topic " + receivedTopic); + if (message.startsWith(FAIL_PREFIX)) { + if (offset.equals("0")) { + if (receivedTopic.equals(TEST_TOPIC6)) { + offset1CompletedLatch.await(10, TimeUnit.SECONDS); + } + } + + if (offset.equals("3")) { + offset3RetryCompletedLatch.countDown(); + } + + throw new RuntimeException("Woooops... in topic " + receivedTopic + "msg : " + message); + } + else { + if (offset.equals("1")) { + offset3RetryCompletedLatch.await(10, TimeUnit.SECONDS); + offset1CompletedLatch.countDown(); + } + + if (offset.equals("2")) { + offset4ReceivedLatch.await(10, TimeUnit.SECONDS); + offset2CompletedLatch.countDown(); + } + + if (offset.equals("4")) { + offset4ReceivedLatch.countDown(); + offset2CompletedLatch.await(10, TimeUnit.SECONDS); + } } } + catch (InterruptedException ex) { + latchWaitFailCount += 1; + throw new RuntimeException(ex); + } finally { container.countDownLatch6.countDown(); } return "Task Completed"; - }, CompletableFuture.delayedExecutor(Integer.parseInt(sleepAWhile), TimeUnit.MILLISECONDS)); + }); } } static class CountDownLatchContainer { - static int COUNT0 = 15; + static int COUNT0 = 9; static int DLT_COUNT0 = 3; @@ -836,17 +1057,17 @@ static class CountDownLatchContainer { CountDownLatch dltCountdownLatch4 = new CountDownLatch(DLT_COUNT4); - static int COUNT5 = 501; + static int COUNT5 = 24 + 73; - static int DLT_COUNT5 = 100; + static int DLT_COUNT5 = 49; CountDownLatch countDownLatch5 = new CountDownLatch(COUNT5); CountDownLatch dltCountdownLatch5 = new CountDownLatch(DLT_COUNT5); - static int COUNT6 = 250; + static int COUNT6 = 9; - static int DLT_COUNT6 = 50; + static int DLT_COUNT6 = 2; CountDownLatch countDownLatch6 = new CountDownLatch(COUNT6); @@ -882,11 +1103,12 @@ static class RetryTopicConfigurations extends RetryTopicConfigurationSupport { static RetryTopicConfiguration createRetryTopicConfiguration( KafkaTemplate template, String topicName, - String dltBeanName) { + String dltBeanName, + int maxAttempts) { return RetryTopicConfigurationBuilder .newInstance() .fixedBackOff(50) - .maxAttempts(5) + .maxAttempts(maxAttempts) .concurrency(1) .useSingleTopicForSameIntervals() .includeTopic(topicName) @@ -900,7 +1122,8 @@ RetryTopicConfiguration testRetryTopic0(KafkaTemplate template) return createRetryTopicConfiguration( template, TEST_TOPIC0, - "myCustomDltProcessor0"); + "myCustomDltProcessor0", + 3); } @Bean @@ -908,7 +1131,8 @@ RetryTopicConfiguration testRetryTopic1(KafkaTemplate template) return createRetryTopicConfiguration( template, TEST_TOPIC1, - "myCustomDltProcessor1"); + "myCustomDltProcessor1", + 5); } @Bean @@ -916,7 +1140,8 @@ RetryTopicConfiguration testRetryTopic2(KafkaTemplate template) return createRetryTopicConfiguration( template, TEST_TOPIC2, - "myCustomDltProcessor2"); + "myCustomDltProcessor2", + 5); } @Bean @@ -924,7 +1149,8 @@ RetryTopicConfiguration testRetryTopic3(KafkaTemplate template) return createRetryTopicConfiguration( template, TEST_TOPIC3, - "myCustomDltProcessor3"); + "myCustomDltProcessor3", + 5); } @Bean @@ -932,7 +1158,8 @@ RetryTopicConfiguration testRetryTopic4(KafkaTemplate template) return createRetryTopicConfiguration( template, TEST_TOPIC4, - "myCustomDltProcessor4"); + "myCustomDltProcessor4", + 5); } @Bean @@ -940,7 +1167,8 @@ RetryTopicConfiguration testRetryTopic5(KafkaTemplate template) return createRetryTopicConfiguration( template, TEST_TOPIC5, - "myCustomDltProcessor5"); + "myCustomDltProcessor5", + 3); } @Bean @@ -948,7 +1176,8 @@ RetryTopicConfiguration testRetryTopic6(KafkaTemplate template) return createRetryTopicConfiguration( template, TEST_TOPIC6, - "myCustomDltProcessor6"); + "myCustomDltProcessor6", + 3); } @Bean diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/AsyncMonoRetryTopicScenarioTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/AsyncMonoRetryTopicScenarioTests.java index ed467cf23c..a1ca877b89 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/AsyncMonoRetryTopicScenarioTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/AsyncMonoRetryTopicScenarioTests.java @@ -23,8 +23,6 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Random; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -33,6 +31,7 @@ import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; @@ -102,9 +101,9 @@ public class AsyncMonoRetryTopicScenarioTests { @Test void allFailCaseTest( - @Autowired TestTopicListener0 zeroTopicListener, + @Autowired TestTopicListener0 testTopicListener, @Autowired MyCustomDltProcessor myCustomDltProcessor0) { - // Given + // All Fail case. String shortFailedMsg1 = "0"; String shortFailedMsg2 = "1"; String shortFailedMsg3 = "2"; @@ -121,13 +120,7 @@ void allFailCaseTest( shortFailedMsg1, shortFailedMsg2, shortFailedMsg3, - shortFailedMsg1, - shortFailedMsg2, - shortFailedMsg3, - shortFailedMsg1, - shortFailedMsg2, - shortFailedMsg3 - }; + }; String[] expectedReceivedTopics = { TEST_TOPIC0, TEST_TOPIC0, @@ -138,13 +131,7 @@ void allFailCaseTest( expectedRetryTopic, expectedRetryTopic, expectedRetryTopic, - expectedRetryTopic, - expectedRetryTopic, - expectedRetryTopic, - expectedRetryTopic, - expectedRetryTopic, - expectedRetryTopic - }; + }; String[] expectedDltMsgs = { shortFailedMsg1, shortFailedMsg2, @@ -162,8 +149,8 @@ void allFailCaseTest( assertThat(destinationTopic.getDestinationName()).isEqualTo(TEST_TOPIC0 + "-retry"); - assertThat(zeroTopicListener.receivedMsgs).containsExactlyInAnyOrder(expectedReceivedMsgs); - assertThat(zeroTopicListener.receivedTopics).containsExactlyInAnyOrder(expectedReceivedTopics); + assertThat(testTopicListener.receivedMsgs).containsExactlyInAnyOrder(expectedReceivedMsgs); + assertThat(testTopicListener.receivedTopics).containsExactlyInAnyOrder(expectedReceivedTopics); assertThat(myCustomDltProcessor0.receivedMsg).containsExactlyInAnyOrder(expectedDltMsgs); } @@ -172,9 +159,18 @@ void allFailCaseTest( void firstShortFailAndLastLongSuccessRetryTest( @Autowired TestTopicListener1 testTopicListener1, @Autowired MyCustomDltProcessor myCustomDltProcessor1) { + // Scenario. + // 1. Short Fail msg (offset 0) + // 2. Long success msg (offset 1) -> -ing (latch wait) + // 3. Short fail msg (Retry1 offset 0) -> (latch down) + // 4. Long success msg (offset 1) -> Success! + // 5. Short fail msg (Retry2 offset 0) + // 6. Short fail msg (Retry3 offset 0) + // 7. Short fail msg (Retry4 offset 0) + // Given - String longSuccessMsg = "3"; - String shortFailedMsg = "1"; + String longSuccessMsg = testTopicListener1.LONG_SUCCESS_MSG; + String shortFailedMsg = testTopicListener1.SHORT_FAIL_MSG; DestinationTopic destinationTopic = topicContainer.getNextDestinationTopicFor("1-topicId", TEST_TOPIC1); String expectedRetryTopic = TEST_TOPIC1 + "-retry"; @@ -211,17 +207,28 @@ void firstShortFailAndLastLongSuccessRetryTest( assertThat(destinationTopic.getDestinationName()).isEqualTo(expectedRetryTopic); assertThat(testTopicListener1.receivedMsgs).containsExactlyInAnyOrder(expectedReceivedMsgs); assertThat(testTopicListener1.receivedTopics).containsExactlyInAnyOrder(expectedReceivedTopics); + assertThat(testTopicListener1.latchWaitFailCount).isEqualTo(0); assertThat(myCustomDltProcessor1.receivedMsg).containsExactlyInAnyOrder(expectedDltMsgs); } @Test void firstLongSuccessAndLastShortFailed( - @Autowired TestTopicListener2 zero2TopicListener, + @Autowired TestTopicListener2 testTopicListener2, @Autowired MyCustomDltProcessor myCustomDltProcessor2) { + // Scenario. + // 1. Long success msg (offset 0) -> going on... (latch await) + // 2. Short fail msg (offset 1) -> done. + // 3. Short fail msg (Retry1 offset 1) -> done (latch down) + // 4. Long success msg (offset 0) -> succeed. + // 5. Short fail msg (Retry2 offset 1) + // 6. Short fail msg (Retry3 offset 1) + // 7. Short fail msg (Retry4 offset 1) + // 8. Short fail msg (dlt offset 1) + // Given - String shortFailedMsg = "1"; - String longSuccessMsg = "3"; + String shortFailedMsg = testTopicListener2.SHORT_FAIL_MSG; + String longSuccessMsg = testTopicListener2.LONG_SUCCESS_MSG; DestinationTopic destinationTopic = topicContainer.getNextDestinationTopicFor("2-topicId", TEST_TOPIC2); String expectedRetryTopic = TEST_TOPIC2 + "-retry"; @@ -252,12 +259,14 @@ void firstLongSuccessAndLastShortFailed( kafkaTemplate.send(TEST_TOPIC2, shortFailedMsg); // Then + assertThat(awaitLatch(latchContainer.countDownLatch2)).isTrue(); assertThat(awaitLatch(latchContainer.dltCountdownLatch2)).isTrue(); assertThat(destinationTopic.getDestinationName()).isEqualTo(expectedRetryTopic); - assertThat(zero2TopicListener.receivedMsgs).containsExactlyInAnyOrder(expectedReceivedMsgs); - assertThat(zero2TopicListener.receivedTopics).containsExactlyInAnyOrder(expectedReceivedTopics); + assertThat(testTopicListener2.receivedMsgs).containsExactlyInAnyOrder(expectedReceivedMsgs); + assertThat(testTopicListener2.receivedTopics).containsExactlyInAnyOrder(expectedReceivedTopics); + assertThat(testTopicListener2.latchWaitFailCount).isEqualTo(0); assertThat(myCustomDltProcessor2.receivedMsg).containsExactlyInAnyOrder(expectedDltMsgs); } @@ -266,24 +275,48 @@ void firstLongSuccessAndLastShortFailed( void longFailMsgTwiceThenShortSuccessMsgThird( @Autowired TestTopicListener3 testTopicListener3, @Autowired MyCustomDltProcessor myCustomDltProcessor3) { + // Scenario + // 1. Long fail msg arrived (offset 0) -> -ing (wait latch offset 4) + // 2. Long fail msg arrived (offset 1) -> -ing (wait latch offset 1) + // 3. Short success msg arrived (offset 2) -> done + // 4. Short success msg arrived (offset 3) -> done + // 5. Short success msg arrived (offset 4) -> done (latch offset 4 count down) + // 6. Long fail msg throws error (offset 0) -> done + // 7. Long fail msg throws error (offset 1) -> done + // 8. Long fail msg (retry 1 with offset 0) -> done + // 9. Long fail msg (retry 1 with offset 1) -> done + // 10. Long fail msg (retry 2 with offset 0) -> done + // 11. Long fail msg (retry 2 with offset 1) -> done + // 12. Long fail msg (retry 3 with offset 0) -> done + // 13. Long fail msg (retry 3 with offset 1) -> done + // 14. Long fail msg (retry 4 with offset 0) -> done + // 15. Long fail msg (retry 4 with offset 1) -> done + // Given DestinationTopic destinationTopic = topicContainer.getNextDestinationTopicFor("3-topicId", TEST_TOPIC3); + String firstMsg = TestTopicListener3.FAIL_PREFIX + "0"; + String secondMsg = TestTopicListener3.FAIL_PREFIX + "1"; + String thirdMsg = TestTopicListener3.SUCCESS_PREFIX + "2"; + String fourthMsg = TestTopicListener3.SUCCESS_PREFIX + "3"; + String fifthMsg = TestTopicListener3.SUCCESS_PREFIX + "4"; + String expectedRetryTopic = TEST_TOPIC3 + "-retry"; + String[] expectedReceivedMsgs = { - TestTopicListener3.LONG_FAIL_MSG, - TestTopicListener3.LONG_FAIL_MSG, - TestTopicListener3.SHORT_SUCCESS_MSG, - TestTopicListener3.SHORT_SUCCESS_MSG, - TestTopicListener3.SHORT_SUCCESS_MSG, - TestTopicListener3.LONG_FAIL_MSG, - TestTopicListener3.LONG_FAIL_MSG, - TestTopicListener3.LONG_FAIL_MSG, - TestTopicListener3.LONG_FAIL_MSG, - TestTopicListener3.LONG_FAIL_MSG, - TestTopicListener3.LONG_FAIL_MSG, - TestTopicListener3.LONG_FAIL_MSG, - TestTopicListener3.LONG_FAIL_MSG, + firstMsg, + secondMsg, + thirdMsg, + fourthMsg, + fifthMsg, + firstMsg, + secondMsg, + firstMsg, + secondMsg, + firstMsg, + secondMsg, + firstMsg, + secondMsg, }; String[] expectedReceivedTopics = { @@ -303,16 +336,16 @@ void longFailMsgTwiceThenShortSuccessMsgThird( }; String[] expectedDltMsgs = { - TestTopicListener3.LONG_FAIL_MSG, - TestTopicListener3.LONG_FAIL_MSG, + firstMsg, + secondMsg, }; // When - kafkaTemplate.send(TEST_TOPIC3, TestTopicListener3.LONG_FAIL_MSG); - kafkaTemplate.send(TEST_TOPIC3, TestTopicListener3.LONG_FAIL_MSG); - kafkaTemplate.send(TEST_TOPIC3, TestTopicListener3.SHORT_SUCCESS_MSG); - kafkaTemplate.send(TEST_TOPIC3, TestTopicListener3.SHORT_SUCCESS_MSG); - kafkaTemplate.send(TEST_TOPIC3, TestTopicListener3.SHORT_SUCCESS_MSG); + kafkaTemplate.send(TEST_TOPIC3, firstMsg); + kafkaTemplate.send(TEST_TOPIC3, secondMsg); + kafkaTemplate.send(TEST_TOPIC3, thirdMsg); + kafkaTemplate.send(TEST_TOPIC3, fourthMsg); + kafkaTemplate.send(TEST_TOPIC3, fifthMsg); // Then assertThat(awaitLatch(latchContainer.countDownLatch3)).isTrue(); @@ -321,6 +354,7 @@ void longFailMsgTwiceThenShortSuccessMsgThird( assertThat(destinationTopic.getDestinationName()).isEqualTo(expectedRetryTopic); assertThat(testTopicListener3.receivedMsgs).containsExactlyInAnyOrder(expectedReceivedMsgs); assertThat(testTopicListener3.receivedTopics).containsExactlyInAnyOrder(expectedReceivedTopics); + assertThat(testTopicListener3.latchWaitFailCount).isEqualTo(0); assertThat(myCustomDltProcessor3.receivedMsg).containsExactlyInAnyOrder(expectedDltMsgs); } @@ -329,6 +363,22 @@ void longFailMsgTwiceThenShortSuccessMsgThird( void longSuccessMsgTwiceThenShortFailMsgTwice( @Autowired TestTopicListener4 topicListener4, @Autowired MyCustomDltProcessor myCustomDltProcessor4) { + // Scenario + // 1. Msg arrived (offset 0) -> -ing + // 2. Msg arrived (offset 1) -> -ing + // 3. Msg arrived (offset 2) throws error -> done + // 4. Msg arrived (offset 3) throws error -> done + // 5. Msg arrived (offset 0) succeed -> done + // 6. Msg arrived (offset 1) succeed -> done + // 7. Msg arrived (retry 1, offset 2) -> done + // 8. Msg arrived (retry 1, offset 3) -> done + // 9. Msg arrived (retry 2, offset 2) -> done + // 10. Msg arrived (retry 2, offset 3) -> done + // 11. Msg arrived (retry 3, offset 2) -> done + // 12. Msg arrived (retry 3, offset 3) -> done + // 13. Msg arrived (retry 4, offset 2) -> done + // 14. Msg arrived (retry 4, offset 3) -> done + // Given DestinationTopic destinationTopic = topicContainer.getNextDestinationTopicFor("4-TopicId", TEST_TOPIC4); @@ -361,7 +411,7 @@ void longSuccessMsgTwiceThenShortFailMsgTwice( expectedRetryTopic, expectedRetryTopic, expectedRetryTopic, - }; + }; String[] expectedDltMsgs = { TestTopicListener4.SHORT_FAIL_MSG, @@ -381,44 +431,58 @@ void longSuccessMsgTwiceThenShortFailMsgTwice( assertThat(destinationTopic.getDestinationName()).isEqualTo(expectedRetryTopic); assertThat(topicListener4.receivedMsgs).containsExactlyInAnyOrder(expectedReceivedMsgs); assertThat(topicListener4.receivedTopics).containsExactlyInAnyOrder(expectedReceivedTopics); + assertThat(topicListener4.latchWaitFailCount).isEqualTo(0); assertThat(myCustomDltProcessor4.receivedMsg).containsExactlyInAnyOrder(expectedDltMsgs); } @Test - void oneLongSuccessMsgBetween100ShortFailMsgs( + void oneLongSuccessMsgBetween49ShortFailMsg( @Autowired TestTopicListener5 topicListener5, @Autowired MyCustomDltProcessor myCustomDltProcessor5) { + // Scenario. + // 1. msgs received (offsets 0 ~ 24) -> failed. + // 2. msgs received (offset 25) -> -ing + // 3. msgs received (offset 26 ~ 49) -> failed. + // 4. msgs succeed (offset 50) -> done + // 5. msgs received (Retry1 offset 0 ~ 49 except 25) -> failed. + // 6. msgs received (Retry2 offset 0 ~ 49 except 25) -> failed. + // 7. msgs received (Retry3 offset 0 ~ 49 except 25) -> failed. + // 8. msgs received (Retry4 offset 0 ~ 49 except 25) -> failed. + // Given DestinationTopic destinationTopic = topicContainer.getNextDestinationTopicFor("5-TopicId", TEST_TOPIC5); String expectedRetryTopic = TEST_TOPIC5 + "-retry"; - String[] expectedReceivedMsgs = new String[501]; - for (int i = 0; i < 500; i++) { + String[] expectedReceivedMsgs = new String[148]; + for (int i = 0; i < 147; i++) { expectedReceivedMsgs[i] = TestTopicListener5.SHORT_FAIL_MSG; } - expectedReceivedMsgs[500] = TestTopicListener5.LONG_SUCCESS_MSG; + expectedReceivedMsgs[147] = TestTopicListener5.LONG_SUCCESS_MSG; + - String[] expectedReceivedTopics = new String[501]; - for (int i = 0; i < 100; i++) { + String[] expectedReceivedTopics = new String[148]; + for (int i = 0; i < 49; i++) { expectedReceivedTopics[i] = TEST_TOPIC5; } - for (int i = 100; i < 500; i++) { + for (int i = 49; i < 147; i++) { expectedReceivedTopics[i] = expectedRetryTopic; } - expectedReceivedTopics[500] = TEST_TOPIC5; + expectedReceivedTopics[147] = TEST_TOPIC5; - String[] expectedDltMsgs = new String[100]; - for (int i = 0; i < 100; i++) { + String[] expectedDltMsgs = new String[49]; + for (int i = 0; i < 49; i++) { expectedDltMsgs[i] = TestTopicListener5.SHORT_FAIL_MSG; } // When - for (int i = 0; i < 100; i++) { - kafkaTemplate.send(TEST_TOPIC5, TestTopicListener5.SHORT_FAIL_MSG); - if (i == 50) { + for (int i = 0; i < 50; i++) { + if (i != 25) { + kafkaTemplate.send(TEST_TOPIC5, TestTopicListener5.SHORT_FAIL_MSG); + } + else { kafkaTemplate.send(TEST_TOPIC5, TestTopicListener5.LONG_SUCCESS_MSG); } } @@ -435,75 +499,81 @@ void oneLongSuccessMsgBetween100ShortFailMsgs( } @Test - void halfSuccessMsgAndHalfFailedMsgWithRandomSleepTime( + void moreComplexAsyncScenarioTest( @Autowired TestTopicListener6 topicListener6, - @Autowired MyCustomDltProcessor myCustomDltProcessor6) { + @Autowired @Qualifier("myCustomDltProcessor6") + MyCustomDltProcessor myCustomDltProcessor6) { + // Scenario. + // 1. Fail Msg (offset 0) -> -ing + // 2. Success Msg (offset 1) -> -ing + // 3. Success Msg (offset 2) -> -ing + // 4. Fail Msg (offset 3) -> done + // 5. Success Msg (offset 4) -> -ing + // 6. Success msg succeed (offset 2) - done + // 7. Success msg succeed (offset 4) -> done + // 8. Fail Msg (Retry1 offset 3) -> done + // 9. Fail Msg (Retry2 offset 3) -> done + // 10. Success msg succeed (offset 1) -> done + // 11. Fail Msg (offset 0) -> done + // 12. Fail Msg (Retry 1 offset 0) -> done + // 13. Fail Msg (Retry 2 offset 0) -> done + // Given - DestinationTopic destinationTopic = topicContainer.getNextDestinationTopicFor("6-TopicId", TEST_TOPIC6); + String firstMsg = TestTopicListener6.FAIL_PREFIX + "0"; + String secondMsg = TestTopicListener6.SUCCESS_PREFIX + "1"; + String thirdMsg = TestTopicListener6.SUCCESS_PREFIX + "2"; + String fourthMsg = TestTopicListener6.FAIL_PREFIX + "3"; + String fifthMsg = TestTopicListener6.SUCCESS_PREFIX + "4"; + DestinationTopic destinationTopic = topicContainer.getNextDestinationTopicFor("6-TopicId", TEST_TOPIC6); String expectedRetryTopic = TEST_TOPIC6 + "-retry"; - Random random = new Random(); - ConcurrentLinkedQueue q = new ConcurrentLinkedQueue<>(); - - for (int i = 0; i < 50; i++) { - int randomSleepAWhile = random.nextInt(1, 100); - String msg = String.valueOf(randomSleepAWhile) + TestTopicListener6.SUCCESS_SUFFIX; - q.add(msg); - } - - for (int i = 0; i < 50; i++) { - int randomSleepAWhile = random.nextInt(1, 100); - String msg = String.valueOf(randomSleepAWhile) + TestTopicListener6.FAIL_SUFFIX; - q.add(msg); - } - - int expectedSuccessMsgCount = 50; - int expectedFailedMsgCount = 250; + String[] expectedReceivedMsgs = { + firstMsg, + secondMsg, + thirdMsg, + fourthMsg, + fifthMsg, + fourthMsg, + fourthMsg, + firstMsg, + firstMsg + }; - int expectedReceivedOriginalTopicCount = 100; - int expectedReceivedRetryTopicCount = 200; - int expectedReceivedDltMsgCount = 50; + String[] expectedReceivedTopics = { + TEST_TOPIC6, + TEST_TOPIC6, + TEST_TOPIC6, + TEST_TOPIC6, + TEST_TOPIC6, + expectedRetryTopic, + expectedRetryTopic, + expectedRetryTopic, + expectedRetryTopic + }; + String[] expectedDltMsgs = { + TestTopicListener6.FAIL_PREFIX + "3", + TestTopicListener6.FAIL_PREFIX + "0" + }; // When - while (!q.isEmpty()) { - String successOrFailMsg = q.poll(); - kafkaTemplate.send(TEST_TOPIC6, successOrFailMsg); - } + kafkaTemplate.send(TEST_TOPIC6, firstMsg); + kafkaTemplate.send(TEST_TOPIC6, secondMsg); + kafkaTemplate.send(TEST_TOPIC6, thirdMsg); + kafkaTemplate.send(TEST_TOPIC6, fourthMsg); + kafkaTemplate.send(TEST_TOPIC6, fifthMsg); // Then assertThat(awaitLatch(latchContainer.countDownLatch6)).isTrue(); assertThat(awaitLatch(latchContainer.dltCountdownLatch6)).isTrue(); assertThat(destinationTopic.getDestinationName()).isEqualTo(expectedRetryTopic); + assertThat(topicListener6.receivedMsgs).containsExactlyInAnyOrder(expectedReceivedMsgs); + assertThat(topicListener6.receivedTopics).containsExactlyInAnyOrder(expectedReceivedTopics); + assertThat(topicListener6.latchWaitFailCount).isEqualTo(0); - long actualReceivedSuccessMsgCount = topicListener6.receivedMsgs.stream() - .map(s -> s.split(",")[1]) - .filter(m -> (',' + m).equals(TestTopicListener6.SUCCESS_SUFFIX)) - .count(); - - long actualReceivedFailedMsgCount = topicListener6.receivedMsgs.stream() - .map(s -> s.split(",")[1]) - .filter(m -> (',' + m).equals( - TestTopicListener6.FAIL_SUFFIX)) - .count(); - - - long actualReceivedOriginalTopicMsgCount = topicListener6.receivedTopics.stream() - .filter(topic -> topic.equals(TEST_TOPIC6)) - .count(); - - long actualReceivedRetryTopicMsgCount = topicListener6.receivedTopics.stream() - .filter(topic -> topic.equals(expectedRetryTopic)) - .count(); - - assertThat(actualReceivedSuccessMsgCount).isEqualTo(expectedSuccessMsgCount); - assertThat(actualReceivedFailedMsgCount).isEqualTo(expectedFailedMsgCount); - assertThat(actualReceivedOriginalTopicMsgCount).isEqualTo(expectedReceivedOriginalTopicCount); - assertThat(actualReceivedRetryTopicMsgCount).isEqualTo(expectedReceivedRetryTopicCount); - - assertThat(myCustomDltProcessor6.receivedMsg.size()).isEqualTo(expectedReceivedDltMsgCount); + assertThat(myCustomDltProcessor6.receivedMsg).containsExactlyInAnyOrder(expectedDltMsgs); } private boolean awaitLatch(CountDownLatch latch) { @@ -514,6 +584,7 @@ private boolean awaitLatch(CountDownLatch latch) { fail(e.getMessage()); throw new RuntimeException(e); } + } @KafkaListener( @@ -540,13 +611,10 @@ public Mono listen(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) St try { throw new RuntimeException("Woooops... in topic " + receivedTopic); } - catch (Exception e) { - throw e; - } finally { container.countDownLatch0.countDown(); } - }).then(); + }); } } @@ -567,32 +635,45 @@ static class TestTopicListener1 { private final List receivedTopics = new ArrayList<>(); + private CountDownLatch firstRetryFailMsgLatch = new CountDownLatch(1); + + protected final String LONG_SUCCESS_MSG = "success"; + + protected final String SHORT_FAIL_MSG = "fail"; + + protected int latchWaitFailCount = 0; + @KafkaHandler - public Mono listen(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) { - this.receivedMsgs.add(message); + public Mono listen( + String message, + @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic, + @Header(KafkaHeaders.OFFSET) String offset) { this.receivedTopics.add(receivedTopic); - + this.receivedMsgs.add(message); return Mono.fromCallable(() -> { try { - Thread.sleep(Integer.parseInt(message)); - if (message.equals("1")) { + if (message.equals(SHORT_FAIL_MSG)) { throw new RuntimeException("Woooops... in topic " + receivedTopic); } + else { + firstRetryFailMsgLatch.await(10, TimeUnit.SECONDS); + } } catch (InterruptedException e) { + latchWaitFailCount += 1; throw new RuntimeException(e); } - catch (RuntimeException e) { - throw e; - } finally { + if (receivedTopic.equals(TEST_TOPIC1 + "-retry") && + offset.equals("0")) { + firstRetryFailMsgLatch.countDown(); + } container.countDownLatch1.countDown(); } - return "Task Completed"; }); - } + } @KafkaListener( @@ -611,31 +692,47 @@ static class TestTopicListener2 { private final List receivedTopics = new ArrayList<>(); + private CountDownLatch firstRetryFailMsgLatch = new CountDownLatch(1); + + protected final String LONG_SUCCESS_MSG = "success"; + + protected final String SHORT_FAIL_MSG = "fail"; + + protected int latchWaitFailCount = 0; + @KafkaHandler - public Mono listen(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) { + public Mono listen( + String message, + @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic, + @Header(KafkaHeaders.OFFSET) String offset) { this.receivedMsgs.add(message); this.receivedTopics.add(receivedTopic); return Mono.fromCallable(() -> { try { - Thread.sleep(Integer.parseInt(message)); - if (message.equals("1")) { + if (message.equals(SHORT_FAIL_MSG)) { throw new RuntimeException("Woooops... in topic " + receivedTopic); } + else { + firstRetryFailMsgLatch.await(10, TimeUnit.SECONDS); + } } catch (InterruptedException e) { + latchWaitFailCount += 1; throw new RuntimeException(e); } - catch (RuntimeException e) { - throw e; - } finally { + if (receivedTopic.equals(TEST_TOPIC2 + "-retry") && + offset.equals("1")) { + firstRetryFailMsgLatch.countDown(); + } container.countDownLatch2.countDown(); } return "Task Completed"; }); } + } @KafkaListener( @@ -654,37 +751,53 @@ static class TestTopicListener3 { private final List receivedTopics = new ArrayList<>(); - public static final String LONG_FAIL_MSG = "100"; + public static final String FAIL_PREFIX = "fail"; + + public static final String SUCCESS_PREFIX = "success"; - public static final String SHORT_SUCCESS_MSG = "1"; + private CountDownLatch successLatchCount = new CountDownLatch(3); + + private CountDownLatch offset0Latch = new CountDownLatch(1); + + protected int latchWaitFailCount = 0; @KafkaHandler - public Mono listen(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) { + public Mono listen( + String message, + @Header(KafkaHeaders.RECEIVED_TOPIC)String receivedTopic, + @Header(KafkaHeaders.OFFSET) String offset) { this.receivedMsgs.add(message); this.receivedTopics.add(receivedTopic); return Mono.fromCallable(() -> { try { - Thread.sleep(Integer.parseInt(message)); - if (message.equals(LONG_FAIL_MSG)) { + if (message.startsWith(FAIL_PREFIX)) { + if (receivedTopic.equals(TEST_TOPIC3)) { + if (offset.equals("0")) { + successLatchCount.await(10, TimeUnit.SECONDS); + offset0Latch.countDown(); + } + if (offset.equals("1")) { + offset0Latch.await(10, TimeUnit.SECONDS); + } + } throw new RuntimeException("Woooops... in topic " + receivedTopic); } + else { + successLatchCount.countDown(); + } } catch (InterruptedException e) { + latchWaitFailCount += 1; throw new RuntimeException(e); } - catch (RuntimeException e) { - throw e; - } finally { container.countDownLatch3.countDown(); } - return "Task Completed"; }); - - } + } @KafkaListener( @@ -703,36 +816,54 @@ static class TestTopicListener4 { private final List receivedTopics = new ArrayList<>(); - public static final String LONG_SUCCESS_MSG = "100"; + public static final String LONG_SUCCESS_MSG = "success"; + + public static final String SHORT_FAIL_MSG = "fail"; + + private CountDownLatch failLatchCount = new CountDownLatch(2); + + private CountDownLatch offset0Latch = new CountDownLatch(1); - public static final String SHORT_FAIL_MSG = "1"; + protected int latchWaitFailCount = 0; @KafkaHandler - public Mono listen(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) { + public Mono listen(String message, + @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic, + @Header(KafkaHeaders.OFFSET) String offset) { this.receivedMsgs.add(message); this.receivedTopics.add(receivedTopic); return Mono.fromCallable(() -> { try { - Thread.sleep(Integer.parseInt(message)); if (message.equals(SHORT_FAIL_MSG)) { throw new RuntimeException("Woooops... in topic " + receivedTopic); } + else { + failLatchCount.await(10, TimeUnit.SECONDS); + if (offset.equals("1")) { + offset0Latch.await(10, TimeUnit.SECONDS); + } + } } catch (InterruptedException e) { + latchWaitFailCount += 1; throw new RuntimeException(e); } - catch (RuntimeException e) { - throw e; - } finally { + if (message.equals(SHORT_FAIL_MSG) || + receivedTopic.equals(TEST_TOPIC4)) { + failLatchCount.countDown(); + } + if (offset.equals("0") && + receivedTopic.equals(TEST_TOPIC4)) { + offset0Latch.countDown(); + } container.countDownLatch4.countDown(); } - return "Task Completed"; }); - } + } @KafkaListener( @@ -751,35 +882,54 @@ static class TestTopicListener5 { private final List receivedTopics = new ArrayList<>(); - public static final String LONG_SUCCESS_MSG = "100"; + public static final String LONG_SUCCESS_MSG = "success"; + + public static final String SHORT_FAIL_MSG = "fail"; - public static final String SHORT_FAIL_MSG = "1"; + private CountDownLatch failLatchCount = new CountDownLatch(24 + 49); + + protected int latchWaitFailCount = 0; @KafkaHandler - public Mono listen(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) { + public Mono listen( + String message, + @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic, + @Header(KafkaHeaders.OFFSET) String offset) { this.receivedMsgs.add(message); this.receivedTopics.add(receivedTopic); return Mono.fromCallable(() -> { try { - Thread.sleep(Integer.parseInt(message)); if (message.equals(SHORT_FAIL_MSG)) { throw new RuntimeException("Woooops... in topic " + receivedTopic); } + else { + failLatchCount.await(10, TimeUnit.SECONDS); + } } catch (InterruptedException e) { + latchWaitFailCount += 1; throw new RuntimeException(e); } - catch (RuntimeException e) { - throw e; - } finally { + if (message.equals(SHORT_FAIL_MSG)) { + if (receivedTopic.equals(TEST_TOPIC5) && + Integer.valueOf(offset) > 25) { + failLatchCount.countDown(); + } + else { + if (failLatchCount.getCount() > 0) { + failLatchCount.countDown(); + } + } + } container.countDownLatch5.countDown(); } return "Task Completed"; }); } + } @KafkaListener( @@ -798,31 +948,63 @@ static class TestTopicListener6 { private final List receivedTopics = new ArrayList<>(); - public static final String SUCCESS_SUFFIX = ",s"; + public static final String SUCCESS_PREFIX = "success"; + + public static final String FAIL_PREFIX = "fail"; + + protected CountDownLatch offset1CompletedLatch = new CountDownLatch(1); + + protected CountDownLatch offset2CompletedLatch = new CountDownLatch(1); + + protected CountDownLatch offset3RetryCompletedLatch = new CountDownLatch(3); + + protected CountDownLatch offset4ReceivedLatch = new CountDownLatch(1); - public static final String FAIL_SUFFIX = ",f"; + protected int latchWaitFailCount = 0; @KafkaHandler - public Mono listen(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) { + public Mono listen( + String message, + @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic, + @Header(KafkaHeaders.OFFSET) String offset) { this.receivedMsgs.add(message); this.receivedTopics.add(receivedTopic); return Mono.fromCallable(() -> { - String[] split = message.split(","); - String sleepAWhile = split[0]; - String failOrSuccess = split[1]; - try { - Thread.sleep(Integer.parseInt(sleepAWhile)); - if (failOrSuccess.equals("f")) { - throw new RuntimeException("Woooops... in topic " + receivedTopic); + if (message.startsWith(FAIL_PREFIX)) { + if (offset.equals("0")) { + if (receivedTopic.equals(TEST_TOPIC6)) { + offset1CompletedLatch.await(10, TimeUnit.SECONDS); + } + } + + if (offset.equals("3")) { + offset3RetryCompletedLatch.countDown(); + } + + throw new RuntimeException("Woooops... in topic " + receivedTopic + "msg : " + message); + } + else { + if (offset.equals("1")) { + offset3RetryCompletedLatch.await(10, TimeUnit.SECONDS); + offset1CompletedLatch.countDown(); + } + + if (offset.equals("2")) { + offset4ReceivedLatch.await(10, TimeUnit.SECONDS); + offset2CompletedLatch.countDown(); + } + + if (offset.equals("4")) { + offset4ReceivedLatch.countDown(); + offset2CompletedLatch.await(10, TimeUnit.SECONDS); + } } } - catch (InterruptedException e) { - throw new RuntimeException(e); - } - catch (RuntimeException e) { - throw e; + catch (InterruptedException ex) { + latchWaitFailCount += 1; + throw new RuntimeException(ex); } finally { container.countDownLatch6.countDown(); @@ -835,7 +1017,7 @@ public Mono listen(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) static class CountDownLatchContainer { - static int COUNT0 = 15; + static int COUNT0 = 9; static int DLT_COUNT0 = 3; @@ -875,17 +1057,17 @@ static class CountDownLatchContainer { CountDownLatch dltCountdownLatch4 = new CountDownLatch(DLT_COUNT4); - static int COUNT5 = 501; + static int COUNT5 = 24 + 73; - static int DLT_COUNT5 = 100; + static int DLT_COUNT5 = 49; CountDownLatch countDownLatch5 = new CountDownLatch(COUNT5); CountDownLatch dltCountdownLatch5 = new CountDownLatch(DLT_COUNT5); - static int COUNT6 = 250; + static int COUNT6 = 9; - static int DLT_COUNT6 = 50; + static int DLT_COUNT6 = 2; CountDownLatch countDownLatch6 = new CountDownLatch(COUNT6); @@ -897,8 +1079,7 @@ static class MyCustomDltProcessor { final List receivedMsg = new ArrayList<>(); - MyCustomDltProcessor(KafkaTemplate kafkaTemplate, - CountDownLatch latch) { + MyCustomDltProcessor(KafkaTemplate kafkaTemplate, CountDownLatch latch) { this.kafkaTemplate = kafkaTemplate; this.latch = latch; } @@ -918,13 +1099,15 @@ static class RetryTopicConfigurations extends RetryTopicConfigurationSupport { private static final String DLT_METHOD_NAME = "processDltMessage"; - static RetryTopicConfiguration createRetryTopicConfiguration(KafkaTemplate template, - String topicName, - String dltBeanName) { + static RetryTopicConfiguration createRetryTopicConfiguration( + KafkaTemplate template, + String topicName, + String dltBeanName, + int maxAttempts) { return RetryTopicConfigurationBuilder .newInstance() .fixedBackOff(50) - .maxAttempts(5) + .maxAttempts(maxAttempts) .concurrency(1) .useSingleTopicForSameIntervals() .includeTopic(topicName) @@ -938,7 +1121,8 @@ RetryTopicConfiguration testRetryTopic0(KafkaTemplate template) return createRetryTopicConfiguration( template, TEST_TOPIC0, - "myCustomDltProcessor0"); + "myCustomDltProcessor0", + 3); } @Bean @@ -946,7 +1130,8 @@ RetryTopicConfiguration testRetryTopic1(KafkaTemplate template) return createRetryTopicConfiguration( template, TEST_TOPIC1, - "myCustomDltProcessor1"); + "myCustomDltProcessor1", + 5); } @Bean @@ -954,7 +1139,8 @@ RetryTopicConfiguration testRetryTopic2(KafkaTemplate template) return createRetryTopicConfiguration( template, TEST_TOPIC2, - "myCustomDltProcessor2"); + "myCustomDltProcessor2", + 5); } @Bean @@ -962,7 +1148,8 @@ RetryTopicConfiguration testRetryTopic3(KafkaTemplate template) return createRetryTopicConfiguration( template, TEST_TOPIC3, - "myCustomDltProcessor3"); + "myCustomDltProcessor3", + 5); } @Bean @@ -970,7 +1157,8 @@ RetryTopicConfiguration testRetryTopic4(KafkaTemplate template) return createRetryTopicConfiguration( template, TEST_TOPIC4, - "myCustomDltProcessor4"); + "myCustomDltProcessor4", + 5); } @Bean @@ -978,7 +1166,8 @@ RetryTopicConfiguration testRetryTopic5(KafkaTemplate template) return createRetryTopicConfiguration( template, TEST_TOPIC5, - "myCustomDltProcessor5"); + "myCustomDltProcessor5", + 3); } @Bean @@ -986,7 +1175,8 @@ RetryTopicConfiguration testRetryTopic6(KafkaTemplate template) return createRetryTopicConfiguration( template, TEST_TOPIC6, - "myCustomDltProcessor6"); + "myCustomDltProcessor6", + 3); } @Bean @@ -1125,6 +1315,7 @@ ProducerFactory producerFactory() { KafkaTemplate kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } + } @EnableKafka