From c155336de02d550e1fe8c2f70ab75ca128257ca9 Mon Sep 17 00:00:00 2001 From: ChickenchickenLove Date: Tue, 19 Nov 2024 03:33:05 +0900 Subject: [PATCH] GH-3638: Fixes bug caused by race condition during handleAsyncFailure() Fixes: #3638 Issue: https://github.com/spring-projects/spring-kafka/issues/3638 --- .../kafka/listener/KafkaMessageListenerContainer.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index 51eb755577..2d38945c22 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -1466,7 +1466,14 @@ protected void pollAndInvoke() { protected void handleAsyncFailure() { List> copyFailedRecords = new ArrayList<>(this.failedRecords); - this.failedRecords.clear(); + + // If we use failedRecords.clear() to remove copied record from failed records, + // We may encounter race condition during this operation. + // Other, the thread which execute this block, may miss one failed record. + int capturedRecordsCount = copyFailedRecords.size(); + for (int i = 0; i < capturedRecordsCount; i++) { + this.failedRecords.pollFirst(); + } // If any copied and failed record fails to complete due to an unexpected error, // We will give up on retrying with the remaining copied and failed Records.