Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-3186: Filter out successful commits from retry #3189

Merged
merged 3 commits into from
Apr 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@
* @author Wang Zhiyang
* @author Raphael Rösch
* @author Christian Mergenthaler
* @author Mikael Carlstedt
*/
public class KafkaMessageListenerContainer<K, V> // NOSONAR line count
extends AbstractMessageListenerContainer<K, V> implements ConsumerPauseResumeEventPublisher {
Expand Down Expand Up @@ -3226,6 +3227,10 @@ private void doCommitSync(Map<TopicPartition, OffsetAndMetadata> commits, int re
if (this.fixTxOffsets) {
this.lastCommits.putAll(commits);
}
if (!this.commitsDuringRebalance.isEmpty()) {
// Remove failed commits during last rebalance that are superseded by these commits
this.commitsDuringRebalance.keySet().removeAll(commits.keySet());
}
}
catch (RetriableCommitFailedException e) {
if (retries >= this.containerProperties.getCommitRetries()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@
* @author Daniel Gentes
* @author Soby Chacko
* @author Wang Zhiyang
* @author Mikael Carlstedt
*/
@EmbeddedKafka(topics = { KafkaMessageListenerContainerTests.topic1, KafkaMessageListenerContainerTests.topic2,
KafkaMessageListenerContainerTests.topic3, KafkaMessageListenerContainerTests.topic4,
Expand Down Expand Up @@ -3472,25 +3473,33 @@ public void testCooperativeRebalance() throws Exception {

@Test
void testCommitRebalanceInProgressBatch() throws Exception {
testCommitRebalanceInProgressGuts(AckMode.BATCH, 2, commits -> {
assertThat(commits).hasSize(3);
testCommitRebalanceInProgressGuts(AckMode.BATCH, 3, commits -> {
assertThat(commits).hasSize(5);
assertThat(commits.get(0)).hasSize(2); // assignment
assertThat(commits.get(1)).hasSize(2); // batch commit
assertThat(commits.get(2)).hasSize(2); // GH-2489: offsets for both partition should be re-committed before partition 1 is revoked
assertThat(commits.get(1)).hasSize(2); // batch commit which should fail due to rebalance in progress
assertThat(commits.get(2)).hasSize(2); // commit retry which should fail due to rebalance in progress
assertThat(commits.get(3)).hasSize(1); // GH-3186: additional batch commit with only one partition which should be successful
assertThat(commits.get(4)).hasSize(1); // GH-2489: offsets for both uncommitted partition should be re-committed before partition 0 is revoked
assertThat(commits.get(4).get(new TopicPartition("foo", 0)))
.isNotNull()
.extracting(OffsetAndMetadata::offset)
.isEqualTo(2L);
});
}

@Test
void testCommitRebalanceInProgressRecord() throws Exception {
testCommitRebalanceInProgressGuts(AckMode.RECORD, 5, commits -> {
assertThat(commits).hasSize(6);
testCommitRebalanceInProgressGuts(AckMode.RECORD, 6, commits -> {
assertThat(commits).hasSize(8);
assertThat(commits.get(0)).hasSize(2); // assignment
assertThat(commits.get(1)).hasSize(1); // 4 individual commits
assertThat(commits.get(1)).hasSize(1); // 4 individual commits which should fail due to rebalance in progress
assertThat(commits.get(2)).hasSize(1);
assertThat(commits.get(3)).hasSize(1);
assertThat(commits.get(4)).hasSize(1);
assertThat(commits.get(5)).hasSize(2); // GH-2489: offsets for both partition should be re-committed before partition 1 is revoked
assertThat(commits.get(5).get(new TopicPartition("foo", 1)))
assertThat(commits.get(5)).hasSize(2); // commit retry which should fail due to rebalance in progress
assertThat(commits.get(6)).hasSize(1); // GH-3186: additional commit which should be successful
assertThat(commits.get(7)).hasSize(1); // GH-2489: offsets for both partition should be re-committed before partition 0 is revoked
assertThat(commits.get(7).get(new TopicPartition("foo", 0)))
.isNotNull()
.extracting(OffsetAndMetadata::offset)
.isEqualTo(2L);
Expand All @@ -3514,25 +3523,37 @@ private void testCommitRebalanceInProgressGuts(AckMode ackMode, int exceptions,
records.put(new TopicPartition("foo", 1), Arrays.asList(
new ConsumerRecord<>("foo", 1, 0L, 1, "foo"),
new ConsumerRecord<>("foo", 1, 1L, 1, "bar")));
final Map<TopicPartition, List<ConsumerRecord<Integer, String>>> additionalRecords = Collections.singletonMap(
new TopicPartition("foo", 1),
Collections.singletonList(new ConsumerRecord<>("foo", 1, 2L, 1, "foo")));
ConsumerRecords<Integer, String> consumerRecords = new ConsumerRecords<>(records);
ConsumerRecords<Integer, String> additionalConsumerRecords = new ConsumerRecords<>(additionalRecords);
ConsumerRecords<Integer, String> emptyRecords = new ConsumerRecords<>(Collections.emptyMap());
AtomicBoolean first = new AtomicBoolean(true);
AtomicInteger rebalance = new AtomicInteger();
AtomicInteger pollIteration = new AtomicInteger();
AtomicReference<ConsumerRebalanceListener> rebal = new AtomicReference<>();
CountDownLatch latch = new CountDownLatch(2);
CountDownLatch latch = new CountDownLatch(3);
given(consumer.poll(any(Duration.class))).willAnswer(i -> {
Thread.sleep(50);
int call = rebalance.getAndIncrement();
int call = pollIteration.getAndIncrement();
final ConsumerRecords<Integer, String> result;
if (call == 0) {
rebal.get().onPartitionsRevoked(Collections.emptyList());
rebal.get().onPartitionsAssigned(records.keySet());
result = consumerRecords;
}
else if (call == 1) {
result = additionalConsumerRecords;
}
else if (call == 2) {
rebal.get().onPartitionsRevoked(Collections.singletonList(topicPartition0));
rebal.get().onPartitionsAssigned(Collections.emptyList());
result = emptyRecords;
}
else {
result = emptyRecords;
}
latch.countDown();
return first.getAndSet(false) ? consumerRecords : emptyRecords;
return result;
});
willAnswer(invoc -> {
rebal.set(invoc.getArgument(1));
Expand Down
Loading