Skip to content

Commit

Permalink
GH-2609: providing more tracing tags (#3169)
Browse files Browse the repository at this point in the history
Fixes: #2609

* Improved observation tags to follow Opentelemetry standard (Listener)

* Otel conform tags for kafka sender

* Moved partition and offset to high-cardinality keys and revert publish -> send

* author and formatting

* * fix review.
* polish `ObservationTests`.
* add @author.
* add doc in `whats-new.adoc`.

* * fix review.
* add javadoc for `KafkaRecordReceiverContext` construct method.

* * fix review.

---------

Co-authored-by: Christian Mergenthaler <[email protected]>
  • Loading branch information
Wzy19930507 and Christian Mergenthaler authored Apr 8, 2024
1 parent 5ca65cc commit df8b7a1
Show file tree
Hide file tree
Showing 7 changed files with 498 additions and 112 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,3 +96,10 @@ See xref:kafka/connecting.adoc#default-client-id-prefixes[Default client ID pref
`getListenerContainersMatching(BiPredicate<String, MessageListenerContainer> matcher)` to filter by ID and container properties.

See xref:kafka/receiving-messages/kafkalistener-lifecycle.adoc#retrieving-message-listener-containers[`@KafkaListener` Lifecycle Management's API Docs] for more information.

[[x32-observation]]
== Enhanced observation by providing more tracing tags

`KafkaTemplateObservation` provides more tracing tags(low cardinality).
`KafkaListenerObservation` provides a new API to find high cardinality key names and more tracing tags(high or low cardinality).
See xref:kafka/micrometer.adoc#observation[Micrometer Observation]
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@
* @author Soby Chacko
* @author Wang Zhiyang
* @author Raphael Rösch
* @author Christian Mergenthaler
*/
public class KafkaMessageListenerContainer<K, V> // NOSONAR line count
extends AbstractMessageListenerContainer<K, V> implements ConsumerPauseResumeEventPublisher {
Expand Down Expand Up @@ -2691,7 +2692,8 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> cReco
Observation observation = KafkaListenerObservation.LISTENER_OBSERVATION.observation(
this.containerProperties.getObservationConvention(),
DefaultKafkaListenerObservationConvention.INSTANCE,
() -> new KafkaRecordReceiverContext(cRecord, getListenerId(), this::clusterId),
() -> new KafkaRecordReceiverContext(cRecord, getListenerId(), getClientId(), getGroupId(),
this::clusterId),
this.observationRegistry);
return observation.observe(() -> {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@

package org.springframework.kafka.support.micrometer;

import org.springframework.lang.NonNull;
import org.springframework.lang.Nullable;
import org.springframework.util.StringUtils;

import io.micrometer.common.KeyValues;
import io.micrometer.common.docs.KeyName;
import io.micrometer.observation.Observation.Context;
Expand All @@ -26,6 +30,9 @@
* Spring for Apache Kafka Observation for listeners.
*
* @author Gary Russell
* @author Christian Mergenthaler
* @author Wang Zhiyang
*
* @since 3.0
*
*/
Expand All @@ -42,15 +49,23 @@ public Class<? extends ObservationConvention<? extends Context>> getDefaultConve
}

@Override
@NonNull
public String getPrefix() {
return "spring.kafka.listener";
}

@Override
@NonNull
public KeyName[] getLowCardinalityKeyNames() {
return ListenerLowCardinalityTags.values();
}

@Override
@NonNull
public KeyName[] getHighCardinalityKeyNames() {
return ListenerHighCardinalityTags.values();
}

};

/**
Expand All @@ -64,20 +79,147 @@ public enum ListenerLowCardinalityTags implements KeyName {
LISTENER_ID {

@Override
@NonNull
public String asString() {
return "spring.kafka.listener.id";
}

}
},

/**
* Messaging system.
* @since 3.2
*/
MESSAGING_SYSTEM {

@Override
@NonNull
public String asString() {
return "messaging.system";
}

},

/**
* Messaging operation.
* @since 3.2
*/
MESSAGING_OPERATION {

@Override
@NonNull
public String asString() {
return "messaging.operation";
}

},

/**
* Messaging source name.
* @since 3.2
*/
MESSAGING_SOURCE_NAME {

@Override
@NonNull
public String asString() {
return "messaging.source.name";
}

},

/**
* Messaging source kind.
* @since 3.2
*/
MESSAGING_SOURCE_KIND {

@Override
@NonNull
public String asString() {
return "messaging.source.kind";
}

},

/**
* Messaging the consumer group.
* @since 3.2
*/
MESSAGING_CONSUMER_GROUP {

@Override
@NonNull
public String asString() {
return "messaging.kafka.consumer.group";
}

},

}

/**
* High cardinality tags.
* @since 3.2
*/
public enum ListenerHighCardinalityTags implements KeyName {

/**
* Messaging client id.
*/
MESSAGING_CLIENT_ID {

@Override
@NonNull
public String asString() {
return "messaging.kafka.client_id";
}

},

/**
* Messaging consumer id (consumer group and client id).
*/
MESSAGING_CONSUMER_ID {

@Override
@NonNull
public String asString() {
return "messaging.consumer.id";
}

},

/**
* Messaging partition.
*/
MESSAGING_PARTITION {

@Override
@NonNull
public String asString() {
return "messaging.kafka.source.partition";
}

},

/**
* Messaging message offset.
*/
MESSAGING_OFFSET {

@Override
@NonNull
public String asString() {
return "messaging.kafka.message.offset";
}

},

}

/**
* Default {@link KafkaListenerObservationConvention} for Kafka listener key values.
*
* @author Gary Russell
* @since 3.0
*
*/
public static class DefaultKafkaListenerObservationConvention implements KafkaListenerObservationConvention {

Expand All @@ -89,18 +231,45 @@ public static class DefaultKafkaListenerObservationConvention implements KafkaLi

@Override
public KeyValues getLowCardinalityKeyValues(KafkaRecordReceiverContext context) {
return KeyValues.of(KafkaListenerObservation.ListenerLowCardinalityTags.LISTENER_ID.asString(),
context.getListenerId());

return KeyValues.of(
ListenerLowCardinalityTags.LISTENER_ID.withValue(context.getListenerId()),
ListenerLowCardinalityTags.MESSAGING_SYSTEM.withValue("kafka"),
ListenerLowCardinalityTags.MESSAGING_OPERATION.withValue("receive"),
ListenerLowCardinalityTags.MESSAGING_SOURCE_NAME.withValue(context.getSource()),
ListenerLowCardinalityTags.MESSAGING_SOURCE_KIND.withValue("topic"),
ListenerLowCardinalityTags.MESSAGING_CONSUMER_GROUP.withValue(context.getGroupId())
);
}

@Override
@NonNull
public KeyValues getHighCardinalityKeyValues(KafkaRecordReceiverContext context) {
String clientId = context.getClientId();
KeyValues keyValues = KeyValues.of(
ListenerHighCardinalityTags.MESSAGING_PARTITION.withValue(context.getPartition()),
ListenerHighCardinalityTags.MESSAGING_OFFSET.withValue(context.getOffset()),
ListenerHighCardinalityTags.MESSAGING_CONSUMER_ID.withValue(getConsumerId(context, clientId))
);

if (StringUtils.hasText(clientId)) {
keyValues = keyValues
.and(ListenerHighCardinalityTags.MESSAGING_CLIENT_ID.withValue(clientId));
}

return keyValues;
}

@Override
public String getContextualName(KafkaRecordReceiverContext context) {
return context.getSource() + " receive";
}

@Override
public String getName() {
return "spring.kafka.listener";
private static String getConsumerId(KafkaRecordReceiverContext context, @Nullable String clientId) {
if (StringUtils.hasText(clientId)) {
return context.getGroupId() + " - " + clientId;
}
return context.getGroupId();
}

}
Expand Down
Loading

0 comments on commit df8b7a1

Please sign in to comment.