Skip to content

Commit

Permalink
Fix testInvokeRecordInterceptorAllSkipped()
Browse files Browse the repository at this point in the history
* When `AckMode.RECORD` and early is `false,` and the last execute method is `RecordInterceptor.afterRecord`,
we missed a `CountDownLatch` in KMLCT#testInvokeRecordInterceptorAllSkipped that must wait for `RecordInterceptor.afterRecord` to complete.

(cherry-picked from commit 5beb8fe)
  • Loading branch information
Wzy19930507 authored and sobychacko committed Mar 1, 2024
1 parent fd150ad commit 276275b
Showing 1 changed file with 15 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2768,7 +2768,6 @@ public void rePausePartitionAfterRebalance() throws Exception {
rebal.get().onPartitionsAssigned(Set.of(tp0, tp1));
return null;
}).given(consumer).subscribe(eq(foos), any(ConsumerRebalanceListener.class));
final CountDownLatch resumeLatch = new CountDownLatch(1);
ContainerProperties containerProps = new ContainerProperties("foo");
containerProps.setGroupId("grp");
containerProps.setAckMode(AckMode.RECORD);
Expand All @@ -2779,7 +2778,6 @@ public void rePausePartitionAfterRebalance() throws Exception {
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
container.start();
InOrder inOrder = inOrder(consumer);
assertThat(firstPoll.await(10, TimeUnit.SECONDS)).isNotNull();
container.pausePartition(tp0);
container.pausePartition(tp1);
Expand Down Expand Up @@ -2810,7 +2808,6 @@ public void resumePartitionAfterRevokeAndReAssign() throws Exception {
ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
Consumer<Integer, String> consumer = mock(Consumer.class);
given(cf.createConsumer(eq("grp"), eq("clientId"), isNull(), any())).willReturn(consumer);
AtomicBoolean first = new AtomicBoolean(true);
TopicPartition tp0 = new TopicPartition("foo", 0);
TopicPartition tp1 = new TopicPartition("foo", 1);
given(consumer.assignment()).willReturn(Set.of(tp0, tp1));
Expand Down Expand Up @@ -3466,6 +3463,7 @@ public void testCooperativeRebalance() throws Exception {
containerProps.setClientId("clientId");
containerProps.setMessageListener((MessageListener) msg -> { });
Properties consumerProps = new Properties();
containerProps.setMessageListener((MessageListener<?, ?>) msg -> { });
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
container.start();
Expand Down Expand Up @@ -3609,7 +3607,6 @@ else if (call == 1) {
}).given(consumer).subscribe(any(Collection.class), any(ConsumerRebalanceListener.class));
List<Map<TopicPartition, OffsetAndMetadata>> commits = new ArrayList<>();
AtomicBoolean firstCommit = new AtomicBoolean(true);
AtomicInteger commitCount = new AtomicInteger();
willAnswer(invoc -> {
commits.add(invoc.getArgument(0, Map.class));
if (!firstCommit.getAndSet(false)) {
Expand Down Expand Up @@ -3891,6 +3888,11 @@ public void testInvokeRecordInterceptorAllSkipped(AckMode ackMode, boolean early
latch.countDown();
return null;
}).given(consumer).commitSync(any(), any());
CountDownLatch closeLatch = new CountDownLatch(1);
willAnswer(inv -> {
closeLatch.countDown();
return null;
}).given(consumer).close();
TopicPartitionOffset[] topicPartition = new TopicPartitionOffset[] {
new TopicPartitionOffset("foo", 0) };

Expand All @@ -3905,6 +3907,7 @@ public void testInvokeRecordInterceptorAllSkipped(AckMode ackMode, boolean early
containerProps.setTransactionManager(mock(PlatformTransactionManager.class));
}

CountDownLatch afterRecordLatch = new CountDownLatch(2);
RecordInterceptor<Integer, String> recordInterceptor = spy(new RecordInterceptor<Integer, String>() {

@Override
Expand All @@ -3915,6 +3918,10 @@ public ConsumerRecord<Integer, String> intercept(ConsumerRecord<Integer, String>
return null;
}

public void afterRecord(ConsumerRecord<Integer, String> record, Consumer<Integer, String> consumer) {
afterRecordLatch.countDown();
}

});

KafkaMessageListenerContainer<Integer, String> container =
Expand All @@ -3923,6 +3930,9 @@ public ConsumerRecord<Integer, String> intercept(ConsumerRecord<Integer, String>
container.setInterceptBeforeTx(early);
container.start();
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(afterRecordLatch.await(10, TimeUnit.SECONDS)).isTrue();
container.stop();
assertThat(closeLatch.await(10, TimeUnit.SECONDS)).isTrue();

InOrder inOrder = inOrder(recordInterceptor, consumer);
inOrder.verify(recordInterceptor).setupThreadState(eq(consumer));
Expand All @@ -3949,7 +3959,7 @@ public ConsumerRecord<Integer, String> intercept(ConsumerRecord<Integer, String>
inOrder.verify(consumer).commitSync(eq(Map.of(new TopicPartition("foo", 0), new OffsetAndMetadata(2L))),
any(Duration.class));
}
container.stop();
inOrder.verify(consumer).close();
}

@ParameterizedTest(name = "{index} testInvokeBatchInterceptorAllSkipped early intercept {0}")
Expand Down

0 comments on commit 276275b

Please sign in to comment.