From 276275b3cac83198093f40aa2dcd563e2fc751cc Mon Sep 17 00:00:00 2001 From: Wang Zhiyang <1208931582@qq.com> Date: Sat, 2 Mar 2024 05:06:26 +0800 Subject: [PATCH] Fix `testInvokeRecordInterceptorAllSkipped()` * When `AckMode.RECORD` and early is `false`, and the last executed method is `RecordInterceptor.afterRecord`, we missed a `CountDownLatch` in KMLCT#testInvokeRecordInterceptorAllSkipped that must wait for `RecordInterceptor.afterRecord` to complete. (cherry picked from commit 5beb8fe607bd131ced632dbc7afa32e11a6ff2dd) --- .../KafkaMessageListenerContainerTests.java | 20 ++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java index e3baef6428..2abf892d41 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java @@ -2768,7 +2768,6 @@ public void rePausePartitionAfterRebalance() throws Exception { rebal.get().onPartitionsAssigned(Set.of(tp0, tp1)); return null; }).given(consumer).subscribe(eq(foos), any(ConsumerRebalanceListener.class)); - final CountDownLatch resumeLatch = new CountDownLatch(1); ContainerProperties containerProps = new ContainerProperties("foo"); containerProps.setGroupId("grp"); containerProps.setAckMode(AckMode.RECORD); @@ -2779,7 +2778,6 @@ public void rePausePartitionAfterRebalance() throws Exception { KafkaMessageListenerContainer container = new KafkaMessageListenerContainer<>(cf, containerProps); container.start(); - InOrder inOrder = inOrder(consumer); assertThat(firstPoll.await(10, TimeUnit.SECONDS)).isNotNull(); container.pausePartition(tp0); container.pausePartition(tp1); @@ -2810,7 +2808,6 @@ public void resumePartitionAfterRevokeAndReAssign() 
throws Exception { ConsumerFactory cf = mock(ConsumerFactory.class); Consumer consumer = mock(Consumer.class); given(cf.createConsumer(eq("grp"), eq("clientId"), isNull(), any())).willReturn(consumer); - AtomicBoolean first = new AtomicBoolean(true); TopicPartition tp0 = new TopicPartition("foo", 0); TopicPartition tp1 = new TopicPartition("foo", 1); given(consumer.assignment()).willReturn(Set.of(tp0, tp1)); @@ -3466,6 +3463,7 @@ public void testCooperativeRebalance() throws Exception { containerProps.setClientId("clientId"); containerProps.setMessageListener((MessageListener) msg -> { }); Properties consumerProps = new Properties(); + containerProps.setMessageListener((MessageListener) msg -> { }); KafkaMessageListenerContainer container = new KafkaMessageListenerContainer<>(cf, containerProps); container.start(); @@ -3609,7 +3607,6 @@ else if (call == 1) { }).given(consumer).subscribe(any(Collection.class), any(ConsumerRebalanceListener.class)); List> commits = new ArrayList<>(); AtomicBoolean firstCommit = new AtomicBoolean(true); - AtomicInteger commitCount = new AtomicInteger(); willAnswer(invoc -> { commits.add(invoc.getArgument(0, Map.class)); if (!firstCommit.getAndSet(false)) { @@ -3891,6 +3888,11 @@ public void testInvokeRecordInterceptorAllSkipped(AckMode ackMode, boolean early latch.countDown(); return null; }).given(consumer).commitSync(any(), any()); + CountDownLatch closeLatch = new CountDownLatch(1); + willAnswer(inv -> { + closeLatch.countDown(); + return null; + }).given(consumer).close(); TopicPartitionOffset[] topicPartition = new TopicPartitionOffset[] { new TopicPartitionOffset("foo", 0) }; @@ -3905,6 +3907,7 @@ public void testInvokeRecordInterceptorAllSkipped(AckMode ackMode, boolean early containerProps.setTransactionManager(mock(PlatformTransactionManager.class)); } + CountDownLatch afterRecordLatch = new CountDownLatch(2); RecordInterceptor recordInterceptor = spy(new RecordInterceptor() { @Override @@ -3915,6 +3918,10 @@ public 
ConsumerRecord intercept(ConsumerRecord return null; } + public void afterRecord(ConsumerRecord record, Consumer consumer) { + afterRecordLatch.countDown(); + } + }); KafkaMessageListenerContainer container = @@ -3923,6 +3930,9 @@ public ConsumerRecord intercept(ConsumerRecord container.setInterceptBeforeTx(early); container.start(); assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(afterRecordLatch.await(10, TimeUnit.SECONDS)).isTrue(); + container.stop(); + assertThat(closeLatch.await(10, TimeUnit.SECONDS)).isTrue(); InOrder inOrder = inOrder(recordInterceptor, consumer); inOrder.verify(recordInterceptor).setupThreadState(eq(consumer)); @@ -3949,7 +3959,7 @@ public ConsumerRecord intercept(ConsumerRecord inOrder.verify(consumer).commitSync(eq(Map.of(new TopicPartition("foo", 0), new OffsetAndMetadata(2L))), any(Duration.class)); } - container.stop(); + inOrder.verify(consumer).close(); } @ParameterizedTest(name = "{index} testInvokeBatchInterceptorAllSkipped early intercept {0}")