From 3cd6c9cc3f574e8d4f872d3530788f3ee1c29c6f Mon Sep 17 00:00:00 2001 From: cfredri4 <50839054+cfredri4@users.noreply.github.com> Date: Mon, 16 Dec 2024 16:39:03 +0100 Subject: [PATCH] Allow configuring observation registry directly This changes allows configuring the observation registry directly, instead of it being fetched from the application context. This is to allow observability when `KafkaTemplate/KafkaMessageListenerContainer` are used without an application context. --- .../kafka/core/KafkaTemplate.java | 17 ++++++++++++++-- .../kafka/listener/ContainerProperties.java | 20 +++++++++++++++++++ .../KafkaMessageListenerContainer.java | 19 +++++++++--------- 3 files changed, 45 insertions(+), 11 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java index 9e4c267503..14caa930db 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java @@ -103,6 +103,7 @@ * @author Soby Chacko * @author Gurps Bassi * @author Valentina Armenise + * @author Christian Fredriksson */ public class KafkaTemplate implements KafkaOperations, ApplicationContextAware, BeanNameAware, ApplicationListener, DisposableBean, SmartInitializingSingleton { @@ -456,6 +457,16 @@ public void setObservationConvention(KafkaTemplateObservationConvention observat this.observationConvention = observationConvention; } + /** + * Configure the {@link ObservationRegistry} to use for recording observations. + * @param observationRegistry the observation registry to use. + * @since 3.3.1 + */ + public void setObservationRegistry(ObservationRegistry observationRegistry) { + Assert.notNull(observationRegistry, "'observationRegistry' must not be null"); + this.observationRegistry = observationRegistry; + } + /** * Return the {@link KafkaAdmin}, used to find the cluster id for observation, if * present. @@ -479,8 +490,10 @@ public void setKafkaAdmin(KafkaAdmin kafkaAdmin) { @Override public void afterSingletonsInstantiated() { if (this.observationEnabled && this.applicationContext != null) { - this.observationRegistry = this.applicationContext.getBeanProvider(ObservationRegistry.class) - .getIfUnique(() -> this.observationRegistry); + if (this.observationRegistry.isNoop()) { + this.observationRegistry = this.applicationContext.getBeanProvider(ObservationRegistry.class) + .getIfUnique(() -> this.observationRegistry); + } if (this.kafkaAdmin == null) { this.kafkaAdmin = this.applicationContext.getBeanProvider(KafkaAdmin.class).getIfUnique(); if (this.kafkaAdmin != null) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java index 2b136863f1..221e70204a 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java @@ -26,6 +26,7 @@ import java.util.function.Function; import java.util.regex.Pattern; +import io.micrometer.observation.ObservationRegistry; import org.aopalliance.aop.Advice; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -281,6 +282,8 @@ public enum EOSMode { private boolean observationEnabled; + private ObservationRegistry observationRegistry = ObservationRegistry.NOOP; + private Duration consumerStartTimeout = DEFAULT_CONSUMER_START_TIMEOUT; private Boolean subBatchPerPartition; @@ -716,6 +719,20 @@ public void setObservationEnabled(boolean observationEnabled) { this.observationEnabled = observationEnabled; } + public ObservationRegistry getObservationRegistry() { + return this.observationRegistry; + } + + /** + * Configure the {@link ObservationRegistry} to use for recording observations. + * @param observationRegistry the observation registry to use. + * @since 3.3.1 + */ + public void setObservationRegistry(ObservationRegistry observationRegistry) { + Assert.notNull(observationRegistry, "'observationRegistry' must not be null"); + this.observationRegistry = observationRegistry; + } + /** * Set additional tags for the Micrometer listener timers. * @param tags the tags. @@ -1118,6 +1135,9 @@ public String toString() { + (this.observationConvention != null ? "\n observationConvention=" + this.observationConvention : "") + + (this.observationRegistry != null + ? "\n observationRegistry=" + this.observationRegistry + : "") + "\n restartAfterAuthExceptions=" + this.restartAfterAuthExceptions + "\n]"; } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index 2d38945c22..de258bac48 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -78,7 +78,6 @@ import org.springframework.aop.support.AopUtils; import org.springframework.beans.BeanUtils; -import org.springframework.beans.factory.ObjectProvider; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationEventPublisher; import org.springframework.core.log.LogAccessor; @@ -171,6 +170,7 @@ * @author Borahm Lee * @author Lokesh Alamuri * @author Sanghyeok An + * @author Christian Fredriksson */ public class KafkaMessageListenerContainer // NOSONAR line count extends AbstractMessageListenerContainer implements ConsumerPauseResumeEventPublisher { @@ -372,14 +372,15 @@ protected void doStart() { } GenericMessageListener listener = (GenericMessageListener) messageListener; ListenerType listenerType = determineListenerType(listener); - ObservationRegistry observationRegistry = ObservationRegistry.NOOP; - ApplicationContext applicationContext = getApplicationContext(); - if (applicationContext != null && containerProperties.isObservationEnabled()) { - ObjectProvider registry = - applicationContext.getBeanProvider(ObservationRegistry.class); - ObservationRegistry reg = registry.getIfUnique(); - if (reg != null) { - observationRegistry = reg; + ObservationRegistry observationRegistry = containerProperties.getObservationRegistry(); + if (observationRegistry.isNoop()) { + ApplicationContext applicationContext = getApplicationContext(); + if (applicationContext != null && containerProperties.isObservationEnabled()) { + ObservationRegistry reg = applicationContext.getBeanProvider(ObservationRegistry.class) + .getIfUnique(); + if (reg != null) { + observationRegistry = reg; + } } } this.listenerConsumer = new ListenerConsumer(listener, listenerType, observationRegistry);