diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java index c4f89d8a27..0f07679659 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java @@ -80,6 +80,7 @@ import org.springframework.transaction.support.TransactionSynchronizationManager; import org.springframework.util.Assert; import org.springframework.util.CollectionUtils; +import org.springframework.util.StringUtils; import io.micrometer.observation.Observation; import io.micrometer.observation.ObservationRegistry; @@ -484,8 +485,9 @@ public void afterSingletonsInstantiated() { if (this.kafkaAdmin == null) { this.kafkaAdmin = this.applicationContext.getBeanProvider(KafkaAdmin.class).getIfUnique(); if (this.kafkaAdmin != null) { - Object producerServers = this.producerFactory.getConfigurationProperties() - .get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG); + String producerServers = this.producerFactory.getConfigurationProperties() + .get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG).toString(); + producerServers = removeLeadingAndTrailingBrackets(producerServers); String adminServers = getAdminBootstrapAddress(); if (!producerServers.equals(adminServers)) { Map props = new HashMap<>(this.kafkaAdmin.getConfigurationProperties()); @@ -509,7 +511,6 @@ else if (this.micrometerEnabled) { private String getAdminBootstrapAddress() { // Retrieve bootstrap servers from KafkaAdmin bootstrap supplier if available String adminServers = this.kafkaAdmin.getBootstrapServers(); - // Fallback to configuration properties if bootstrap servers are not set if (adminServers == null) { adminServers = this.kafkaAdmin.getConfigurationProperties().getOrDefault( @@ -517,8 +518,7 @@ private String getAdminBootstrapAddress() { "" ).toString(); } - - return adminServers; + return removeLeadingAndTrailingBrackets(adminServers); } @Nullable @@ -1003,6 +1003,10 @@ public void destroy() { } } + private static String removeLeadingAndTrailingBrackets(String str) { + return StringUtils.trimTrailingCharacter(StringUtils.trimLeadingCharacter(str, '['), ']'); + } + @SuppressWarnings("serial") private static final class SkipAbortException extends RuntimeException { diff --git a/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java index 430b3b453c..37f52c6b6e 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java @@ -68,6 +68,7 @@ import org.springframework.lang.Nullable; import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; +import org.springframework.util.StringUtils; import io.micrometer.common.KeyValues; import io.micrometer.core.instrument.MeterRegistry; @@ -230,7 +231,7 @@ public KeyValues getLowCardinalityKeyValues(KafkaRecordReceiverContext context) assertThatListenerHasTimerWithNameAndTags(meterRegistryAssert, OBSERVATION_TEST_2, "obs2", "obs2-0"); assertThat(admin.getConfigurationProperties()) - .containsEntry(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString()); + .containsEntry(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, List.of(broker.getBrokersAsString())); // producer factory broker different to admin assertThatAdmin(template, admin, broker.getBrokersAsString() + "," + broker.getBrokersAsString(), "kafkaAdmin"); @@ -386,6 +387,14 @@ void observationErrorException(@Autowired ExceptionListener listener, @Autowired .hasMessage("obs5 error"); } + @Test + void kafkaAdminNotRecreatedIfBootstrapServersSameInProducerAndAdminConfig( + @Autowired @Qualifier("reuseAdminBeanKafkaTemplate") KafkaTemplate template, + @Autowired KafkaAdmin kafkaAdmin) { + // See this issue for more details: https://github.com/spring-projects/spring-kafka/issues/3466 + assertThat(template.getKafkaAdmin()).isSameAs(kafkaAdmin); + } + @Configuration @EnableKafka public static class Config { @@ -394,13 +403,16 @@ public static class Config { @Bean KafkaAdmin admin(EmbeddedKafkaBroker broker) { + String[] brokers = StringUtils.commaDelimitedListToStringArray(broker.getBrokersAsString()); + List brokersAsList = Arrays.asList(brokers); KafkaAdmin admin = new KafkaAdmin( - Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString())); + Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokersAsList)); admin.setOperationTimeout(42); return admin; } @Bean + @Primary ProducerFactory producerFactory(EmbeddedKafkaBroker broker) { Map producerProps = KafkaTestUtils.producerProps(broker); producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString() + "," @@ -408,6 +420,13 @@ ProducerFactory producerFactory(EmbeddedKafkaBroker broker) { return new DefaultKafkaProducerFactory<>(producerProps); } + @Bean + ProducerFactory customProducerFactory(EmbeddedKafkaBroker broker) { + Map producerProps = KafkaTestUtils.producerProps(broker); + producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString()); + return new DefaultKafkaProducerFactory<>(producerProps); + } + @Bean ConsumerFactory consumerFactory(EmbeddedKafkaBroker broker) { Map consumerProps = KafkaTestUtils.consumerProps("obs", "false", broker); @@ -439,6 +458,14 @@ KafkaTemplate throwableTemplate(ProducerFactory reuseAdminBeanKafkaTemplate( + @Qualifier("customProducerFactory") ProducerFactory pf) { + KafkaTemplate template = new KafkaTemplate<>(pf); + template.setObservationEnabled(true); + return template; + } + @Bean ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory( ConsumerFactory cf) {