Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-2609: providing more tracing tags #3169

Merged
merged 7 commits into from
Apr 8, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -96,3 +96,10 @@ See xref:kafka/connecting.adoc#default-client-id-prefixes[Default client ID pref
`getListenerContainersMatching(BiPredicate<String, MessageListenerContainer> matcher)` to filter by ID and container properties.

See xref:kafka/receiving-messages/kafkalistener-lifecycle.adoc#retrieving-message-listener-containers[`@KafkaListener` Lifecycle Management's API Docs] for more information.

[[x32-observation]]
== Enhanced observation by providing more tracing tags

`KafkaTemplateObservation` provides more tracing tags(low cardinality).
`KafkaListenerObservation` provides a new API to find high cardinality key names and more tracing tags(high or low cardinality).
See xref:kafka/micrometer.adoc#observation[Micrometer Observation]
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@
* @author Soby Chacko
* @author Wang Zhiyang
* @author Raphael Rösch
* @author Christian Mergenthaler
*/
public class KafkaMessageListenerContainer<K, V> // NOSONAR line count
extends AbstractMessageListenerContainer<K, V> implements ConsumerPauseResumeEventPublisher {
Expand Down Expand Up @@ -2687,7 +2688,8 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> cReco
Observation observation = KafkaListenerObservation.LISTENER_OBSERVATION.observation(
this.containerProperties.getObservationConvention(),
DefaultKafkaListenerObservationConvention.INSTANCE,
() -> new KafkaRecordReceiverContext(cRecord, getListenerId(), this::clusterId),
() -> new KafkaRecordReceiverContext(cRecord, getListenerId(), getClientId(), getGroupId(),
this::clusterId),
this.observationRegistry);
return observation.observe(() -> {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

package org.springframework.kafka.support.micrometer;

import org.springframework.lang.NonNull;
import org.springframework.util.StringUtils;

import io.micrometer.common.KeyValues;
import io.micrometer.common.docs.KeyName;
import io.micrometer.observation.Observation.Context;
Expand All @@ -26,6 +29,9 @@
* Spring for Apache Kafka Observation for listeners.
*
* @author Gary Russell
* @author Christian Mergenthaler
* @author Wang Zhiyang
*
* @since 3.0
*
*/
Expand All @@ -42,19 +48,32 @@ public Class<? extends ObservationConvention<? extends Context>> getDefaultConve
}

@Override
@NonNull
public String getPrefix() {
return "spring.kafka.listener";
}

@Override
@NonNull
public KeyName[] getLowCardinalityKeyNames() {
return ListenerLowCardinalityTags.values();
}

@Override
@NonNull
public KeyName[] getHighCardinalityKeyNames() {
return ListenerHighCardinalityTags.values();
}

};

/**
* Low cardinality tags.
*
* @author Christian Mergenthaler
* @author Wang Zhiyang
*
* @since 3.2
Wzy19930507 marked this conversation as resolved.
Show resolved Hide resolved
*/
public enum ListenerLowCardinalityTags implements KeyName {

Expand All @@ -64,18 +83,151 @@ public enum ListenerLowCardinalityTags implements KeyName {
LISTENER_ID {

@Override
@NonNull
public String asString() {
return "spring.kafka.listener.id";
}

}
},

/**
* Messaging system.
*/
MESSAGING_SYSTEM {

@Override
@NonNull
public String asString() {
return "messaging.system";
}

},

/**
* Messaging operation.
*/
MESSAGING_OPERATION {

@Override
@NonNull
public String asString() {
return "messaging.operation";
}

},

/**
* Messaging source name.
*/
MESSAGING_SOURCE_NAME {

@Override
@NonNull
public String asString() {
return "messaging.source.name";
}

},

/**
* Messaging source kind.
*/
MESSAGING_SOURCE_KIND {

@Override
@NonNull
public String asString() {
return "messaging.source.kind";
}

},

/**
* Messaging the consumer group.
*/
MESSAGING_CONSUMER_GROUP {
Wzy19930507 marked this conversation as resolved.
Show resolved Hide resolved

@Override
@NonNull
public String asString() {
return "messaging.kafka.consumer.group";
}

},

}

/**
* High cardinality tags.
*
* @author Wang Zhiyang
* @author Christian Mergenthaler
*
* @since 3.2
Wzy19930507 marked this conversation as resolved.
Show resolved Hide resolved
*/
public enum ListenerHighCardinalityTags implements KeyName {

/**
* Messaging client id.
*/
MESSAGING_CLIENT_ID {

@Override
@NonNull
public String asString() {
return "messaging.kafka.client_id";
}

},

/**
* Messaging consumer id (consumer group and client id).
*/
MESSAGING_CONSUMER_ID {

@Override
@NonNull
public String asString() {
return "messaging.consumer.id";
}

},

/**
* Messaging partition.
*/
MESSAGING_PARTITION {

@Override
@NonNull
public String asString() {
return "messaging.kafka.source.partition";
}

},

/**
* Messaging message offset.
*/
MESSAGING_OFFSET {

@Override
@NonNull
public String asString() {
return "messaging.kafka.message.offset";
}

},

}

/**
* Default {@link KafkaListenerObservationConvention} for Kafka listener key values.
*
* @author Gary Russell
* @author Christian Mergenthaler
* @author Wang Zhiyang
*
* @since 3.0
*
*/
Expand All @@ -89,18 +241,44 @@ public static class DefaultKafkaListenerObservationConvention implements KafkaLi

@Override
public KeyValues getLowCardinalityKeyValues(KafkaRecordReceiverContext context) {
return KeyValues.of(KafkaListenerObservation.ListenerLowCardinalityTags.LISTENER_ID.asString(),
context.getListenerId());

return KeyValues.of(
ListenerLowCardinalityTags.LISTENER_ID.withValue(context.getListenerId()),
ListenerLowCardinalityTags.MESSAGING_SYSTEM.withValue("kafka"),
ListenerLowCardinalityTags.MESSAGING_OPERATION.withValue("receive"),
ListenerLowCardinalityTags.MESSAGING_SOURCE_NAME.withValue(context.getSource()),
ListenerLowCardinalityTags.MESSAGING_SOURCE_KIND.withValue("topic"),
ListenerLowCardinalityTags.MESSAGING_CONSUMER_GROUP.withValue(context.getGroupId())
);
}

@Override
@NonNull
public KeyValues getHighCardinalityKeyValues(KafkaRecordReceiverContext context) {
KeyValues keyValues = KeyValues.of(
ListenerHighCardinalityTags.MESSAGING_PARTITION.withValue(context.getPartition()),
ListenerHighCardinalityTags.MESSAGING_OFFSET.withValue(context.getOffset())
);

if (StringUtils.hasText(context.getClientId())) {
keyValues = keyValues
.and(ListenerHighCardinalityTags.MESSAGING_CLIENT_ID.withValue(context.getClientId()))
Wzy19930507 marked this conversation as resolved.
Show resolved Hide resolved
.and(ListenerHighCardinalityTags.MESSAGING_CONSUMER_ID.withValue(getConsumerId(context)));
artembilan marked this conversation as resolved.
Show resolved Hide resolved
}

return keyValues;
}

@Override
public String getContextualName(KafkaRecordReceiverContext context) {
return context.getSource() + " receive";
}

@Override
public String getName() {
return "spring.kafka.listener";
private String getConsumerId(KafkaRecordReceiverContext context) {
if (StringUtils.hasText(context.getClientId())) {
return context.getGroupId() + " - " + context.getClientId();
}
return context.getGroupId();
}

}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022-2023 the original author or authors.
* Copyright 2022-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -28,16 +28,24 @@
* {@link ReceiverContext} for {@link ConsumerRecord}s.
*
* @author Gary Russell
* @author Christian Mergenthaler
* @author Wang Zhiyang
*
* @since 3.0
*
*/
public class KafkaRecordReceiverContext extends ReceiverContext<ConsumerRecord<?, ?>> {

private final String listenerId;

private final String clientId;

private final String groupId;

private final ConsumerRecord<?, ?> record;

public KafkaRecordReceiverContext(ConsumerRecord<?, ?> record, String listenerId, Supplier<String> clusterId) {
public KafkaRecordReceiverContext(ConsumerRecord<?, ?> record, String listenerId, String clientId, String groupId,
Wzy19930507 marked this conversation as resolved.
Show resolved Hide resolved
Wzy19930507 marked this conversation as resolved.
Show resolved Hide resolved
Supplier<String> clusterId) {
super((carrier, key) -> {
Header header = carrier.headers().lastHeader(key);
if (header == null || header.value() == null) {
Expand All @@ -48,6 +56,8 @@ public KafkaRecordReceiverContext(ConsumerRecord<?, ?> record, String listenerId
setCarrier(record);
this.record = record;
this.listenerId = listenerId;
this.clientId = clientId;
this.groupId = groupId;
String cluster = clusterId.get();
setRemoteServiceName("Apache Kafka" + (cluster != null ? ": " + cluster : ""));
}
Expand All @@ -60,6 +70,14 @@ public String getListenerId() {
return this.listenerId;
}

public String getGroupId() {
return this.groupId;
}

public String getClientId() {
return this.clientId;
Wzy19930507 marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* Return the source topic.
* @return the source.
Expand All @@ -70,11 +88,29 @@ public String getSource() {

/**
* Return the consumer record.
* @return the record the record.
* @return the record.
* @since 3.0.6
*/
public ConsumerRecord<?, ?> getRecord() {
return this.record;
}

/**
* Return the partition.
* @return the partition.
* @since 3.2
*/
public String getPartition() {
return Integer.toString(this.record.partition());
}

/**
* Return the offset.
* @return the offset.
* @since 3.2
*/
public String getOffset() {
return Long.toString(this.record.offset());
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022-2023 the original author or authors.
* Copyright 2022-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -27,6 +27,9 @@
* {@link SenderContext} for {@link ProducerRecord}s.
*
* @author Gary Russell
* @author Christian Mergenthaler
* @author Wang Zhiyang
*
* @since 3.0
*
*/
Expand Down Expand Up @@ -64,7 +67,7 @@ public String getDestination() {

/**
* Return the producer record.
* @return the record the record.
* @return the record.
* @since 3.0.6
*/
public ProducerRecord<?, ?> getRecord() {
Expand Down
Loading
Loading