GH-3001: default clientIds with application name #3048

Merged: 1 commit, Mar 11, 2024

File: connecting.adoc
@@ -58,3 +58,37 @@ These listeners can be used, for example, to create and bind a Micrometer `Kafka

The framework provides listeners that do exactly that; see xref:kafka/micrometer.adoc#micrometer-native[Micrometer Native Metrics].

[[default-client-id-prefixes]]
== Default client ID prefixes

Starting with version 3.2, for Spring Boot applications that define an application name with the `spring.application.name` property, this name is now used
as the default prefix for auto-generated client IDs for the following client types:

- consumer clients that do not use a consumer group
- producer clients
- admin clients

This makes it easier to identify these clients on the server side for troubleshooting or for applying quotas; an illustrative configuration sketch follows the table below.

.Example client IDs for a Spring Boot application with `spring.application.name=myapp`
[%autowidth]
|===
|Client Type |Without application name |With application name

|consumer without consumer group
|consumer-null-1
|myapp-consumer-1

|consumer with consumer group "mygroup"
|consumer-mygroup-1
|consumer-mygroup-1

|producer
|producer-1
|myapp-producer-1

|admin
|adminclient-1
|myapp-admin-1
|===
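
The following sketch is purely illustrative and not part of this change; the bean definitions, bootstrap address, and (de)serializers are assumptions.
With `spring.application.name=myapp` and no `client.id` or `group.id` configured, factories like these yield client IDs such as `myapp-consumer-1` and `myapp-producer-1`, as shown above.

[source,java]
----
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class MyAppKafkaConfig {

	// No client.id and no group.id are configured, so consumers created from this
	// factory get auto-generated ids such as myapp-consumer-1, myapp-consumer-2, ...
	@Bean
	ConsumerFactory<String, String> consumerFactory() {
		Map<String, Object> props = Map.of(
				ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
				ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
				ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		return new DefaultKafkaConsumerFactory<>(props);
	}

	// No client.id is configured, so producers created from this factory get
	// auto-generated ids such as myapp-producer-1, myapp-producer-2, ...
	@Bean
	ProducerFactory<String, String> producerFactory() {
		Map<String, Object> props = Map.of(
				ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
				ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
				ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
		return new DefaultKafkaProducerFactory<>(props);
	}

}
----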

File: whats-new.adoc
@@ -67,3 +67,9 @@ See xref:kafka/seek.adoc#seek[Seek API Docs] for more details.
When this constructor is used, the framework calls the function with the current consumer offset position as the input argument.
See xref:kafka/seek.adoc#seek[Seek API Docs] for more details.

[[x32-default-clientid-prefix]]
=== Spring Boot application name as default client ID prefix

For Spring Boot applications that define an application name, this name is now used
as the default prefix for auto-generated client IDs for certain client types.
See xref:kafka/connecting.adoc#default-client-id-prefixes[Default client ID prefixes] for more details.

File: DefaultKafkaConsumerFactory.java
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2023 the original author or authors.
* Copyright 2016-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -24,6 +24,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -36,7 +37,11 @@
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.serialization.Deserializer;

import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.core.env.EnvironmentCapable;
import org.springframework.core.log.LogAccessor;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
@@ -66,9 +71,10 @@
* @author Murali Reddy
* @author Artem Bilan
* @author Chris Gilbert
* @author Adrian Gygax
*/
public class DefaultKafkaConsumerFactory<K, V> extends KafkaResourceFactory
implements ConsumerFactory<K, V>, BeanNameAware {
implements ConsumerFactory<K, V>, BeanNameAware, ApplicationContextAware {

private static final LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(DefaultKafkaConsumerFactory.class));

@@ -86,6 +92,8 @@ public class DefaultKafkaConsumerFactory<K, V> extends KafkaResourceFactory

private boolean configureDeserializers = true;

private ApplicationContext applicationContext;

/**
* Construct a factory with the provided configuration.
* @param configs the configuration.
@@ -371,6 +379,22 @@ protected Consumer<K, V> createKafkaConsumer(@Nullable String groupId, @Nullable
if (clientIdSuffix == null) {
clientIdSuffix = "";
}

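// When no client id prefix override is in effect, no group id was supplied, and neither client.id nor
// group.id appears in the supplied properties or the factory configs, fall back to spring.application.name
// (resolved from the ApplicationContext environment, if available) as the client id prefix.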
final boolean hasGroupIdOrClientIdInProperties = properties != null
&& (properties.containsKey(ConsumerConfig.CLIENT_ID_CONFIG) || properties.containsKey(ConsumerConfig.GROUP_ID_CONFIG));
final boolean hasGroupIdOrClientIdInConfig = this.configs.containsKey(ConsumerConfig.CLIENT_ID_CONFIG)
|| this.configs.containsKey(ConsumerConfig.GROUP_ID_CONFIG);
if (!overrideClientIdPrefix && groupId == null && !hasGroupIdOrClientIdInProperties && !hasGroupIdOrClientIdInConfig) {
final String applicationName = Optional.ofNullable(this.applicationContext)
.map(EnvironmentCapable::getEnvironment)
.map(environment -> environment.getProperty("spring.application.name"))
.orElse(null);
if (applicationName != null) {
clientIdPrefix = applicationName + "-consumer";
overrideClientIdPrefix = true;
}
}

boolean shouldModifyClientId = (this.configs.containsKey(ConsumerConfig.CLIENT_ID_CONFIG)
&& StringUtils.hasText(clientIdSuffix)) || overrideClientIdPrefix;
if (groupId == null
@@ -469,6 +493,11 @@ public boolean isAutoCommit() {
: !(auto instanceof String) || Boolean.parseBoolean((String) auto);
}

@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}

protected class ExtendedKafkaConsumer extends KafkaConsumer<K, V> {

private String idForListeners;

File: DefaultKafkaProducerFactory.java
@@ -23,6 +23,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
@@ -59,6 +60,7 @@
import org.springframework.context.ApplicationListener;
import org.springframework.context.SmartLifecycle;
import org.springframework.context.event.ContextStoppedEvent;
import org.springframework.core.env.EnvironmentCapable;
import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.KafkaException;
import org.springframework.lang.Nullable;
@@ -110,6 +112,7 @@
* @author Artem Bilan
* @author Chris Gilbert
* @author Thomas Strauß
* @author Adrian Gygax
*/
public class DefaultKafkaProducerFactory<K, V> extends KafkaResourceFactory
implements ProducerFactory<K, V>, ApplicationContextAware,
@@ -981,9 +984,22 @@ public void closeThreadBoundProducer() {
protected Map<String, Object> getProducerConfigs() {
final Map<String, Object> newProducerConfigs = new HashMap<>(this.configs);
checkBootstrap(newProducerConfigs);

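// An explicitly configured clientIdPrefix takes precedence; otherwise fall back to
// spring.application.name (resolved from the ApplicationContext environment, if available).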
final String prefix;
if (this.clientIdPrefix != null) {
prefix = this.clientIdPrefix;
}
else {
prefix = Optional.ofNullable(this.applicationContext)
.map(EnvironmentCapable::getEnvironment)
.map(environment -> environment.getProperty("spring.application.name"))
.map(applicationName -> applicationName + "-producer")
.orElse(null);
}

if (prefix != null) {
newProducerConfigs.put(ProducerConfig.CLIENT_ID_CONFIG,
this.clientIdPrefix + "-" + this.clientIdCounter.incrementAndGet());
prefix + "-" + this.clientIdCounter.incrementAndGet());
}
return newProducerConfigs;
}

File: KafkaAdmin.java
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2023 the original author or authors.
* Copyright 2017-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -37,6 +37,7 @@

import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
import org.apache.kafka.clients.admin.AlterConfigsResult;
@@ -59,6 +60,7 @@
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.core.env.EnvironmentCapable;
import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.KafkaException;
import org.springframework.kafka.support.TopicForRetryable;
@@ -71,6 +73,7 @@
*
* @author Gary Russell
* @author Artem Bilan
* @author Adrian Gygax
*
* @since 1.3
*/
@@ -86,6 +89,8 @@ public class KafkaAdmin extends KafkaResourceFactory

private static final LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(KafkaAdmin.class));

private static final AtomicInteger CLIENT_ID_COUNTER = new AtomicInteger();

private final Map<String, Object> configs;

private ApplicationContext applicationContext;
@@ -374,9 +379,23 @@ public Map<String, TopicDescription> describeTopics(String... topicNames) {
}

AdminClient createAdmin() {
Map<String, Object> configs2 = new HashMap<>(this.configs);
return AdminClient.create(getAdminConfig());
}

protected Map<String, Object> getAdminConfig() {
final Map<String, Object> configs2 = new HashMap<>(this.configs);
checkBootstrap(configs2);
return AdminClient.create(configs2);

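// When no client.id is configured, default it to <spring.application.name>-admin-<n>, if an application name is available.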
if (!configs2.containsKey(AdminClientConfig.CLIENT_ID_CONFIG)) {
Optional.ofNullable(this.applicationContext)
.map(EnvironmentCapable::getEnvironment)
.map(environment -> environment.getProperty("spring.application.name"))
.ifPresent(applicationName -> configs2.put(
AdminClientConfig.CLIENT_ID_CONFIG,
applicationName + "-admin-" + CLIENT_ID_COUNTER.getAndIncrement())
);
}
return configs2;
}

private void addOrModifyTopicsIfNeeded(AdminClient adminClient, Collection<NewTopic> topics) {

File: DefaultKafkaConsumerFactoryTests.java
@@ -17,6 +17,7 @@
package org.springframework.kafka.core;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

@@ -43,7 +44,9 @@

import org.springframework.aop.framework.ProxyFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import org.springframework.kafka.core.ConsumerFactory.Listener;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
@@ -60,6 +63,7 @@
* @author Gary Russell
* @author Chris Gilbert
* @author Artem Bilan
* @author Adrian Gygax
*
* @since 1.0.6
*/
@@ -120,6 +124,7 @@ protected Consumer<String, String> createRawConsumer(Map<String, Object> configP
return null;
}
};
target.setApplicationContext(createApplicationContextWithApplicationName());
target.setBootstrapServersSupplier(() -> "foo");
target.createConsumer(null, null, null, null);
assertThat(configPassedToKafkaConsumer.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)).isEqualTo("foo");
@@ -143,6 +148,7 @@ protected KafkaConsumer<String, String> createKafkaConsumer(Map<String, Object>
return null;
}
};
target.setApplicationContext(createApplicationContextWithApplicationName());
target.createConsumer(null, null, null, overrides);
assertThat(configPassedToKafkaConsumer.get("config1")).isEqualTo("overridden");
assertThat(configPassedToKafkaConsumer.get("config2")).isSameAs(originalConfig.get("config2"));
@@ -165,6 +171,7 @@ protected KafkaConsumer<String, String> createKafkaConsumer(Map<String, Object>
return null;
}
};
target.setApplicationContext(createApplicationContextWithApplicationName());
target.createConsumer(null, null, "-1", null);
assertThat(configPassedToKafkaConsumer.get(ConsumerConfig.CLIENT_ID_CONFIG)).isEqualTo("original-1");
}
@@ -198,6 +205,7 @@ protected KafkaConsumer<String, String> createKafkaConsumer(Map<String, Object>
return null;
}
};
target.setApplicationContext(createApplicationContextWithApplicationName());
target.createConsumer(null, "overridden", null, null);
assertThat(configPassedToKafkaConsumer.get(ConsumerConfig.CLIENT_ID_CONFIG)).isEqualTo("overridden");
}
@@ -214,6 +222,7 @@ protected KafkaConsumer<String, String> createKafkaConsumer(Map<String, Object>
return null;
}
};
target.setApplicationContext(createApplicationContextWithApplicationName());
target.createConsumer(null, "overridden", null, null);
assertThat(configPassedToKafkaConsumer.get(ConsumerConfig.CLIENT_ID_CONFIG)).isEqualTo("overridden");
}
@@ -231,6 +240,7 @@ protected KafkaConsumer<String, String> createKafkaConsumer(Map<String, Object>
return null;
}
};
target.setApplicationContext(createApplicationContextWithApplicationName());
target.createConsumer(null, "overridden", "-1", null);
assertThat(configPassedToKafkaConsumer.get(ConsumerConfig.CLIENT_ID_CONFIG)).isEqualTo("overridden-1");
}
@@ -250,10 +260,27 @@ protected KafkaConsumer<String, String> createKafkaConsumer(Map<String, Object>
return null;
}
};
target.setApplicationContext(createApplicationContextWithApplicationName());
target.createConsumer(null, "overridden", "-1", overrides);
assertThat(configPassedToKafkaConsumer.get(ConsumerConfig.CLIENT_ID_CONFIG)).isEqualTo("overridden-1");
}

@Test
public void testApplicationNameIfNoGroupIdAsClientIdWhenCreatingConsumer() {
final Map<String, Object> configPassedToKafkaConsumer = new HashMap<>();
DefaultKafkaConsumerFactory<String, String> target =
new DefaultKafkaConsumerFactory<String, String>(Map.of()) {

@Override
protected KafkaConsumer<String, String> createKafkaConsumer(Map<String, Object> configProps) {
configPassedToKafkaConsumer.putAll(configProps);
return null;
}
};
target.setApplicationContext(createApplicationContextWithApplicationName());
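// No group id and no client id are configured anywhere, so the (mocked) application name is expected as the prefix.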
target.createConsumer(null, null, "-1", null);
assertThat(configPassedToKafkaConsumer.get(ConsumerConfig.CLIENT_ID_CONFIG)).isEqualTo("appname-consumer-1");
}

@Test
public void testOverriddenGroupIdWhenCreatingConsumer() {
@@ -476,6 +503,14 @@ void configDeserializer() {
verify(value).configure(config, false);
}

private static ApplicationContext createApplicationContextWithApplicationName() {
final Environment environment = mock(Environment.class);
given(environment.getProperty("spring.application.name")).willReturn("appname");
final ApplicationContext applicationContext = mock(ApplicationContext.class);
given(applicationContext.getEnvironment()).willReturn(environment);
return applicationContext;
}

@Configuration
public static class Config {
