Skip to content

Commit

Permalink
Batch listener does not support observation (#3047)
Browse files Browse the repository at this point in the history
Fixes: #3047 

* because of observation does not support batch listener, add properties `observationEnabled` and replace 
  `this.containerProperties.isObservationEnabled()` for `KafkaMessageListenerContainer.ListenerConsumer`, when
  batch listener set false to `observationEnabled`.
* add notice in micrometer.adoc
* add observationEnabled and replace containerProperties.isObservationEnabled() in 
  KafkaMessageListenerContainer.ListenerConsumer

**Auto-cherry-pick to `3.1.x`**
  • Loading branch information
Wzy19930507 authored Feb 21, 2024
1 parent 7a53b01 commit 6f32959
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,9 @@ Using Micrometer for observation is now supported, since version 3.0, for the `K

Set `observationEnabled` to `true` on the `KafkaTemplate` and `ContainerProperties` to enable observation; this will disable xref:kafka/micrometer.adoc[Micrometer Timers] because the timers will now be managed with each observation.

Refer to https://docs.micrometer.io/tracing/reference/index.html[Micrometer Tracing] for more information.
IMPORTANT: Micrometer Observation does not support batch listener; this will enable Micrometer Timers

Refer to https://micrometer.io/docs/tracing[Micrometer Tracing] for more information.

To add tags to timers/traces, configure a custom `KafkaTemplateObservationConvention` or `KafkaListenerObservationConvention` to the template or listener container, respectively.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -758,6 +758,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume

private final MicrometerHolder micrometerHolder;

private final boolean observationEnabled;

private final AtomicBoolean polling = new AtomicBoolean();

private final boolean subBatchPerPartition;
Expand Down Expand Up @@ -912,13 +914,15 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
this.isBatchListener = true;
this.wantsFullRecords = this.batchListener.wantsPollResult();
this.pollThreadStateProcessor = setUpPollProcessor(true);
this.observationEnabled = false;
}
else if (listener instanceof MessageListener) {
this.listener = (MessageListener<K, V>) listener;
this.batchListener = null;
this.isBatchListener = false;
this.wantsFullRecords = false;
this.pollThreadStateProcessor = setUpPollProcessor(false);
this.observationEnabled = this.containerProperties.isObservationEnabled();
}
else {
throw new IllegalArgumentException("Listener must be one of 'MessageListener', "
Expand Down Expand Up @@ -999,7 +1003,7 @@ private Object determineBootstrapServers(Properties consumerProperties) {
@Nullable
private KafkaAdmin obtainAdmin() {
KafkaAdmin customAdmin = KafkaMessageListenerContainer.this.thisOrParentContainer.getKafkaAdmin();
if (customAdmin == null && this.containerProperties.isObservationEnabled()) {
if (customAdmin == null && this.observationEnabled) {
ApplicationContext applicationContext = getApplicationContext();
if (applicationContext != null) {
KafkaAdmin admin = applicationContext.getBeanProvider(KafkaAdmin.class).getIfUnique();
Expand Down Expand Up @@ -1279,7 +1283,7 @@ private MicrometerHolder obtainMicrometerHolder() {
MicrometerHolder holder = null;
try {
if (KafkaUtils.MICROMETER_PRESENT && this.containerProperties.isMicrometerEnabled()
&& !this.containerProperties.isObservationEnabled()) {
&& !this.observationEnabled) {

Function<Object, Map<String, String>> mergedProvider =
cr -> this.containerProperties.getMicrometerTags();
Expand Down

0 comments on commit 6f32959

Please sign in to comment.