diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/FilteringBatchMessageListenerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/FilteringBatchMessageListenerAdapter.java index 1095fb59cc..434da0531c 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/FilteringBatchMessageListenerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/FilteringBatchMessageListenerAdapter.java @@ -53,10 +53,7 @@ public class FilteringBatchMessageListenerAdapter */ public FilteringBatchMessageListenerAdapter(BatchMessageListener delegate, RecordFilterStrategy recordFilterStrategy) { - - super(delegate, recordFilterStrategy); - this.ackDiscarded = false; - this.consumerAware = this.delegateType.equals(ListenerType.ACKNOWLEDGING_CONSUMER_AWARE) || this.delegateType.equals(ListenerType.CONSUMER_AWARE); + this(delegate, recordFilterStrategy, false); } /** @@ -81,10 +78,11 @@ public FilteringBatchMessageListenerAdapter(BatchMessageListener delegate, public void onMessage(List> records, @Nullable Acknowledgment acknowledgment, Consumer consumer) { - List> consumerRecords = getRecordFilterStrategy().filterBatch(records); + final RecordFilterStrategy recordFilterStrategy = getRecordFilterStrategy(); + final List> consumerRecords = recordFilterStrategy.filterBatch(records); Assert.state(consumerRecords != null, "filter returned null from filterBatch"); - if (consumerRecords.isEmpty()) { + if (recordFilterStrategy.ignoreEmptyBatch()) { if (acknowledgment != null) { invokeDelegate(consumerRecords, acknowledgment, consumer); } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/RecordFilterStrategy.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/RecordFilterStrategy.java index f2de862e64..5f13a693ec 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/RecordFilterStrategy.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/RecordFilterStrategy.java @@ -58,4 +58,8 @@ default List> filterBatch(List> record return records; } + default boolean ignoreEmptyBatch() { + return false; + } + }