Skip to content

Commit

Permalink
GH-3466: Optimize KafkaAdmin creation in KafkaTemplate (#3471)
Browse files Browse the repository at this point in the history
* GH-3466: Optimize KafkaAdmin creation in KafkaTemplate

Fixes: #3466

#3466

Improve bootstrap-server config comparison to avoid unnecessary
KafkaAdmin recreation. This addresses inconsistencies between
List<String> and String configurations for bootstrap servers.

The change ensures that List versions of bootstrap-server configs are
converted to regular Strings by removing brackets. This allows for
consistent comparison between producer and admin configurations.

This optimization is particularly relevant for Spring Boot scenarios
where configs may be provided in different formats but represent the
same underlying values.

* Addressing PR review

**Auto-cherry-pick to `3.2.x` & `3.1.x`**
  • Loading branch information
sobychacko authored Aug 30, 2024
1 parent 4a135a1 commit 1d4f7f9
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

import io.micrometer.observation.Observation;
import io.micrometer.observation.ObservationRegistry;
Expand Down Expand Up @@ -484,8 +485,9 @@ public void afterSingletonsInstantiated() {
if (this.kafkaAdmin == null) {
this.kafkaAdmin = this.applicationContext.getBeanProvider(KafkaAdmin.class).getIfUnique();
if (this.kafkaAdmin != null) {
Object producerServers = this.producerFactory.getConfigurationProperties()
.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
String producerServers = this.producerFactory.getConfigurationProperties()
.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG).toString();
producerServers = removeLeadingAndTrailingBrackets(producerServers);
String adminServers = getAdminBootstrapAddress();
if (!producerServers.equals(adminServers)) {
Map<String, Object> props = new HashMap<>(this.kafkaAdmin.getConfigurationProperties());
Expand All @@ -509,16 +511,14 @@ else if (this.micrometerEnabled) {
private String getAdminBootstrapAddress() {
// Retrieve bootstrap servers from KafkaAdmin bootstrap supplier if available
String adminServers = this.kafkaAdmin.getBootstrapServers();

// Fallback to configuration properties if bootstrap servers are not set
if (adminServers == null) {
adminServers = this.kafkaAdmin.getConfigurationProperties().getOrDefault(
AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
""
).toString();
}

return adminServers;
return removeLeadingAndTrailingBrackets(adminServers);
}

@Nullable
Expand Down Expand Up @@ -1003,6 +1003,10 @@ public void destroy() {
}
}

private static String removeLeadingAndTrailingBrackets(String str) {
return StringUtils.trimTrailingCharacter(StringUtils.trimLeadingCharacter(str, '['), ']');
}

@SuppressWarnings("serial")
private static final class SkipAbortException extends RuntimeException {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import org.springframework.lang.Nullable;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
import org.springframework.util.StringUtils;

import io.micrometer.common.KeyValues;
import io.micrometer.core.instrument.MeterRegistry;
Expand Down Expand Up @@ -230,7 +231,7 @@ public KeyValues getLowCardinalityKeyValues(KafkaRecordReceiverContext context)
assertThatListenerHasTimerWithNameAndTags(meterRegistryAssert, OBSERVATION_TEST_2, "obs2", "obs2-0");

assertThat(admin.getConfigurationProperties())
.containsEntry(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString());
.containsEntry(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, List.of(broker.getBrokersAsString()));
// producer factory broker different to admin
assertThatAdmin(template, admin, broker.getBrokersAsString() + "," + broker.getBrokersAsString(),
"kafkaAdmin");
Expand Down Expand Up @@ -386,6 +387,14 @@ void observationErrorException(@Autowired ExceptionListener listener, @Autowired
.hasMessage("obs5 error");
}

@Test
void kafkaAdminNotRecreatedIfBootstrapServersSameInProducerAndAdminConfig(
@Autowired @Qualifier("reuseAdminBeanKafkaTemplate") KafkaTemplate<Integer, String> template,
@Autowired KafkaAdmin kafkaAdmin) {
// See this issue for more details: https://github.com/spring-projects/spring-kafka/issues/3466
assertThat(template.getKafkaAdmin()).isSameAs(kafkaAdmin);
}

@Configuration
@EnableKafka
public static class Config {
Expand All @@ -394,20 +403,30 @@ public static class Config {

@Bean
KafkaAdmin admin(EmbeddedKafkaBroker broker) {
String[] brokers = StringUtils.commaDelimitedListToStringArray(broker.getBrokersAsString());
List<String> brokersAsList = Arrays.asList(brokers);
KafkaAdmin admin = new KafkaAdmin(
Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString()));
Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokersAsList));
admin.setOperationTimeout(42);
return admin;
}

@Bean
@Primary
ProducerFactory<Integer, String> producerFactory(EmbeddedKafkaBroker broker) {
Map<String, Object> producerProps = KafkaTestUtils.producerProps(broker);
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString() + ","
+ broker.getBrokersAsString());
return new DefaultKafkaProducerFactory<>(producerProps);
}

@Bean
ProducerFactory<Integer, String> customProducerFactory(EmbeddedKafkaBroker broker) {
Map<String, Object> producerProps = KafkaTestUtils.producerProps(broker);
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString());
return new DefaultKafkaProducerFactory<>(producerProps);
}

@Bean
ConsumerFactory<Integer, String> consumerFactory(EmbeddedKafkaBroker broker) {
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("obs", "false", broker);
Expand Down Expand Up @@ -439,6 +458,14 @@ KafkaTemplate<Integer, String> throwableTemplate(ProducerFactory<Integer, String
return template;
}

@Bean
KafkaTemplate<Integer, String> reuseAdminBeanKafkaTemplate(
@Qualifier("customProducerFactory") ProducerFactory<Integer, String> pf) {
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
template.setObservationEnabled(true);
return template;
}

@Bean
ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory(
ConsumerFactory<Integer, String> cf) {
Expand Down

0 comments on commit 1d4f7f9

Please sign in to comment.