Skip to content

Commit

Permalink
GH-3001: default clientIds with application name
Browse files Browse the repository at this point in the history
Fixes: #GH-3001

Use Spring Boot's `spring.application.name` property as part of the default clientIds for Consumers, Producers and AdminClients. Helps with identifying problematic clients at server side.

* Only use as a fallback if clientId wasn't specified explicitly
* Do not use for Consumers with a specified groupId because KafkaConsumer will use the groupId as clientId which already is an identifiable default
  • Loading branch information
notizklotz committed Feb 19, 2024
1 parent 22764cb commit ba38fc7
Show file tree
Hide file tree
Showing 6 changed files with 156 additions and 13 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2023 the original author or authors.
* Copyright 2016-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -24,6 +24,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -36,7 +37,11 @@
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.serialization.Deserializer;

import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.core.env.EnvironmentCapable;
import org.springframework.core.log.LogAccessor;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
Expand Down Expand Up @@ -68,7 +73,7 @@
* @author Chris Gilbert
*/
public class DefaultKafkaConsumerFactory<K, V> extends KafkaResourceFactory
implements ConsumerFactory<K, V>, BeanNameAware {
implements ConsumerFactory<K, V>, BeanNameAware, ApplicationContextAware {

private static final LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(DefaultKafkaConsumerFactory.class));

Expand All @@ -86,6 +91,8 @@ public class DefaultKafkaConsumerFactory<K, V> extends KafkaResourceFactory

private boolean configureDeserializers = true;

private ApplicationContext applicationContext;

/**
* Construct a factory with the provided configuration.
* @param configs the configuration.
Expand Down Expand Up @@ -371,6 +378,22 @@ protected Consumer<K, V> createKafkaConsumer(@Nullable String groupId, @Nullable
if (clientIdSuffix == null) {
clientIdSuffix = "";
}

final boolean hasGroupIdOrClientIdInProperties = properties != null
&& (properties.containsKey(ConsumerConfig.CLIENT_ID_CONFIG) || properties.containsKey(ConsumerConfig.GROUP_ID_CONFIG));
final boolean hasGroupIdOrClientIdInConfig = this.configs.containsKey(ConsumerConfig.CLIENT_ID_CONFIG)
|| this.configs.containsKey(ConsumerConfig.GROUP_ID_CONFIG);
if (!overrideClientIdPrefix && groupId == null && !hasGroupIdOrClientIdInProperties && !hasGroupIdOrClientIdInConfig) {
final String applicationName = Optional.ofNullable(this.applicationContext)
.map(EnvironmentCapable::getEnvironment)
.map(environment -> environment.getProperty("spring.application.name"))
.orElse(null);
if (applicationName != null) {
clientIdPrefix = applicationName + "-consumer";
overrideClientIdPrefix = true;
}
}

boolean shouldModifyClientId = (this.configs.containsKey(ConsumerConfig.CLIENT_ID_CONFIG)
&& StringUtils.hasText(clientIdSuffix)) || overrideClientIdPrefix;
if (groupId == null
Expand Down Expand Up @@ -469,6 +492,11 @@ public boolean isAutoCommit() {
: !(auto instanceof String) || Boolean.parseBoolean((String) auto);
}

@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}

protected class ExtendedKafkaConsumer extends KafkaConsumer<K, V> {

private String idForListeners;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -59,6 +60,7 @@
import org.springframework.context.ApplicationListener;
import org.springframework.context.SmartLifecycle;
import org.springframework.context.event.ContextStoppedEvent;
import org.springframework.core.env.EnvironmentCapable;
import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.KafkaException;
import org.springframework.lang.Nullable;
Expand Down Expand Up @@ -981,9 +983,21 @@ public void closeThreadBoundProducer() {
protected Map<String, Object> getProducerConfigs() {
final Map<String, Object> newProducerConfigs = new HashMap<>(this.configs);
checkBootstrap(newProducerConfigs);

final String prefix;
if (this.clientIdPrefix != null) {
prefix = this.clientIdPrefix;
} else {
prefix = Optional.ofNullable(this.applicationContext)
.map(EnvironmentCapable::getEnvironment)
.map(environment -> environment.getProperty("spring.application.name"))
.map(applicationName -> applicationName + "-producer")
.orElse(null);
}

if (prefix != null) {
newProducerConfigs.put(ProducerConfig.CLIENT_ID_CONFIG,
this.clientIdPrefix + "-" + this.clientIdCounter.incrementAndGet());
prefix + "-" + this.clientIdCounter.incrementAndGet());
}
return newProducerConfigs;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2023 the original author or authors.
* Copyright 2017-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -37,6 +37,7 @@

import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
import org.apache.kafka.clients.admin.AlterConfigsResult;
Expand All @@ -55,10 +56,12 @@
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnsupportedVersionException;

import org.jetbrains.annotations.NotNull;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.core.env.EnvironmentCapable;
import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.KafkaException;
import org.springframework.kafka.support.TopicForRetryable;
Expand Down Expand Up @@ -86,6 +89,8 @@ public class KafkaAdmin extends KafkaResourceFactory

private static final LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(KafkaAdmin.class));

private static final AtomicInteger CLIENT_ID_COUNTER = new AtomicInteger();

private final Map<String, Object> configs;

private ApplicationContext applicationContext;
Expand Down Expand Up @@ -374,9 +379,23 @@ public Map<String, TopicDescription> describeTopics(String... topicNames) {
}

AdminClient createAdmin() {
Map<String, Object> configs2 = new HashMap<>(this.configs);
return AdminClient.create(getAdminConfig());
}

protected Map<String, Object> getAdminConfig() {
final Map<String, Object> configs2 = new HashMap<>(this.configs);
checkBootstrap(configs2);
return AdminClient.create(configs2);

if (!configs2.containsKey(AdminClientConfig.CLIENT_ID_CONFIG)) {
Optional.ofNullable(this.applicationContext)
.map(EnvironmentCapable::getEnvironment)
.map(environment -> environment.getProperty("spring.application.name"))
.ifPresent(applicationName -> configs2.put(
AdminClientConfig.CLIENT_ID_CONFIG,
applicationName + "-admin-" + CLIENT_ID_COUNTER.getAndIncrement())
);
}
return configs2;
}

private void addOrModifyTopicsIfNeeded(AdminClient adminClient, Collection<NewTopic> topics) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
package org.springframework.kafka.core;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.*;
import static org.mockito.Mockito.when;

import java.util.AbstractMap;
import java.util.ArrayList;
Expand All @@ -43,7 +43,9 @@

import org.springframework.aop.framework.ProxyFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import org.springframework.kafka.core.ConsumerFactory.Listener;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
Expand Down Expand Up @@ -120,6 +122,7 @@ protected Consumer<String, String> createRawConsumer(Map<String, Object> configP
return null;
}
};
target.setApplicationContext(createApplicationContextWithApplicationName());
target.setBootstrapServersSupplier(() -> "foo");
target.createConsumer(null, null, null, null);
assertThat(configPassedToKafkaConsumer.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)).isEqualTo("foo");
Expand All @@ -143,6 +146,7 @@ protected KafkaConsumer<String, String> createKafkaConsumer(Map<String, Object>
return null;
}
};
target.setApplicationContext(createApplicationContextWithApplicationName());
target.createConsumer(null, null, null, overrides);
assertThat(configPassedToKafkaConsumer.get("config1")).isEqualTo("overridden");
assertThat(configPassedToKafkaConsumer.get("config2")).isSameAs(originalConfig.get("config2"));
Expand All @@ -165,6 +169,7 @@ protected KafkaConsumer<String, String> createKafkaConsumer(Map<String, Object>
return null;
}
};
target.setApplicationContext(createApplicationContextWithApplicationName());
target.createConsumer(null, null, "-1", null);
assertThat(configPassedToKafkaConsumer.get(ConsumerConfig.CLIENT_ID_CONFIG)).isEqualTo("original-1");
}
Expand Down Expand Up @@ -198,6 +203,7 @@ protected KafkaConsumer<String, String> createKafkaConsumer(Map<String, Object>
return null;
}
};
target.setApplicationContext(createApplicationContextWithApplicationName());
target.createConsumer(null, "overridden", null, null);
assertThat(configPassedToKafkaConsumer.get(ConsumerConfig.CLIENT_ID_CONFIG)).isEqualTo("overridden");
}
Expand All @@ -214,6 +220,7 @@ protected KafkaConsumer<String, String> createKafkaConsumer(Map<String, Object>
return null;
}
};
target.setApplicationContext(createApplicationContextWithApplicationName());
target.createConsumer(null, "overridden", null, null);
assertThat(configPassedToKafkaConsumer.get(ConsumerConfig.CLIENT_ID_CONFIG)).isEqualTo("overridden");
}
Expand All @@ -231,6 +238,7 @@ protected KafkaConsumer<String, String> createKafkaConsumer(Map<String, Object>
return null;
}
};
target.setApplicationContext(createApplicationContextWithApplicationName());
target.createConsumer(null, "overridden", "-1", null);
assertThat(configPassedToKafkaConsumer.get(ConsumerConfig.CLIENT_ID_CONFIG)).isEqualTo("overridden-1");
}
Expand All @@ -250,10 +258,27 @@ protected KafkaConsumer<String, String> createKafkaConsumer(Map<String, Object>
return null;
}
};
target.setApplicationContext(createApplicationContextWithApplicationName());
target.createConsumer(null, "overridden", "-1", overrides);
assertThat(configPassedToKafkaConsumer.get(ConsumerConfig.CLIENT_ID_CONFIG)).isEqualTo("overridden-1");
}

@Test
public void testApplicationNameIfNoGroupIdAsClientIdWhenCreatingConsumer() {
final Map<String, Object> configPassedToKafkaConsumer = new HashMap<>();
DefaultKafkaConsumerFactory<String, String> target =
new DefaultKafkaConsumerFactory<String, String>(Map.of()) {

@Override
protected KafkaConsumer<String, String> createKafkaConsumer(Map<String, Object> configProps) {
configPassedToKafkaConsumer.putAll(configProps);
return null;
}
};
target.setApplicationContext(createApplicationContextWithApplicationName());
target.createConsumer(null, null, "-1", null);
assertThat(configPassedToKafkaConsumer.get(ConsumerConfig.CLIENT_ID_CONFIG)).isEqualTo("appname-consumer-1");
}

@Test
public void testOverriddenGroupIdWhenCreatingConsumer() {
Expand Down Expand Up @@ -476,6 +501,14 @@ void configDeserializer() {
verify(value).configure(config, false);
}

private static ApplicationContext createApplicationContextWithApplicationName() {
final Environment environment = mock(Environment.class);
when(environment.getProperty("spring.application.name")).thenReturn("appname");
final ApplicationContext applicationContext = mock(ApplicationContext.class);
when(applicationContext.getEnvironment()).thenReturn(environment);
return applicationContext;
}

@Configuration
public static class Config {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,7 @@
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.willAnswer;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.*;

import java.time.Duration;
import java.util.ArrayList;
Expand All @@ -53,8 +50,10 @@
import org.junit.jupiter.api.Test;
import org.mockito.InOrder;

import org.mockito.Mockito;
import org.springframework.context.ApplicationContext;
import org.springframework.context.event.ContextStoppedEvent;
import org.springframework.core.env.Environment;
import org.springframework.kafka.core.ProducerFactory.Listener;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.kafka.transaction.KafkaTransactionManager;
Expand Down Expand Up @@ -778,4 +777,26 @@ protected Producer<String, String> createRawProducer(Map<String, Object> rawConf
assertThat(producerConfigs).containsEntry("linger.ms", 200);
}

@Test
void testDefaultClientIdPrefixIsSpringBootApplicationName() {
final DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(Map.of());
final Environment environment = mock(Environment.class);
when(environment.getProperty("spring.application.name")).thenReturn("appname");
final ApplicationContext applicationContext = mock(ApplicationContext.class);
when(applicationContext.getEnvironment()).thenReturn(environment);
pf.setApplicationContext(applicationContext);
assertThat(pf.getProducerConfigs()).containsEntry(ProducerConfig.CLIENT_ID_CONFIG, "appname-producer-1");
}

@Test
void testExplicitClientIdPrefixOverridesDefault() {
final DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(Map.of(ProducerConfig.CLIENT_ID_CONFIG, "clientId"));
final Environment environment = mock(Environment.class);
when(environment.getProperty("spring.application.name")).thenReturn("appname");
final ApplicationContext applicationContext = mock(ApplicationContext.class);
when(applicationContext.getEnvironment()).thenReturn(environment);
pf.setApplicationContext(applicationContext);
assertThat(pf.getProducerConfigs()).containsEntry(ProducerConfig.CLIENT_ID_CONFIG, "clientId-1");
}

}
Loading

0 comments on commit ba38fc7

Please sign in to comment.