diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/micrometer.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/micrometer.adoc index 886cbb8f51..7b2bb36502 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/micrometer.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/micrometer.adoc @@ -95,7 +95,9 @@ Using Micrometer for observation is now supported, since version 3.0, for the `K Set `observationEnabled` to `true` on the `KafkaTemplate` and `ContainerProperties` to enable observation; this will disable xref:kafka/micrometer.adoc[Micrometer Timers] because the timers will now be managed with each observation. -Refer to https://docs.micrometer.io/tracing/reference/index.html[Micrometer Tracing] for more information. +IMPORTANT: Micrometer Observation does not support batch listener; this will enable Micrometer Timers + +Refer to https://micrometer.io/docs/tracing[Micrometer Tracing] for more information. To add tags to timers/traces, configure a custom `KafkaTemplateObservationConvention` or `KafkaListenerObservationConvention` to the template or listener container, respectively. diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index 44e3fb21db..ff26ea06e1 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -758,6 +758,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume private final MicrometerHolder micrometerHolder; + private final boolean observationEnabled; + private final AtomicBoolean polling = new AtomicBoolean(); private final boolean subBatchPerPartition; @@ -912,6 +914,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume this.isBatchListener = true; this.wantsFullRecords = this.batchListener.wantsPollResult(); this.pollThreadStateProcessor = setUpPollProcessor(true); + this.observationEnabled = false; } else if (listener instanceof MessageListener) { this.listener = (MessageListener) listener; @@ -919,6 +922,7 @@ else if (listener instanceof MessageListener) { this.isBatchListener = false; this.wantsFullRecords = false; this.pollThreadStateProcessor = setUpPollProcessor(false); + this.observationEnabled = this.containerProperties.isObservationEnabled(); } else { throw new IllegalArgumentException("Listener must be one of 'MessageListener', " @@ -999,7 +1003,7 @@ private Object determineBootstrapServers(Properties consumerProperties) { @Nullable private KafkaAdmin obtainAdmin() { KafkaAdmin customAdmin = KafkaMessageListenerContainer.this.thisOrParentContainer.getKafkaAdmin(); - if (customAdmin == null && this.containerProperties.isObservationEnabled()) { + if (customAdmin == null && this.observationEnabled) { ApplicationContext applicationContext = getApplicationContext(); if (applicationContext != null) { KafkaAdmin admin = applicationContext.getBeanProvider(KafkaAdmin.class).getIfUnique(); @@ -1279,7 +1283,7 @@ private MicrometerHolder obtainMicrometerHolder() { MicrometerHolder holder = null; try { if (KafkaUtils.MICROMETER_PRESENT && this.containerProperties.isMicrometerEnabled() - && !this.containerProperties.isObservationEnabled()) { + && !this.observationEnabled) { Function> mergedProvider = cr -> this.containerProperties.getMicrometerTags();