Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-2806 : Receiving an empty list when using RecordFilterStrategy on batch messages #3216

Closed
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,62 @@ public void listen(Thing thing) {
}
----

Starting with version 3.3, Ignoring empty batches that result from filtering by `RecordFilterStrategy` is supported.
When implementing `RecordFilterStrategy`, it can be configured through `ignoreEmptyBatch()`.
The default setting is `false`, indicating `KafkaListener` will be invoked even if all `ConsumerRecord` s are filtered out.

If `true` is returned, the `KafkaListener` [underline]#will not be invoked# when all `ConsumerRecord` are filtered out.
However, commit to broker, will still be executed. +
If `false` is returned, the `KafkaListener` [underline]#will be invoked# when all `ConsumerRecord` are filtered out.

Here are some examples.

[source,java]
----
public class IgnoreEmptyBatchRecordFilterStrategy implements RecordFilterStrategy {
...
@Override
public List<ConsumerRecord<String, String>> filterBatch(
List<ConsumerRecord<String, String>> consumerRecords) {
return List.of();
}

@Override
public boolean ignoreEmptyBatch() {
return true;
}
};

// NOTE: ignoreEmptyBatchRecordFilterStrategy is bean name of IgnoreEmptyBatchRecordFilterStrategy instance.
@KafkaListener(id = "filtered", topics = "topic", filter = "ignoreEmptyBatchRecordFilterStrategy")
public void listen(List<Thing> things) {
...
}
----
In this case, `IgnoreEmptyBatchRecordFilterStrategy` always returns empty list and return `true` as result of `ignoreEmptyBatch()`.
Thus `KafkaListener#listen(...)` never will be invoked at all.

[source,java]
----
public class NotIgnoreEmptyBatchRecordFilterStrategy implements RecordFilterStrategy {
...
@Override
public List<ConsumerRecord<String, String>> filterBatch(
List<ConsumerRecord<String, String>> consumerRecords) {
return List.of();
}

@Override
public boolean ignoreEmptyBatch() {
return false;
}
};

// NOTE: notIgnoreEmptyBatchRecordFilterStrategy is bean name of NotIgnoreEmptyBatchRecordFilterStrategy instance.
@KafkaListener(id = "filtered", topics = "topic", filter = "notIgnoreEmptyBatchRecordFilterStrategy")
public void listen(List<Thing> things) {
...
}
----
However, in this case, `IgnoreEmptyBatchRecordFilterStrategy` always returns empty list and return `false` as result of `ignoreEmptyBatch()`.
Thus `KafkaListener#listen(...)` always will be invoked.
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,10 @@ A new method, `getGroupId()`, has been added to the `ConsumerSeekCallback` inter
This method allows for more selective seek operations by targeting only the desired consumer group.
For more details, see xref:kafka/seek.adoc#seek[Seek API Docs].

[[x33-new-option-ignore-empty-batch]]
=== Configurable Handling of Empty Batches in Kafka Listener with RecordFilterStrategy

`RecordFilterStrategy` now supports ignoring empty batches that result from filtering.
This can be configured through default method `ignoreEmptyBatch()`, which defaults to false, ensuring `KafkaListener` is invoked even if all `ConsumerRecords` are filtered out.

For more details, see xref:kafka/receiving-messages/filtering.adoc[Message receive filtering Docs].
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2021 the original author or authors.
* Copyright 2016-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -36,6 +36,7 @@
* @param <V> the value type.
*
* @author Gary Russell
* @author Sanghyeok An
*
*/
public class FilteringBatchMessageListenerAdapter<K, V>
Expand All @@ -44,16 +45,16 @@ public class FilteringBatchMessageListenerAdapter<K, V>

private final boolean ackDiscarded;

private final boolean consumerAware;

/**
* Create an instance with the supplied strategy and delegate listener.
* @param delegate the delegate.
* @param recordFilterStrategy the filter.
*/
public FilteringBatchMessageListenerAdapter(BatchMessageListener<K, V> delegate,
RecordFilterStrategy<K, V> recordFilterStrategy) {

super(delegate, recordFilterStrategy);
this.ackDiscarded = false;
this(delegate, recordFilterStrategy, false);
}

