Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhanced monitoring when KMLC is batch listener #3047

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,9 @@ Using Micrometer for observation is now supported, since version 3.0, for the `K

Set `observationEnabled` to `true` on the `KafkaTemplate` and `ContainerProperties` to enable observation; this will disable xref:kafka/micrometer.adoc[Micrometer Timers] because the timers will now be managed with each observation.

Refer to https://docs.micrometer.io/tracing/reference/index.html[Micrometer Tracing] for more information.
IMPORTANT: Micrometer Observation does not support batch listener; this will enable Micrometer Timers

Refer to https://micrometer.io/docs/tracing[Micrometer Tracing] for more information.

To add tags to timers/traces, configure a custom `KafkaTemplateObservationConvention` or `KafkaListenerObservationConvention` to the template or listener container, respectively.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -758,6 +758,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume

private final MicrometerHolder micrometerHolder;

private final boolean observationEnabled;

private final AtomicBoolean polling = new AtomicBoolean();

private final boolean subBatchPerPartition;
Expand Down Expand Up @@ -912,13 +914,15 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
this.isBatchListener = true;
this.wantsFullRecords = this.batchListener.wantsPollResult();
this.pollThreadStateProcessor = setUpPollProcessor(true);
this.observationEnabled = false;
}
else if (listener instanceof MessageListener) {
this.listener = (MessageListener<K, V>) listener;
this.batchListener = null;
this.isBatchListener = false;
this.wantsFullRecords = false;
this.pollThreadStateProcessor = setUpPollProcessor(false);
this.observationEnabled = this.containerProperties.isObservationEnabled();
}
else {
throw new IllegalArgumentException("Listener must be one of 'MessageListener', "
Expand Down Expand Up @@ -999,7 +1003,7 @@ private Object determineBootstrapServers(Properties consumerProperties) {
@Nullable
private KafkaAdmin obtainAdmin() {
KafkaAdmin customAdmin = KafkaMessageListenerContainer.this.thisOrParentContainer.getKafkaAdmin();
if (customAdmin == null && this.containerProperties.isObservationEnabled()) {
if (customAdmin == null && this.observationEnabled) {
ApplicationContext applicationContext = getApplicationContext();
if (applicationContext != null) {
KafkaAdmin admin = applicationContext.getBeanProvider(KafkaAdmin.class).getIfUnique();
Expand Down Expand Up @@ -1279,7 +1283,7 @@ private MicrometerHolder obtainMicrometerHolder() {
MicrometerHolder holder = null;
try {
if (KafkaUtils.MICROMETER_PRESENT && this.containerProperties.isMicrometerEnabled()
&& !this.containerProperties.isObservationEnabled()) {
&& !this.observationEnabled) {

Function<Object, Map<String, String>> mergedProvider =
cr -> this.containerProperties.getMicrometerTags();
Expand Down
Loading