diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java index f2c9545898..1876283145 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java @@ -703,14 +703,12 @@ private void resolveFilter(MethodKafkaListenerEndpoint endpoint, KafkaList Object filter = resolveExpression(kafkaListener.filter()); if (filter instanceof RecordFilterStrategy rfs) { endpoint.setRecordFilterStrategy(rfs); - endpoint.setAckDiscarded(true); } else { String filterBeanName = resolveExpressionAsString(kafkaListener.filter(), "filter"); if (StringUtils.hasText(filterBeanName)) { endpoint.setRecordFilterStrategy( this.beanFactory.getBean(filterBeanName, RecordFilterStrategy.class)); - endpoint.setAckDiscarded(true); } } } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java index 608ffd84d7..2ac668e538 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java @@ -326,6 +326,7 @@ public void setReplyTemplate(KafkaTemplate replyTemplate) { @SuppressWarnings("unchecked") public void setRecordFilterStrategy(RecordFilterStrategy recordFilterStrategy) { this.recordFilterStrategy = (RecordFilterStrategy) recordFilterStrategy; + setAckDiscarded(true); } protected boolean isAckDiscarded() {