Skip to content

Commit

Permalink
apply review
Browse files Browse the repository at this point in the history
  • Loading branch information
chickenchickenlove committed Apr 24, 2024
1 parent b9c1583 commit 5abad09
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,7 @@ public class FilteringBatchMessageListenerAdapter<K, V>
*/
public FilteringBatchMessageListenerAdapter(BatchMessageListener<K, V> delegate,
RecordFilterStrategy<K, V> recordFilterStrategy) {

super(delegate, recordFilterStrategy);
this.ackDiscarded = false;
this.consumerAware = this.delegateType.equals(ListenerType.ACKNOWLEDGING_CONSUMER_AWARE) || this.delegateType.equals(ListenerType.CONSUMER_AWARE);
this(delegate, recordFilterStrategy, false);
}

/**
Expand All @@ -81,10 +78,11 @@ public FilteringBatchMessageListenerAdapter(BatchMessageListener<K, V> delegate,
public void onMessage(List<ConsumerRecord<K, V>> records, @Nullable Acknowledgment acknowledgment,
Consumer<?, ?> consumer) {

List<ConsumerRecord<K, V>> consumerRecords = getRecordFilterStrategy().filterBatch(records);
final RecordFilterStrategy<K, V> recordFilterStrategy = getRecordFilterStrategy();
final List<ConsumerRecord<K, V>> consumerRecords = recordFilterStrategy.filterBatch(records);
Assert.state(consumerRecords != null, "filter returned null from filterBatch");

if (consumerRecords.isEmpty()) {
if (recordFilterStrategy.ignoreEmptyBatch()) {
if (acknowledgment != null) {
invokeDelegate(consumerRecords, acknowledgment, consumer);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,8 @@ default List<ConsumerRecord<K, V>> filterBatch(List<ConsumerRecord<K, V>> record
return records;
}

default boolean ignoreEmptyBatch() {
return false;
}

}

0 comments on commit 5abad09

Please sign in to comment.