Skip to content

Commit

Permalink
GH-2762: Fix Topic Check with Consumer Overrides
Browse files Browse the repository at this point in the history
* GH-2762: Fix Topic Check with Consumer Overrides

Resolves #2762

When checking for missing topics, the admin was created with the raw
consumer factory properties and did not apply any overriding consumer
properties in the `ContainerProperties`.

Apply the overrides before creating the admin.

Pull up a method that merges default properties (if any) because we
have to iterate over the hash table because the user might have used
`put()` instead of `setProperty()` to set properties.

**cherry-pick to 2.9.x**

* Fix unrelated test.

* Fix test class name.

* Fix test class name.
  • Loading branch information
garyrussell authored Aug 3, 2023
1 parent f76bc09 commit 08e76d3
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -565,6 +566,12 @@ protected void checkTopics() {
.stream()
.filter(entry -> AdminClientConfig.configNames().contains(entry.getKey()))
.collect(Collectors.toMap(Entry::getKey, Entry::getValue));
Properties overrides = propertiesFromConsumerPropertyOverrides();
overrides.forEach((key, value) -> {
if (key instanceof String) {
configs.put((String) key, value);
}
});
List<String> missing = null;
try (AdminClient client = AdminClient.create(configs)) { // NOSONAR - false positive null check
if (client != null) {
Expand Down Expand Up @@ -740,4 +747,23 @@ protected void publishContainerStoppedEvent() {
return this;
}

/**
* Make any default consumer override properties explicit properties.
* @return the properties.
* @since 2.9.11
*/
protected Properties propertiesFromConsumerPropertyOverrides() {
Properties propertyOverrides = this.containerProperties.getKafkaConsumerProperties();
Properties props = new Properties();
props.putAll(propertyOverrides);
Set<String> stringPropertyNames = propertyOverrides.stringPropertyNames();
// User might have provided properties as defaults
stringPropertyNames.forEach((name) -> {
if (!props.contains(name)) {
props.setProperty(name, propertyOverrides.getProperty(name));
}
});
return props;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -858,7 +858,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
ObservationRegistry observationRegistry) {

this.observationRegistry = observationRegistry;
Properties consumerProperties = propertiesFromProperties();
Properties consumerProperties = propertiesFromConsumerPropertyOverrides();
checkGroupInstance(consumerProperties, KafkaMessageListenerContainer.this.consumerFactory);
this.autoCommit = determineAutoCommit(consumerProperties);
this.consumer =
Expand Down Expand Up @@ -1048,20 +1048,6 @@ private CommonErrorHandler determineCommonErrorHandler() {
}
}

private Properties propertiesFromProperties() {
Properties propertyOverrides = this.containerProperties.getKafkaConsumerProperties();
Properties props = new Properties();
props.putAll(propertyOverrides);
Set<String> stringPropertyNames = propertyOverrides.stringPropertyNames();
// User might have provided properties as defaults
stringPropertyNames.forEach((name) -> {
if (!props.contains(name)) {
props.setProperty(name, propertyOverrides.getProperty(name));
}
});
return props;
}

String getClientId() {
return this.clientId;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,17 +254,17 @@ public static class Listener {

private final String topic;

private final CountDownLatch latch1 = new CountDownLatch(1);
final CountDownLatch latch1 = new CountDownLatch(1);

private final CountDownLatch latch2 = new CountDownLatch(1);
final CountDownLatch latch2 = new CountDownLatch(1);

private final KafkaListenerContainerFactory<?> cf;

private List<Foo> received;
volatile List<Foo> received;

private List<String> receivedTopics;
volatile List<String> receivedTopics;

private List<Integer> receivedPartitions;
volatile List<Integer> receivedPartitions;

public Listener(String topic, KafkaListenerContainerFactory<?> cf) {
this.topic = topic;
Expand Down Expand Up @@ -302,9 +302,9 @@ public String getTopic() {

public static class Listener3 {

private final CountDownLatch latch1 = new CountDownLatch(1);
final CountDownLatch latch1 = new CountDownLatch(1);

private List<Message<Foo>> received;
volatile List<Message<Foo>> received;

@KafkaListener(topics = "blc3", groupId = "blc3")
public void listen1(List<Message<Foo>> foos) {
Expand All @@ -318,11 +318,11 @@ public void listen1(List<Message<Foo>> foos) {

public static class Listener4 {

private final CountDownLatch latch1 = new CountDownLatch(1);
final CountDownLatch latch1 = new CountDownLatch(1);

private List<Foo> received;
volatile List<Foo> received;

private List<Foo> replies;
volatile List<Foo> replies;

@KafkaListener(topics = "blc4", groupId = "blc4")
@SendTo
Expand Down Expand Up @@ -351,7 +351,7 @@ public static class Listener5 {

final CountDownLatch latch2 = new CountDownLatch(1);

final List<Foo> received = new ArrayList<>();
final List<Foo> received = Collections.synchronizedList(new ArrayList<>());

volatile String dlt;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Copyright 2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.listener;

import static org.assertj.core.api.Assertions.assertThatNoException;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;

import java.util.Map;
import java.util.Properties;

import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.junit.jupiter.api.Test;

import org.springframework.beans.DirectFieldAccessor;
import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;

/**
* @author Gary Russell
* @since 3.0
*
*/
@EmbeddedKafka(topics = "mtccac")
public class MissingTopicCheckOverrideAdminConfigTests {

@Test
void configOverride(EmbeddedKafkaBroker broker) {
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("grp", "false", broker);
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "junkjunk");
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
ContainerProperties props = new ContainerProperties("mtccac");
props.setMissingTopicsFatal(true);
props.getKafkaConsumerProperties().setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
broker.getBrokersAsString());
KafkaMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(cf, props) {

@Override
public void checkTopics() {
super.checkTopics();
}

};
LogAccessor logger = spy(new LogAccessor(LogFactory.getLog(getClass())));
new DirectFieldAccessor(container).setPropertyValue("logger", logger);
assertThatNoException().isThrownBy(() -> container.checkTopics());
verify(logger, never()).error(any(), anyString());
}

@Test
void configOverrideDefault(EmbeddedKafkaBroker broker) {
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("grp", "false", broker);
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "junkjunk");
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
ContainerProperties props = new ContainerProperties("mtccac");
props.setMissingTopicsFatal(true);
/*
* Ensure this works if there are property defaults.
* We have to iterate over the hash table because the user might have
* used put() instead of setProperty().
*/
Properties defaultProperties = new Properties();
defaultProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString());
Properties properties = new Properties(defaultProperties);
props.setKafkaConsumerProperties(properties);
KafkaMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(cf, props) {

@Override
public void checkTopics() {
super.checkTopics();
}

};
LogAccessor logger = spy(new LogAccessor(LogFactory.getLog(getClass())));
new DirectFieldAccessor(container).setPropertyValue("logger", logger);
assertThatNoException().isThrownBy(() -> container.checkTopics());
verify(logger, never()).error(any(), anyString());
}

}

0 comments on commit 08e76d3

Please sign in to comment.