Skip to content

Commit

Permalink
Fix Race in Test
Browse files Browse the repository at this point in the history
  • Loading branch information
garyrussell committed Oct 25, 2023
1 parent c84d0f7 commit fbbe8f5
Showing 1 changed file with 3 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2737,13 +2737,15 @@ public void rePausePartitionAfterRebalance() throws Exception {
return null;
}).given(consumer).pause(any());
given(consumer.paused()).willReturn(pausedParts);
CountDownLatch firstPoll = new CountDownLatch(1);
given(consumer.poll(any(Duration.class))).willAnswer(i -> {
if (paused.get()) {
pauseLatch1.countDown();
// hold up the consumer thread while we revoke/assign partitions on the test thread
suspendConsumerThread.await(10, TimeUnit.SECONDS);
}
Thread.sleep(50);
firstPoll.countDown();
return ConsumerRecords.empty();
});
AtomicReference<ConsumerRebalanceListener> rebal = new AtomicReference<>();
Expand All @@ -2766,6 +2768,7 @@ public void rePausePartitionAfterRebalance() throws Exception {
new KafkaMessageListenerContainer<>(cf, containerProps);
container.start();
InOrder inOrder = inOrder(consumer);
assertThat(firstPoll.await(10, TimeUnit.SECONDS)).isNotNull();
container.pausePartition(tp0);
container.pausePartition(tp1);
assertThat(pauseLatch1.await(10, TimeUnit.SECONDS)).isTrue();
Expand Down

0 comments on commit fbbe8f5

Please sign in to comment.