Skip to content

Commit

Permalink
apply review
Browse files Browse the repository at this point in the history
  • Loading branch information
chickenchickenlove committed Apr 26, 2024
1 parent 63d065e commit 1db0dd6
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,18 @@ public void listen(Thing thing) {
}
----

### Ignore empty batch when you use `batch` mode.
Starting with version `3.3.0`, we support ignoring empty batch that result from filtering by `RecordFilterStrategy`.
When implementing `RecordFilterStrategy`, you can configure it through `ignoreEmptyBatch()`. default is `false`, that means `KafkaListener` will be invoked even if all `ConsumerRecord` are filtered out.
[[x32-ignore-empty-batch-messages]]
=== Ignore empty batch when you use `batch` mode.

If `true` is returned, the `KafkaListener` will not be invoked when all `ConsumerRecord` s are filtered out. However, commit to broker, will still be executed.
If `false` is returned, the `KafkaListener` will be invoked when all `ConsumerRecord` s are filtered out.
Starting with version 3.3, Ignoring empty batches that result from filtering by `RecordFilterStrategy` is supported. +
When implementing `RecordFilterStrategy`, it can be configured through `ignoreEmptyBatch()`. +
The default setting is `false`, indicating `KafkaListener` will be invoked even if all `ConsumerRecord` s are filtered out. +

Let's look some examples.
If `true` is returned, the `KafkaListener` [underline]#will not be invoked# when all `ConsumerRecord` are filtered out. +
However, commit to broker, will still be executed. +
If `false` is returned, the `KafkaListener` [underline]#will be invoked# when all `ConsumerRecord` are filtered out. +

Here are some examples.

[source,java]
----
Expand All @@ -54,12 +58,12 @@ public class IgnoreEmptyBatchRecordFilterStrategy implements RecordFilterStrateg
// NOTE: ignoreEmptyBatchRecordFilterStrategy is bean name of IgnoreEmptyBatchRecordFilterStrategy instance.
@KafkaListener(id = "filtered", topics = "topic", filter = "ignoreEmptyBatchRecordFilterStrategy")
public void listen(Thing thing) {
public void listen(List<Thing> things) {
...
}
----
In this case, `IgnoreEmptyBatchRecordFilterStrategy` always returns empty list and return `true` as result of `ignoreEmptyBatch()`.
thus `KafkaListener#listen(...)` never will be invoked at all.
In this case, `IgnoreEmptyBatchRecordFilterStrategy` always returns empty list and return `true` as result of `ignoreEmptyBatch()`. +
Thus `KafkaListener#listen(...)` never will be invoked at all. +


[source,java]
Expand All @@ -80,9 +84,9 @@ public class NotIgnoreEmptyBatchRecordFilterStrategy implements RecordFilterStra
// NOTE: notIgnoreEmptyBatchRecordFilterStrategy is bean name of NotIgnoreEmptyBatchRecordFilterStrategy instance.
@KafkaListener(id = "filtered", topics = "topic", filter = "notIgnoreEmptyBatchRecordFilterStrategy")
public void listen(Thing thing) {
public void listen(List<Thing> things) {
...
}
----
However, in this case, `IgnoreEmptyBatchRecordFilterStrategy` always returns empty list and return `false` as result of `ignoreEmptyBatch()`.
However, in this case, `IgnoreEmptyBatchRecordFilterStrategy` always returns empty list and return `false` as result of `ignoreEmptyBatch()`. +
thus `KafkaListener#listen(...)` always will be invoked.
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ public FilteringBatchMessageListenerAdapter(BatchMessageListener<K, V> delegate,

super(delegate, recordFilterStrategy);
this.ackDiscarded = ackDiscarded;
this.consumerAware = this.delegateType.equals(ListenerType.ACKNOWLEDGING_CONSUMER_AWARE) || this.delegateType.equals(ListenerType.CONSUMER_AWARE);
this.consumerAware = this.delegateType.equals(ListenerType.ACKNOWLEDGING_CONSUMER_AWARE) ||
this.delegateType.equals(ListenerType.CONSUMER_AWARE);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,11 @@ default List<ConsumerRecord<K, V>> filterBatch(List<ConsumerRecord<K, V>> record
}

/**
* Returns a boolean value that determines whether {@link FilteringBatchMessageListenerAdapter} invoke the {@link BatchMessageListener} when all {@link ConsumerRecord}
* have been filtered and return an EmptyList. The default is not to invoke the {@link BatchMessageListener} (false).
* @return If true is returned, the {@link FilteringBatchMessageListenerAdapter} will not invoke the {@link BatchMessageListener}
* Determine whether {@link FilteringBatchMessageListenerAdapter} should invoke
* the {@link BatchMessageListener} when all {@link ConsumerRecord}s in a batch have been filtered out
* resulting in empty list. By default, do invoke the {@link BatchMessageListener} (return false).
* @return true for {@link FilteringBatchMessageListenerAdapter} to {@link BatchMessageListener}
* when all {@link ConsumerRecord} in a batch filtered out
* @since 3.3
*/
default boolean ignoreEmptyBatch() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,11 @@ public boolean filter(ConsumerRecord<String, String> consumerRecord) {
@Override
public List<ConsumerRecord<String, String>> filterBatch(
List<ConsumerRecord<String, String>> consumerRecords) {
// SUT
return List.of();
}

@Override
public boolean ignoreEmptyBatch() {
// SUT
return true;
}
};
Expand Down Expand Up @@ -127,13 +125,11 @@ public boolean filter(ConsumerRecord<String, String> consumerRecord) {
@Override
public List<ConsumerRecord<String, String>> filterBatch(
List<ConsumerRecord<String, String>> consumerRecords) {
// SUT
return consumerRecords;
}

@Override
public boolean ignoreEmptyBatch() {
// SUT
return true;
}
};
Expand Down Expand Up @@ -170,15 +166,8 @@ public boolean filter(ConsumerRecord<String, String> consumerRecord) {
@Override
public List<ConsumerRecord<String, String>> filterBatch(
List<ConsumerRecord<String, String>> consumerRecords) {
// SUT
return List.of();
}

@Override
public boolean ignoreEmptyBatch() {
// SUT
return false;
}
};

final BatchAcknowledgingMessageListener<String, String> listener = mock(BatchAcknowledgingMessageListener.class);
Expand Down Expand Up @@ -213,16 +202,10 @@ public boolean filter(ConsumerRecord<String, String> consumerRecord) {

@Override
public List<ConsumerRecord<String, String>> filterBatch(
// SUT
// System Under Test
List<ConsumerRecord<String, String>> consumerRecords) {
return consumerRecords;
}

@Override
public boolean ignoreEmptyBatch() {
// SUT
return false;
}
};

final BatchAcknowledgingMessageListener<String, String> listener = mock(BatchAcknowledgingMessageListener.class);
Expand All @@ -244,4 +227,5 @@ public boolean ignoreEmptyBatch() {
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
verify(ack, never()).acknowledge();
}

}

0 comments on commit 1db0dd6

Please sign in to comment.