Skip to content

Commit

Permalink
GH-3375: Override close() in ExtendedKafkaConsumer
Browse files Browse the repository at this point in the history
Fixes: #3375

* ExtendedKafkaConsumer now overrides `close()` to make sure `listener.consumerRemoved()` is called when the consumer is closed.

(cherry picked from commit 1f93c6a)
  • Loading branch information
yanivnahoum authored and spring-builds committed Jul 23, 2024
1 parent ef85fa7 commit 4a3b175
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
* @author Artem Bilan
* @author Chris Gilbert
* @author Adrian Gygax
* @author Yaniv Nahoum
*/
public class DefaultKafkaConsumerFactory<K, V> extends KafkaResourceFactory
implements ConsumerFactory<K, V>, BeanNameAware, ApplicationContextAware {
Expand Down Expand Up @@ -519,10 +520,19 @@ protected ExtendedKafkaConsumer(Map<String, Object> configProps) {
}
}

@Override
public void close() {
super.close();
notifyConsumerRemoved();
}

@Override
public void close(Duration timeout) {
super.close(timeout);
notifyConsumerRemoved();
}

private void notifyConsumerRemoved() {
for (Listener<K, V> listener : DefaultKafkaConsumerFactory.this.listeners) {
listener.consumerRemoved(this.idForListeners, this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import org.springframework.aop.framework.ProxyFactory;
import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -66,6 +68,7 @@
* @author Artem Bilan
* @author Adrian Gygax
* @author Soby Chacko
* @author Yaniv Nahoum
*
* @since 1.0.6
*/
Expand Down Expand Up @@ -458,8 +461,9 @@ public void testNestedTxProducerIsFixed() throws Exception {
}

@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
void listener() {
@ParameterizedTest
@ValueSource(booleans = { true, false })
void listener(boolean closeWithTimeout) {
Map<String, Object> consumerConfig = KafkaTestUtils.consumerProps("txCache1Group", "false", this.embeddedKafka);
consumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG, "foo-0");
DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory(consumerConfig);
Expand All @@ -484,8 +488,13 @@ public void consumerRemoved(String id, Consumer consumer) {
Consumer consumer = cf.createConsumer();
assertThat(adds).hasSize(1);
assertThat(adds.get(0)).isEqualTo("cf.foo-0");
assertThat(removals).hasSize(0);
consumer.close(Duration.ofSeconds(10));
assertThat(removals).isEmpty();
if (closeWithTimeout) {
consumer.close(Duration.ofSeconds(10));
}
else {
consumer.close();
}
assertThat(removals).hasSize(1);
}

Expand Down

0 comments on commit 4a3b175

Please sign in to comment.