/**
Expand All @@ -71,22 +72,25 @@ public FilteringBatchMessageListenerAdapter(BatchMessageListener<K, V> delegate,

super(delegate, recordFilterStrategy);
this.ackDiscarded = ackDiscarded;
this.consumerAware = this.delegateType.equals(ListenerType.ACKNOWLEDGING_CONSUMER_AWARE) ||
this.delegateType.equals(ListenerType.CONSUMER_AWARE);
}

@Override
public void onMessage(List<ConsumerRecord<K, V>> records, @Nullable Acknowledgment acknowledgment,
Consumer<?, ?> consumer) {

List<ConsumerRecord<K, V>> consumerRecords = getRecordFilterStrategy().filterBatch(records);
final RecordFilterStrategy<K, V> recordFilterStrategy = getRecordFilterStrategy();
final List<ConsumerRecord<K, V>> consumerRecords = recordFilterStrategy.filterBatch(records);
Assert.state(consumerRecords != null, "filter returned null from filterBatch");
boolean consumerAware = this.delegateType.equals(ListenerType.ACKNOWLEDGING_CONSUMER_AWARE)
|| this.delegateType.equals(ListenerType.CONSUMER_AWARE);
/*
* An empty list goes to the listener if ackDiscarded is false and the listener can ack
* either through the acknowledgment
*/
if (consumerRecords.size() > 0 || consumerAware
|| (!this.ackDiscarded && this.delegateType.equals(ListenerType.ACKNOWLEDGING))) {

if (recordFilterStrategy.ignoreEmptyBatch() &&
consumerRecords.isEmpty() &&
acknowledgment != null) {
acknowledgment.acknowledge();
}
else if (consumerRecords.size() > 0 || this.consumerAware
|| (!this.ackDiscarded && this.delegateType.equals(ListenerType.ACKNOWLEDGING))) {
invokeDelegate(consumerRecords, acknowledgment, consumer);
}
else {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2021 the original author or authors.
* Copyright 2016-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -21,6 +21,8 @@

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.springframework.kafka.listener.BatchMessageListener;

/**
* Implementations of this interface can signal that a record about
* to be delivered to a message listener should be discarded instead
Expand All @@ -30,7 +32,7 @@
* @param <V> the value type.
*
* @author Gary Russell
*
* @author Sanghyeok An
*/
public interface RecordFilterStrategy<K, V> {

Expand Down Expand Up @@ -58,4 +60,16 @@ default List<ConsumerRecord<K, V>> filterBatch(List<ConsumerRecord<K, V>> record
return records;
}

/**
* Determine whether {@link FilteringBatchMessageListenerAdapter} should invoke
* the {@link BatchMessageListener} when all {@link ConsumerRecord}s in a batch have been filtered out
* resulting in empty list. By default, do invoke the {@link BatchMessageListener} (return false).
* @return true for {@link FilteringBatchMessageListenerAdapter} to {@link BatchMessageListener}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

{@link FilteringBatchMessageListenerAdapter} to {@link BatchMessageListener}
-> {@link FilteringBatchMessageListenerAdapter} to not invoke {@link BatchMessageListener}

not invoke is correct doc?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed: 5c34122.
Thank you!

* when all {@link ConsumerRecord} in a batch filtered out
* @since 3.3
*/
default boolean ignoreEmptyBatch() {
artembilan marked this conversation as resolved.
Show resolved Hide resolved
return false;
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2019 the original author or authors.
* Copyright 2017-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -21,6 +21,7 @@
import static org.mockito.BDDMockito.willAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.only;
import static org.mockito.Mockito.verify;

import java.util.ArrayList;
Expand All @@ -29,13 +30,15 @@
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.junit.jupiter.api.Test;

import org.springframework.kafka.listener.BatchAcknowledgingMessageListener;
import org.springframework.kafka.support.Acknowledgment;

/**
* @author Gary Russell
* @author Sanghyeok An
* @since 2.0
*
*/
Expand Down Expand Up @@ -72,4 +75,157 @@ public void testBatchFilterAckDiscard() throws Exception {
verify(listener, never()).onMessage(any(List.class), any(Acknowledgment.class));
}

@Test
public void listener_should_not_be_invoked_on_emptyList_and_ignoreEmptyBatch_true() throws Exception {
// Given :
final RecordFilterStrategy<String, String> filter = new RecordFilterStrategy<>() {
@Override
public boolean filter(ConsumerRecord<String, String> consumerRecord) {
return true;
}

@Override
public List<ConsumerRecord<String, String>> filterBatch(
List<ConsumerRecord<String, String>> consumerRecords) {
return List.of();
}

@Override
public boolean ignoreEmptyBatch() {
return true;
}
};

final BatchAcknowledgingMessageListener<String, String> listener = mock(BatchAcknowledgingMessageListener.class);
final FilteringBatchMessageListenerAdapter<String, String> adapter =
new FilteringBatchMessageListenerAdapter<String, String>(listener, filter);
final List<ConsumerRecord<String, String>> consumerRecords = new ArrayList<>();
final Acknowledgment ack = mock(Acknowledgment.class);

// When :
adapter.onMessage(consumerRecords, ack, null);

// Then
verify(ack, only()).acknowledge();
verify(listener, never()).onMessage(any(List.class), any(Acknowledgment.class), any(KafkaConsumer.class));
verify(listener, never()).onMessage(any(List.class), any(Acknowledgment.class));
verify(listener, never()).onMessage(any(List.class), any(KafkaConsumer.class));
verify(listener, never()).onMessage(any(List.class));
}

@Test
public void listener_should_be_invoked_on_notEmptyList_and_ignoreEmptyBatch_true() throws Exception {
// Given :
final RecordFilterStrategy<String, String> filter = new RecordFilterStrategy<>() {
@Override
public boolean filter(ConsumerRecord<String, String> consumerRecord) {
return true;
}

@Override
public List<ConsumerRecord<String, String>> filterBatch(
List<ConsumerRecord<String, String>> consumerRecords) {
return consumerRecords;
}

@Override
public boolean ignoreEmptyBatch() {
return true;
}
};

final BatchAcknowledgingMessageListener<String, String> listener = mock(BatchAcknowledgingMessageListener.class);
final FilteringBatchMessageListenerAdapter<String, String> adapter =
new FilteringBatchMessageListenerAdapter<String, String>(listener, filter);
final List<ConsumerRecord<String, String>> consumerRecords = List.of(new ConsumerRecord<>("hello-topic", 1, 1, "hello-key", "hello-value"));
final Acknowledgment ack = mock(Acknowledgment.class);

final CountDownLatch latch = new CountDownLatch(1);
willAnswer(i -> {
latch.countDown();
return null;
}).given(listener).onMessage(any(List.class), any(Acknowledgment.class));

// When :
adapter.onMessage(consumerRecords, ack, null);

// Then
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
verify(ack, never()).acknowledge();
}

@Test
public void listener_should_be_invoked_on_emptyList_and_ignoreEmptyBatch_false() throws Exception {
// Given :
final RecordFilterStrategy<String, String> filter = new RecordFilterStrategy<>() {
@Override
public boolean filter(ConsumerRecord<String, String> consumerRecord) {
return true;
}

@Override
public List<ConsumerRecord<String, String>> filterBatch(
List<ConsumerRecord<String, String>> consumerRecords) {
return List.of();
}
};

final BatchAcknowledgingMessageListener<String, String> listener = mock(BatchAcknowledgingMessageListener.class);
final FilteringBatchMessageListenerAdapter<String, String> adapter =
new FilteringBatchMessageListenerAdapter<String, String>(listener, filter);
final List<ConsumerRecord<String, String>> consumerRecords = new ArrayList<>();
final Acknowledgment ack = mock(Acknowledgment.class);

final CountDownLatch latch = new CountDownLatch(1);
willAnswer(i -> {
latch.countDown();
return null;
}).given(listener).onMessage(any(List.class), any(Acknowledgment.class));

// When :
adapter.onMessage(consumerRecords, ack, null);

// Then
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
verify(ack, never()).acknowledge();
}

@Test
public void listener_should_be_invoked_on_notEmptyList_and_ignoreEmptyBatch_false() throws Exception {
// Given :
final RecordFilterStrategy<String, String> filter = new RecordFilterStrategy<>() {
@Override
public boolean filter(ConsumerRecord<String, String> consumerRecord) {

return true;
}

@Override
public List<ConsumerRecord<String, String>> filterBatch(
// System Under Test
List<ConsumerRecord<String, String>> consumerRecords) {
return consumerRecords;
}
};

final BatchAcknowledgingMessageListener<String, String> listener = mock(BatchAcknowledgingMessageListener.class);
final FilteringBatchMessageListenerAdapter<String, String> adapter =
new FilteringBatchMessageListenerAdapter<String, String>(listener, filter);
final List<ConsumerRecord<String, String>> consumerRecords = List.of(new ConsumerRecord<>("hello-topic", 1, 1, "hello-key", "hello-value"));
final Acknowledgment ack = mock(Acknowledgment.class);

final CountDownLatch latch = new CountDownLatch(1);
willAnswer(i -> {
latch.countDown();
return null;
}).given(listener).onMessage(any(List.class), any(Acknowledgment.class));

// When :
adapter.onMessage(consumerRecords, ack, null);

// Then
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
verify(ack, never()).acknowledge();
}
artembilan marked this conversation as resolved.
Show resolved Hide resolved

}