From 4a3b175b696354d03226be2a6d23b2da948c6b21 Mon Sep 17 00:00:00 2001 From: Yaniv Nahoum Date: Tue, 23 Jul 2024 19:38:05 +0300 Subject: [PATCH] GH-3375: Override close() in ExtendedKafkaConsumer Fixes: #3375 * ExtendedKafkaConsumer now overrides `close()` to make sure `listener.consumerRemoved()` is called when the consumer is closed. (cherry picked from commit 1f93c6a6ab726d6169e3f207cb8f1fd9e666d73d) --- .../kafka/core/DefaultKafkaConsumerFactory.java | 10 ++++++++++ .../core/DefaultKafkaConsumerFactoryTests.java | 17 +++++++++++++---- 2 files changed, 23 insertions(+), 4 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaConsumerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaConsumerFactory.java index c44e1055c9..7b332d1d44 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaConsumerFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaConsumerFactory.java @@ -72,6 +72,7 @@ * @author Artem Bilan * @author Chris Gilbert * @author Adrian Gygax + * @author Yaniv Nahoum */ public class DefaultKafkaConsumerFactory extends KafkaResourceFactory implements ConsumerFactory, BeanNameAware, ApplicationContextAware { @@ -519,10 +520,19 @@ protected ExtendedKafkaConsumer(Map configProps) { } } + @Override + public void close() { + super.close(); + notifyConsumerRemoved(); + } + @Override public void close(Duration timeout) { super.close(timeout); + notifyConsumerRemoved(); + } + private void notifyConsumerRemoved() { for (Listener listener : DefaultKafkaConsumerFactory.this.listeners) { listener.consumerRemoved(this.idForListeners, this); } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java index fbf9e79616..a8463ea3c2 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java @@ -42,6 +42,8 @@ import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.springframework.aop.framework.ProxyFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -66,6 +68,7 @@ * @author Artem Bilan * @author Adrian Gygax * @author Soby Chacko + * @author Yaniv Nahoum * * @since 1.0.6 */ @@ -458,8 +461,9 @@ public void testNestedTxProducerIsFixed() throws Exception { } @SuppressWarnings({ "rawtypes", "unchecked" }) - @Test - void listener() { + @ParameterizedTest + @ValueSource(booleans = { true, false }) + void listener(boolean closeWithTimeout) { Map consumerConfig = KafkaTestUtils.consumerProps("txCache1Group", "false", this.embeddedKafka); consumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG, "foo-0"); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory(consumerConfig); @@ -484,8 +488,13 @@ public void consumerRemoved(String id, Consumer consumer) { Consumer consumer = cf.createConsumer(); assertThat(adds).hasSize(1); assertThat(adds.get(0)).isEqualTo("cf.foo-0"); - assertThat(removals).hasSize(0); - consumer.close(Duration.ofSeconds(10)); + assertThat(removals).isEmpty(); + if (closeWithTimeout) { + consumer.close(Duration.ofSeconds(10)); + } + else { + consumer.close(); + } assertThat(removals).hasSize(1); }