diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index 51eb755577..2d38945c22 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -1466,7 +1466,14 @@ protected void pollAndInvoke() { protected void handleAsyncFailure() { List> copyFailedRecords = new ArrayList<>(this.failedRecords); - this.failedRecords.clear(); + + // If we use failedRecords.clear() to remove copied record from failed records, + // We may encounter race condition during this operation. + // Other, the thread which execute this block, may miss one failed record. + int capturedRecordsCount = copyFailedRecords.size(); + for (int i = 0; i < capturedRecordsCount; i++) { + this.failedRecords.pollFirst(); + } // If any copied and failed record fails to complete due to an unexpected error, // We will give up on retrying with the remaining copied and failed Records.