From 46372df9bbb73c447ab9eedd6ff59d93efc03170 Mon Sep 17 00:00:00 2001 From: mikcar Date: Fri, 25 Nov 2022 17:25:59 +0100 Subject: [PATCH] GH-2489: Retry Commits If Necessary on Revoke Closes #2489 * Retry commits that have failed temporarily due to rebalance in progress when onPartitionsRevoked is called. * Adjust expectations for unit tests where commits are retried (previous expectation accepted the defect that failed commits for subsequently revoked partitions were not retried) --- .../kafka/listener/KafkaMessageListenerContainer.java | 1 + .../kafka/listener/KafkaMessageListenerContainerTests.java | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index 8ba77183d4..0d6a484023 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -3490,6 +3490,7 @@ public void onPartitionsRevoked(Collection partitions) { } try { // Wait until now to commit, in case the user listener added acks + checkRebalanceCommits(); commitPendingAcks(); fixTxOffsetsIfNeeded(); } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java index 4ede8274c0..249aae6697 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java @@ -3366,7 +3366,7 @@ void testCommitRebalanceInProgressBatch() throws Exception { assertThat(commits).hasSize(3); assertThat(commits.get(0)).hasSize(2); // assignment assertThat(commits.get(1)).hasSize(2); // batch commit - assertThat(commits.get(2)).hasSize(1); // re-commit + assertThat(commits.get(2)).hasSize(2); // GH-2489: offsets for both partition should be re-committed before partition 1 is revoked }); } @@ -3379,7 +3379,7 @@ void testCommitRebalanceInProgressRecord() throws Exception { assertThat(commits.get(2)).hasSize(1); assertThat(commits.get(3)).hasSize(1); assertThat(commits.get(4)).hasSize(1); - assertThat(commits.get(5)).hasSize(1); // re-commit + assertThat(commits.get(5)).hasSize(2); // GH-2489: offsets for both partition should be re-committed before partition 1 is revoked assertThat(commits.get(5).get(new TopicPartition("foo", 1))) .isNotNull() .extracting(om -> om.offset())