From 5d478924b439fbe877cdad2fb2fb7f0ba9794ab5 Mon Sep 17 00:00:00 2001 From: Christian Mergenthaler Date: Fri, 10 Mar 2023 12:09:14 +0100 Subject: [PATCH 1/7] Improved observation tags to follow Opentelemetry standard (Listener) --- .../KafkaMessageListenerContainer.java | 2 +- .../micrometer/KafkaListenerObservation.java | 135 +++++++++++++++++- .../KafkaRecordReceiverContext.java | 33 ++++- .../support/micrometer/ObservationTests.java | 99 ++++++++++--- 4 files changed, 246 insertions(+), 23 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index 336f13eef0..8dcc133e65 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -2687,7 +2687,7 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord cReco Observation observation = KafkaListenerObservation.LISTENER_OBSERVATION.observation( this.containerProperties.getObservationConvention(), DefaultKafkaListenerObservationConvention.INSTANCE, - () -> new KafkaRecordReceiverContext(cRecord, getListenerId(), this::clusterId), + () -> new KafkaRecordReceiverContext(cRecord, getListenerId(), getClientId(), getGroupId(), this::clusterId), this.observationRegistry); return observation.observe(() -> { try { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaListenerObservation.java b/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaListenerObservation.java index 1bbcfbc582..0d02f67a82 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaListenerObservation.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaListenerObservation.java @@ -21,6 +21,7 @@ import io.micrometer.observation.Observation.Context; import io.micrometer.observation.ObservationConvention; import io.micrometer.observation.docs.ObservationDocumentation; +import org.springframework.util.StringUtils; /** * Spring for Apache Kafka Observation for listeners. 
@@ -68,6 +69,114 @@ public String asString() { return "spring.kafka.listener.id"; } + }, + + /** + * Messaging system + */ + MESSAGING_SYSTEM { + + @Override + public String asString() { + return "messaging.system"; + } + + }, + + /** + * Messaging operation + */ + MESSAGING_OPERATION { + + @Override + public String asString() { + return "messaging.operation"; + } + + }, + + /** + * Messaging consumer id + */ + MESSAGING_CONSUMER_ID { + + @Override + public String asString() { + return "messaging.consumer.id"; + } + + }, + + /** + * Messaging source name + */ + MESSAGING_SOURCE_NAME { + + @Override + public String asString() { + return "messaging.source.name"; + } + + }, + + /** + * Messaging source kind + */ + MESSAGING_SOURCE_KIND { + + @Override + public String asString() { + return "messaging.source.kind"; + } + + }, + + /** + * Messaging consumer group + */ + MESSAGING_CONSUMER_GROUP { + + @Override + public String asString() { + return "messaging.kafka.consumer.group"; + } + + }, + + /** + * Messaging client id + */ + MESSAGING_CLIENT_ID { + + @Override + public String asString() { + return "messaging.kafka.client_id"; + } + + }, + + /** + * Messaging partition + */ + MESSAGING_PARTITION { + + @Override + public String asString() { + return "messaging.kafka.partition"; + } + + }, + + /** + * Messaging message offset + */ + MESSAGING_OFFSET { + + @Override + public String asString() { + return "messaging.kafka.message.offset"; + } + } } @@ -89,8 +198,23 @@ public static class DefaultKafkaListenerObservationConvention implements KafkaLi @Override public KeyValues getLowCardinalityKeyValues(KafkaRecordReceiverContext context) { - return KeyValues.of(KafkaListenerObservation.ListenerLowCardinalityTags.LISTENER_ID.asString(), - context.getListenerId()); + KeyValues keyValues = KeyValues.of( + ListenerLowCardinalityTags.LISTENER_ID.withValue(context.getListenerId()), + ListenerLowCardinalityTags.MESSAGING_CONSUMER_ID.withValue(getConsumerId(context)), + ListenerLowCardinalityTags.MESSAGING_SYSTEM.withValue("kafka"), + ListenerLowCardinalityTags.MESSAGING_OPERATION.withValue("receive"), + ListenerLowCardinalityTags.MESSAGING_SOURCE_NAME.withValue(context.getSource()), + ListenerLowCardinalityTags.MESSAGING_SOURCE_KIND.withValue("topic"), + ListenerLowCardinalityTags.MESSAGING_PARTITION.withValue(context.getPartition()), + ListenerLowCardinalityTags.MESSAGING_OFFSET.withValue(context.getOffset()), + ListenerLowCardinalityTags.MESSAGING_CONSUMER_GROUP.withValue(context.getGroupId()) + ); + + if (StringUtils.hasText(context.getClientId())) { + keyValues = keyValues.and(ListenerLowCardinalityTags.MESSAGING_CLIENT_ID.withValue(context.getClientId())); + } + + return keyValues; } @Override @@ -103,6 +227,13 @@ public String getName() { return "spring.kafka.listener"; } + private String getConsumerId(KafkaRecordReceiverContext context) { + if (StringUtils.hasText(context.getClientId())) { + return context.getGroupId() + " - " + context.getClientId(); + } + return context.getGroupId(); + } + } } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaRecordReceiverContext.java b/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaRecordReceiverContext.java index b3de789176..fa3f8dc618 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaRecordReceiverContext.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaRecordReceiverContext.java @@ -1,5 +1,5 @@ /* - * Copyright 
2022-2023 the original author or authors. + * Copyright 2022-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -34,10 +34,12 @@ public class KafkaRecordReceiverContext extends ReceiverContext> { private final String listenerId; + private final String clientId; + private final String groupId; private final ConsumerRecord record; - public KafkaRecordReceiverContext(ConsumerRecord record, String listenerId, Supplier clusterId) { + public KafkaRecordReceiverContext(ConsumerRecord record, String listenerId, String clientId, String groupId, Supplier clusterId) { super((carrier, key) -> { Header header = carrier.headers().lastHeader(key); if (header == null || header.value() == null) { @@ -48,6 +50,8 @@ public KafkaRecordReceiverContext(ConsumerRecord record, String listenerId setCarrier(record); this.record = record; this.listenerId = listenerId; + this.clientId = clientId; + this.groupId = groupId; String cluster = clusterId.get(); setRemoteServiceName("Apache Kafka" + (cluster != null ? ": " + cluster : "")); } @@ -59,6 +63,13 @@ public KafkaRecordReceiverContext(ConsumerRecord record, String listenerId public String getListenerId() { return this.listenerId; } + public String getGroupId() { + return this.groupId; + } + + public String getClientId() { + return clientId; + } /** * Return the source topic. @@ -77,4 +88,22 @@ public String getSource() { return this.record; } + /** + * Return the partition. + * @return the partition. + * @since 3.2 + */ + public String getPartition() { + return Integer.toString(this.record.partition()); + } + + /** + * Return the offset. + * @return the offset. + * @since 3.2 + */ + public String getOffset() { + return Long.toString(this.record.offset()); + } + } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java index 45fa015ac7..839f9d7448 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java @@ -128,22 +128,44 @@ void endToEnd(@Autowired Listener listener, @Autowired KafkaTemplate spans.peekFirst().getTags().size() == 3); + await().until(() -> spans.peekFirst().getTags().size() == 12); span = spans.poll(); assertThat(span.getTags()) .containsAllEntriesOf( - Map.of("spring.kafka.listener.id", "obs1-0", "foo", "some foo value", "bar", "some bar value")); + Map.ofEntries(Map.entry("spring.kafka.listener.id", "obs1-0"), + Map.entry("foo", "some foo value"), + Map.entry("bar", "some bar value"), + Map.entry("messaging.consumer.id", "obs1 - consumer-obs1-2"), + Map.entry("messaging.kafka.client_id", "consumer-obs1-2"), + Map.entry("messaging.kafka.consumer.group", "obs1"), + Map.entry("messaging.kafka.message.offset", "0"), + Map.entry("messaging.kafka.partition", "0"), + Map.entry("messaging.operation", "receive"), + Map.entry("messaging.source.kind", "topic"), + Map.entry("messaging.source.name", "observation.testT1"), + Map.entry("messaging.system", "kafka"))); assertThat(span.getName()).isEqualTo("observation.testT1 receive"); assertThat(span.getRemoteServiceName()).startsWith("Apache Kafka: "); await().until(() -> spans.peekFirst().getTags().size() == 1); span = spans.poll(); 
assertThat(span.getTags()).containsEntry("spring.kafka.template.name", "template"); assertThat(span.getName()).isEqualTo("observation.testT2 send"); - await().until(() -> spans.peekFirst().getTags().size() == 3); + await().until(() -> spans.peekFirst().getTags().size() == 12); span = spans.poll(); assertThat(span.getTags()) .containsAllEntriesOf( - Map.of("spring.kafka.listener.id", "obs2-0", "foo", "some foo value", "bar", "some bar value")); + Map.ofEntries(Map.entry("spring.kafka.listener.id", "obs2-0"), + Map.entry("foo", "some foo value"), + Map.entry("bar", "some bar value"), + Map.entry("messaging.consumer.id", "obs2 - consumer-obs2-1"), + Map.entry("messaging.kafka.client_id", "consumer-obs2-1"), + Map.entry("messaging.kafka.consumer.group", "obs2"), + Map.entry("messaging.kafka.message.offset", "0"), + Map.entry("messaging.kafka.partition", "0"), + Map.entry("messaging.operation", "receive"), + Map.entry("messaging.source.kind", "topic"), + Map.entry("messaging.source.name", "observation.testT2"), + Map.entry("messaging.system", "kafka"))); assertThat(span.getName()).isEqualTo("observation.testT2 receive"); template.setObservationConvention(new DefaultKafkaTemplateObservationConvention() { @@ -176,7 +198,7 @@ public KeyValues getLowCardinalityKeyValues(KafkaRecordReceiverContext context) assertThat(span.getTags()).containsEntry("spring.kafka.template.name", "template"); assertThat(span.getTags()).containsEntry("foo", "bar"); assertThat(span.getName()).isEqualTo("observation.testT1 send"); - await().until(() -> spans.peekFirst().getTags().size() == 4); + await().until(() -> spans.peekFirst().getTags().size() == 13); span = spans.poll(); assertThat(span.getTags()) .containsAllEntriesOf(Map.of("spring.kafka.listener.id", "obs1-0", "foo", "some foo value", "bar", @@ -187,11 +209,22 @@ public KeyValues getLowCardinalityKeyValues(KafkaRecordReceiverContext context) assertThat(span.getTags()).containsEntry("spring.kafka.template.name", "template"); assertThat(span.getTags()).containsEntry("foo", "bar"); assertThat(span.getName()).isEqualTo("observation.testT2 send"); - await().until(() -> spans.peekFirst().getTags().size() == 3); + await().until(() -> spans.peekFirst().getTags().size() == 12); span = spans.poll(); assertThat(span.getTags()) .containsAllEntriesOf( - Map.of("spring.kafka.listener.id", "obs2-0", "foo", "some foo value", "bar", "some bar value")); + Map.ofEntries(Map.entry("spring.kafka.listener.id", "obs2-0"), + Map.entry("foo", "some foo value"), + Map.entry("bar", "some bar value"), + Map.entry("messaging.consumer.id", "obs2 - consumer-obs2-1"), + Map.entry("messaging.kafka.client_id", "consumer-obs2-1"), + Map.entry("messaging.kafka.consumer.group", "obs2"), + Map.entry("messaging.kafka.message.offset", "1"), + Map.entry("messaging.kafka.partition", "0"), + Map.entry("messaging.operation", "receive"), + Map.entry("messaging.source.kind", "topic"), + Map.entry("messaging.source.name", "observation.testT2"), + Map.entry("messaging.system", "kafka"))); assertThat(span.getTags()).doesNotContainEntry("baz", "qux"); assertThat(span.getName()).isEqualTo("observation.testT2 receive"); MeterRegistryAssert.assertThat(meterRegistry) @@ -199,10 +232,40 @@ public KeyValues getLowCardinalityKeyValues(KafkaRecordReceiverContext context) KeyValues.of("spring.kafka.template.name", "template")) .hasTimerWithNameAndTags("spring.kafka.template", KeyValues.of("spring.kafka.template.name", "template", "foo", "bar")) - .hasTimerWithNameAndTags("spring.kafka.listener", 
KeyValues.of("spring.kafka.listener.id", "obs1-0")) .hasTimerWithNameAndTags("spring.kafka.listener", - KeyValues.of("spring.kafka.listener.id", "obs1-0", "baz", "qux")) - .hasTimerWithNameAndTags("spring.kafka.listener", KeyValues.of("spring.kafka.listener.id", "obs2-0")); + KeyValues.of("spring.kafka.listener.id", "obs1-0", + "messaging.consumer.id", "obs1 - consumer-obs1-2", + "messaging.kafka.client_id", "consumer-obs1-2", + "messaging.kafka.consumer.group", "obs1", + "messaging.kafka.message.offset", "0", + "messaging.kafka.partition", "0", + "messaging.operation", "receive", + "messaging.source.kind", "topic", + "messaging.source.name", "observation.testT1", + "messaging.system", "kafka")) + .hasTimerWithNameAndTags("spring.kafka.listener", + KeyValues.of("spring.kafka.listener.id", "obs1-0", + "baz", "qux", + "messaging.consumer.id", "obs1 - consumer-obs1-3", + "messaging.kafka.client_id", "consumer-obs1-3", + "messaging.kafka.consumer.group", "obs1", + "messaging.kafka.message.offset", "1", + "messaging.kafka.partition", "0", + "messaging.operation", "receive", + "messaging.source.kind", "topic", + "messaging.source.name", "observation.testT1", + "messaging.system", "kafka")) + .hasTimerWithNameAndTags("spring.kafka.listener", + KeyValues.of("spring.kafka.listener.id", "obs2-0", + "messaging.consumer.id", "obs2 - consumer-obs2-1", + "messaging.kafka.client_id", "consumer-obs2-1", + "messaging.kafka.consumer.group", "obs2", + "messaging.kafka.message.offset", "0", + "messaging.kafka.partition", "0", + "messaging.operation", "receive", + "messaging.source.kind", "topic", + "messaging.source.name", "observation.testT2", + "messaging.system", "kafka")); assertThat(admin.getConfigurationProperties()) .containsEntry(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString()); // producer factory broker different to admin @@ -370,14 +433,14 @@ MeterRegistry meterRegistry() { ObservationRegistry observationRegistry(Tracer tracer, Propagator propagator, MeterRegistry meterRegistry) { TestObservationRegistry observationRegistry = TestObservationRegistry.create(); observationRegistry.observationConfig().observationHandler( - // Composite will pick the first matching handler - new ObservationHandler.FirstMatchingCompositeObservationHandler( - // This is responsible for creating a child span on the sender side - new PropagatingSenderTracingObservationHandler<>(tracer, propagator), - // This is responsible for creating a span on the receiver side - new PropagatingReceiverTracingObservationHandler<>(tracer, propagator), - // This is responsible for creating a default span - new DefaultTracingObservationHandler(tracer))) + // Composite will pick the first matching handler + new ObservationHandler.FirstMatchingCompositeObservationHandler( + // This is responsible for creating a child span on the sender side + new PropagatingSenderTracingObservationHandler<>(tracer, propagator), + // This is responsible for creating a span on the receiver side + new PropagatingReceiverTracingObservationHandler<>(tracer, propagator), + // This is responsible for creating a default span + new DefaultTracingObservationHandler(tracer))) .observationHandler(new DefaultMeterObservationHandler(meterRegistry)); return observationRegistry; } From ebce06f806d95ce076d7a4c97b73be6297ed6358 Mon Sep 17 00:00:00 2001 From: Christian Mergenthaler Date: Mon, 13 Mar 2023 08:27:03 +0100 Subject: [PATCH 2/7] Otel conform tags for kafka sender --- .../micrometer/KafkaListenerObservation.java | 2 +- 
.../micrometer/KafkaRecordSenderContext.java | 1 - .../micrometer/KafkaTemplateObservation.java | 60 ++++++++++++++-- .../support/micrometer/ObservationTests.java | 68 +++++++++++++------ 4 files changed, 105 insertions(+), 26 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaListenerObservation.java b/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaListenerObservation.java index 0d02f67a82..c21ec6a970 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaListenerObservation.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaListenerObservation.java @@ -162,7 +162,7 @@ public String asString() { @Override public String asString() { - return "messaging.kafka.partition"; + return "messaging.kafka.source.partition"; } }, diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaRecordSenderContext.java b/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaRecordSenderContext.java index ea6005b883..d490a9d68c 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaRecordSenderContext.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaRecordSenderContext.java @@ -70,5 +70,4 @@ public String getDestination() { public ProducerRecord getRecord() { return this.record; } - } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaTemplateObservation.java b/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaTemplateObservation.java index 60580d86ca..262c86cef8 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaTemplateObservation.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaTemplateObservation.java @@ -1,5 +1,5 @@ /* - * Copyright 2022 the original author or authors. + * Copyright 2022-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -69,6 +69,54 @@ public String asString() { return "spring.kafka.template.name"; } + }, + + /** + * Messaging system + */ + MESSAGING_SYSTEM { + + @Override + public String asString() { + return "messaging.system"; + } + + }, + + /** + * Messaging operation + */ + MESSAGING_OPERATION { + + @Override + public String asString() { + return "messaging.operation"; + } + + }, + + /** + * Messaging destination name + */ + MESSAGING_DESTINATION_NAME { + + @Override + public String asString() { + return "messaging.destination.name"; + } + + }, + + /** + * Messaging destination kind + */ + MESSAGING_DESTINATION_KIND { + + @Override + public String asString() { + return "messaging.destination.kind"; + } + } } @@ -90,13 +138,17 @@ public static class DefaultKafkaTemplateObservationConvention implements KafkaTe @Override public KeyValues getLowCardinalityKeyValues(KafkaRecordSenderContext context) { - return KeyValues.of(KafkaTemplateObservation.TemplateLowCardinalityTags.BEAN_NAME.asString(), - context.getBeanName()); + return KeyValues.of( + TemplateLowCardinalityTags.BEAN_NAME.withValue(context.getBeanName()), + TemplateLowCardinalityTags.MESSAGING_SYSTEM.withValue("kafka"), + TemplateLowCardinalityTags.MESSAGING_OPERATION.withValue("publish"), + TemplateLowCardinalityTags.MESSAGING_DESTINATION_KIND.withValue("topic"), + TemplateLowCardinalityTags.MESSAGING_DESTINATION_NAME.withValue(context.getDestination())); } @Override public String getContextualName(KafkaRecordSenderContext context) { - return context.getDestination() + " send"; + return context.getDestination() + " publish"; } @Override diff --git a/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java index 839f9d7448..22f0d21bda 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java @@ -125,8 +125,13 @@ void endToEnd(@Autowired Listener listener, @Autowired KafkaTemplate spans = tracer.getSpans(); assertThat(spans).hasSize(4); SimpleSpan span = spans.poll(); - assertThat(span.getTags()).containsEntry("spring.kafka.template.name", "template"); - assertThat(span.getName()).isEqualTo("observation.testT1 send"); + assertThat(span.getTags()) + .containsAllEntriesOf(Map.of("spring.kafka.template.name", "template", + "messaging.operation", "publish", + "messaging.system", "kafka", + "messaging.destination.kind", "topic", + "messaging.destination.name", "observation.testT1")); + assertThat(span.getName()).isEqualTo("observation.testT1 publish"); assertThat(span.getRemoteServiceName()).startsWith("Apache Kafka: "); await().until(() -> spans.peekFirst().getTags().size() == 12); span = spans.poll(); @@ -139,17 +144,22 @@ void endToEnd(@Autowired Listener listener, @Autowired KafkaTemplate spans.peekFirst().getTags().size() == 1); + await().until(() -> spans.peekFirst().getTags().size() == 5); span = spans.poll(); - assertThat(span.getTags()).containsEntry("spring.kafka.template.name", "template"); - assertThat(span.getName()).isEqualTo("observation.testT2 send"); + assertThat(span.getTags()) + .containsAllEntriesOf(Map.of("spring.kafka.template.name", "template", + "messaging.operation", "publish", + "messaging.system", "kafka", + "messaging.destination.kind", "topic", + "messaging.destination.name", "observation.testT2")); + 
assertThat(span.getName()).isEqualTo("observation.testT2 publish"); await().until(() -> spans.peekFirst().getTags().size() == 12); span = spans.poll(); assertThat(span.getTags()) @@ -161,7 +171,7 @@ void endToEnd(@Autowired Listener listener, @Autowired KafkaTemplate spans.peekFirst().getTags().size() == 13); span = spans.poll(); assertThat(span.getTags()) .containsAllEntriesOf(Map.of("spring.kafka.listener.id", "obs1-0", "foo", "some foo value", "bar", "some bar value", "baz", "qux")); assertThat(span.getName()).isEqualTo("observation.testT1 receive"); - await().until(() -> spans.peekFirst().getTags().size() == 2); + await().until(() -> spans.peekFirst().getTags().size() == 6); span = spans.poll(); - assertThat(span.getTags()).containsEntry("spring.kafka.template.name", "template"); - assertThat(span.getTags()).containsEntry("foo", "bar"); - assertThat(span.getName()).isEqualTo("observation.testT2 send"); + assertThat(span.getTags()) + .containsAllEntriesOf(Map.of("spring.kafka.template.name", "template", + "foo", "bar", + "messaging.operation", "publish", + "messaging.system", "kafka", + "messaging.destination.kind", "topic", + "messaging.destination.name", "observation.testT2")); + assertThat(span.getName()).isEqualTo("observation.testT2 publish"); await().until(() -> spans.peekFirst().getTags().size() == 12); span = spans.poll(); assertThat(span.getTags()) @@ -220,7 +240,7 @@ public KeyValues getLowCardinalityKeyValues(KafkaRecordReceiverContext context) Map.entry("messaging.kafka.client_id", "consumer-obs2-1"), Map.entry("messaging.kafka.consumer.group", "obs2"), Map.entry("messaging.kafka.message.offset", "1"), - Map.entry("messaging.kafka.partition", "0"), + Map.entry("messaging.kafka.source.partition", "0"), Map.entry("messaging.operation", "receive"), Map.entry("messaging.source.kind", "topic"), Map.entry("messaging.source.name", "observation.testT2"), @@ -229,16 +249,24 @@ public KeyValues getLowCardinalityKeyValues(KafkaRecordReceiverContext context) assertThat(span.getName()).isEqualTo("observation.testT2 receive"); MeterRegistryAssert.assertThat(meterRegistry) .hasTimerWithNameAndTags("spring.kafka.template", - KeyValues.of("spring.kafka.template.name", "template")) + KeyValues.of("spring.kafka.template.name", "template", + "messaging.operation", "publish", + "messaging.system", "kafka", + "messaging.destination.kind", "topic", + "messaging.destination.name", "observation.testT1")) .hasTimerWithNameAndTags("spring.kafka.template", - KeyValues.of("spring.kafka.template.name", "template", "foo", "bar")) + KeyValues.of("spring.kafka.template.name", "template", "foo", "bar", + "messaging.operation", "publish", + "messaging.system", "kafka", + "messaging.destination.kind", "topic", + "messaging.destination.name", "observation.testT2")) .hasTimerWithNameAndTags("spring.kafka.listener", KeyValues.of("spring.kafka.listener.id", "obs1-0", "messaging.consumer.id", "obs1 - consumer-obs1-2", "messaging.kafka.client_id", "consumer-obs1-2", "messaging.kafka.consumer.group", "obs1", "messaging.kafka.message.offset", "0", - "messaging.kafka.partition", "0", + "messaging.kafka.source.partition", "0", "messaging.operation", "receive", "messaging.source.kind", "topic", "messaging.source.name", "observation.testT1", @@ -250,7 +278,7 @@ public KeyValues getLowCardinalityKeyValues(KafkaRecordReceiverContext context) "messaging.kafka.client_id", "consumer-obs1-3", "messaging.kafka.consumer.group", "obs1", "messaging.kafka.message.offset", "1", - "messaging.kafka.partition", "0", + 
"messaging.kafka.source.partition", "0", "messaging.operation", "receive", "messaging.source.kind", "topic", "messaging.source.name", "observation.testT1", @@ -261,7 +289,7 @@ public KeyValues getLowCardinalityKeyValues(KafkaRecordReceiverContext context) "messaging.kafka.client_id", "consumer-obs2-1", "messaging.kafka.consumer.group", "obs2", "messaging.kafka.message.offset", "0", - "messaging.kafka.partition", "0", + "messaging.kafka.source.partition", "0", "messaging.operation", "receive", "messaging.source.kind", "topic", "messaging.source.name", "observation.testT2", From fe4b0f3cd946aec1b85ff433b9bbf567ae79e4d2 Mon Sep 17 00:00:00 2001 From: Christian Mergenthaler Date: Thu, 23 Mar 2023 10:26:02 +0100 Subject: [PATCH 3/7] Moved partition and offset to high-cardinality keys and revert publish -> send --- .../micrometer/KafkaListenerObservation.java | 12 +++++- .../micrometer/KafkaTemplateObservation.java | 2 +- .../support/micrometer/ObservationTests.java | 41 ++++++++----------- 3 files changed, 29 insertions(+), 26 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaListenerObservation.java b/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaListenerObservation.java index c21ec6a970..5bd376719b 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaListenerObservation.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaListenerObservation.java @@ -205,8 +205,6 @@ public KeyValues getLowCardinalityKeyValues(KafkaRecordReceiverContext context) ListenerLowCardinalityTags.MESSAGING_OPERATION.withValue("receive"), ListenerLowCardinalityTags.MESSAGING_SOURCE_NAME.withValue(context.getSource()), ListenerLowCardinalityTags.MESSAGING_SOURCE_KIND.withValue("topic"), - ListenerLowCardinalityTags.MESSAGING_PARTITION.withValue(context.getPartition()), - ListenerLowCardinalityTags.MESSAGING_OFFSET.withValue(context.getOffset()), ListenerLowCardinalityTags.MESSAGING_CONSUMER_GROUP.withValue(context.getGroupId()) ); @@ -217,6 +215,16 @@ public KeyValues getLowCardinalityKeyValues(KafkaRecordReceiverContext context) return keyValues; } + @Override + public KeyValues getHighCardinalityKeyValues(KafkaRecordReceiverContext context) { + KeyValues keyValues = KeyValues.of( + ListenerLowCardinalityTags.MESSAGING_PARTITION.withValue(context.getPartition()), + ListenerLowCardinalityTags.MESSAGING_OFFSET.withValue(context.getOffset()) + ); + + return keyValues; + } + @Override public String getContextualName(KafkaRecordReceiverContext context) { return context.getSource() + " receive"; diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaTemplateObservation.java b/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaTemplateObservation.java index 262c86cef8..e5f2f96c33 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaTemplateObservation.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaTemplateObservation.java @@ -148,7 +148,7 @@ public KeyValues getLowCardinalityKeyValues(KafkaRecordSenderContext context) { @Override public String getContextualName(KafkaRecordSenderContext context) { - return context.getDestination() + " publish"; + return context.getDestination() + " send"; } @Override diff --git a/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java 
b/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java index 22f0d21bda..a60807d634 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java @@ -92,7 +92,8 @@ */ @SpringJUnitConfig @EmbeddedKafka(topics = { "observation.testT1", "observation.testT2", "observation.testT3", - ObservationTests.OBSERVATION_RUNTIME_EXCEPTION, ObservationTests.OBSERVATION_ERROR}) + ObservationTests.OBSERVATION_RUNTIME_EXCEPTION, ObservationTests.OBSERVATION_ERROR} + , partitions = 1) @DirtiesContext public class ObservationTests { @@ -131,7 +132,7 @@ void endToEnd(@Autowired Listener listener, @Autowired KafkaTemplate spans.peekFirst().getTags().size() == 12); span = spans.poll(); @@ -140,8 +141,8 @@ void endToEnd(@Autowired Listener listener, @Autowired KafkaTemplate spans.peekFirst().getTags().size() == 12); span = spans.poll(); assertThat(span.getTags()) @@ -167,8 +168,8 @@ void endToEnd(@Autowired Listener listener, @Autowired KafkaTemplate spans.peekFirst().getTags().size() == 13); span = spans.poll(); assertThat(span.getTags()) @@ -228,7 +229,7 @@ public KeyValues getLowCardinalityKeyValues(KafkaRecordReceiverContext context) "messaging.system", "kafka", "messaging.destination.kind", "topic", "messaging.destination.name", "observation.testT2")); - assertThat(span.getName()).isEqualTo("observation.testT2 publish"); + assertThat(span.getName()).isEqualTo("observation.testT2 send"); await().until(() -> spans.peekFirst().getTags().size() == 12); span = spans.poll(); assertThat(span.getTags()) @@ -236,8 +237,8 @@ public KeyValues getLowCardinalityKeyValues(KafkaRecordReceiverContext context) Map.ofEntries(Map.entry("spring.kafka.listener.id", "obs2-0"), Map.entry("foo", "some foo value"), Map.entry("bar", "some bar value"), - Map.entry("messaging.consumer.id", "obs2 - consumer-obs2-1"), - Map.entry("messaging.kafka.client_id", "consumer-obs2-1"), + Map.entry("messaging.consumer.id", "obs2 - consumer-obs2-2"), + Map.entry("messaging.kafka.client_id", "consumer-obs2-2"), Map.entry("messaging.kafka.consumer.group", "obs2"), Map.entry("messaging.kafka.message.offset", "1"), Map.entry("messaging.kafka.source.partition", "0"), @@ -262,11 +263,9 @@ public KeyValues getLowCardinalityKeyValues(KafkaRecordReceiverContext context) "messaging.destination.name", "observation.testT2")) .hasTimerWithNameAndTags("spring.kafka.listener", KeyValues.of("spring.kafka.listener.id", "obs1-0", - "messaging.consumer.id", "obs1 - consumer-obs1-2", - "messaging.kafka.client_id", "consumer-obs1-2", + "messaging.consumer.id", "obs1 - consumer-obs1-3", + "messaging.kafka.client_id", "consumer-obs1-3", "messaging.kafka.consumer.group", "obs1", - "messaging.kafka.message.offset", "0", - "messaging.kafka.source.partition", "0", "messaging.operation", "receive", "messaging.source.kind", "topic", "messaging.source.name", "observation.testT1", @@ -274,22 +273,18 @@ public KeyValues getLowCardinalityKeyValues(KafkaRecordReceiverContext context) .hasTimerWithNameAndTags("spring.kafka.listener", KeyValues.of("spring.kafka.listener.id", "obs1-0", "baz", "qux", - "messaging.consumer.id", "obs1 - consumer-obs1-3", - "messaging.kafka.client_id", "consumer-obs1-3", + "messaging.consumer.id", "obs1 - consumer-obs1-4", + "messaging.kafka.client_id", "consumer-obs1-4", "messaging.kafka.consumer.group", "obs1", - "messaging.kafka.message.offset", "1", - 
"messaging.kafka.source.partition", "0", "messaging.operation", "receive", "messaging.source.kind", "topic", "messaging.source.name", "observation.testT1", "messaging.system", "kafka")) .hasTimerWithNameAndTags("spring.kafka.listener", KeyValues.of("spring.kafka.listener.id", "obs2-0", - "messaging.consumer.id", "obs2 - consumer-obs2-1", - "messaging.kafka.client_id", "consumer-obs2-1", + "messaging.consumer.id", "obs2 - consumer-obs2-2", + "messaging.kafka.client_id", "consumer-obs2-2", "messaging.kafka.consumer.group", "obs2", - "messaging.kafka.message.offset", "0", - "messaging.kafka.source.partition", "0", "messaging.operation", "receive", "messaging.source.kind", "topic", "messaging.source.name", "observation.testT2", From 3c044b00704779c1ff55bd37348ab76318b38dfa Mon Sep 17 00:00:00 2001 From: Christian Mergenthaler Date: Thu, 23 Mar 2023 10:42:10 +0100 Subject: [PATCH 4/7] author and formatting --- .../KafkaMessageListenerContainer.java | 4 +++- .../micrometer/KafkaListenerObservation.java | 21 ++++++++++--------- .../KafkaRecordReceiverContext.java | 7 +++++-- .../micrometer/KafkaRecordSenderContext.java | 1 + .../micrometer/KafkaTemplateObservation.java | 9 ++++---- .../support/micrometer/ObservationTests.java | 1 + 6 files changed, 26 insertions(+), 17 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index 8dcc133e65..eee9c22ac3 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -163,6 +163,7 @@ * @author Soby Chacko * @author Wang Zhiyang * @author Raphael Rösch + * @author Christian Mergenthaler */ public class KafkaMessageListenerContainer // NOSONAR line count extends AbstractMessageListenerContainer implements ConsumerPauseResumeEventPublisher { @@ -2687,7 +2688,8 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord cReco Observation observation = KafkaListenerObservation.LISTENER_OBSERVATION.observation( this.containerProperties.getObservationConvention(), DefaultKafkaListenerObservationConvention.INSTANCE, - () -> new KafkaRecordReceiverContext(cRecord, getListenerId(), getClientId(), getGroupId(), this::clusterId), + () -> new KafkaRecordReceiverContext(cRecord, getListenerId(), getClientId(), getGroupId(), + this::clusterId), this.observationRegistry); return observation.observe(() -> { try { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaListenerObservation.java b/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaListenerObservation.java index 5bd376719b..c17871f551 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaListenerObservation.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaListenerObservation.java @@ -16,12 +16,13 @@ package org.springframework.kafka.support.micrometer; +import org.springframework.util.StringUtils; + import io.micrometer.common.KeyValues; import io.micrometer.common.docs.KeyName; import io.micrometer.observation.Observation.Context; import io.micrometer.observation.ObservationConvention; import io.micrometer.observation.docs.ObservationDocumentation; -import org.springframework.util.StringUtils; /** * Spring for Apache Kafka Observation for 
listeners. @@ -72,7 +73,7 @@ public String asString() { }, /** - * Messaging system + * Messaging system. */ MESSAGING_SYSTEM { @@ -84,7 +85,7 @@ public String asString() { }, /** - * Messaging operation + * Messaging operation. */ MESSAGING_OPERATION { @@ -96,7 +97,7 @@ public String asString() { }, /** - * Messaging consumer id + * Messaging consumer id. */ MESSAGING_CONSUMER_ID { @@ -108,7 +109,7 @@ public String asString() { }, /** - * Messaging source name + * Messaging source name. */ MESSAGING_SOURCE_NAME { @@ -120,7 +121,7 @@ public String asString() { }, /** - * Messaging source kind + * Messaging source kind. */ MESSAGING_SOURCE_KIND { @@ -132,7 +133,7 @@ public String asString() { }, /** - * Messaging consumer group + * Messaging consumer group. */ MESSAGING_CONSUMER_GROUP { @@ -144,7 +145,7 @@ public String asString() { }, /** - * Messaging client id + * Messaging client id. */ MESSAGING_CLIENT_ID { @@ -156,7 +157,7 @@ public String asString() { }, /** - * Messaging partition + * Messaging partition. */ MESSAGING_PARTITION { @@ -168,7 +169,7 @@ public String asString() { }, /** - * Messaging message offset + * Messaging message offset. */ MESSAGING_OFFSET { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaRecordReceiverContext.java b/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaRecordReceiverContext.java index fa3f8dc618..e652f4d617 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaRecordReceiverContext.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaRecordReceiverContext.java @@ -28,6 +28,7 @@ * {@link ReceiverContext} for {@link ConsumerRecord}s. * * @author Gary Russell + * @author Christian Mergenthaler * @since 3.0 * */ @@ -39,7 +40,8 @@ public class KafkaRecordReceiverContext extends ReceiverContext record; - public KafkaRecordReceiverContext(ConsumerRecord record, String listenerId, String clientId, String groupId, Supplier clusterId) { + public KafkaRecordReceiverContext(ConsumerRecord record, String listenerId, String clientId, String groupId, + Supplier clusterId) { super((carrier, key) -> { Header header = carrier.headers().lastHeader(key); if (header == null || header.value() == null) { @@ -63,12 +65,13 @@ public KafkaRecordReceiverContext(ConsumerRecord record, String listenerId public String getListenerId() { return this.listenerId; } + public String getGroupId() { return this.groupId; } public String getClientId() { - return clientId; + return this.clientId; } /** diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaRecordSenderContext.java b/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaRecordSenderContext.java index d490a9d68c..8e56a79cdc 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaRecordSenderContext.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaRecordSenderContext.java @@ -27,6 +27,7 @@ * {@link SenderContext} for {@link ProducerRecord}s. 
* * @author Gary Russell + * @author Christian Mergenthaler * @since 3.0 * */ diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaTemplateObservation.java b/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaTemplateObservation.java index e5f2f96c33..15071ebe76 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaTemplateObservation.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaTemplateObservation.java @@ -27,6 +27,7 @@ * {@link org.springframework.kafka.core.KafkaTemplate}. * * @author Gary Russell + * @author Christian Mergenthaler * @since 3.0 * */ @@ -72,7 +73,7 @@ public String asString() { }, /** - * Messaging system + * Messaging system. */ MESSAGING_SYSTEM { @@ -84,7 +85,7 @@ public String asString() { }, /** - * Messaging operation + * Messaging operation. */ MESSAGING_OPERATION { @@ -96,7 +97,7 @@ public String asString() { }, /** - * Messaging destination name + * Messaging destination name. */ MESSAGING_DESTINATION_NAME { @@ -108,7 +109,7 @@ public String asString() { }, /** - * Messaging destination kind + * Messaging destination kind. */ MESSAGING_DESTINATION_KIND { diff --git a/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java index a60807d634..4589b1796f 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java @@ -87,6 +87,7 @@ * @author Gary Russell * @author Artem Bilan * @author Wang Zhiyang + * @author Christian Mergenthaler * * @since 3.0 */ From 1ca8ac8c9ee4d75c36165b77e5f0fe4d87d59214 Mon Sep 17 00:00:00 2001 From: Wzy19930507 <1208931582@qq.com> Date: Thu, 28 Mar 2024 23:16:51 +0800 Subject: [PATCH 5/7] * fix review. * polish `ObservationTests`. * add @author. * add doc in `whats-new.adoc`. --- .../antora/modules/ROOT/pages/whats-new.adoc | 7 + .../micrometer/KafkaListenerObservation.java | 96 ++++-- .../KafkaRecordReceiverContext.java | 6 +- .../micrometer/KafkaRecordSenderContext.java | 7 +- .../micrometer/KafkaTemplateObservation.java | 22 +- .../support/micrometer/ObservationTests.java | 321 ++++++++---------- 6 files changed, 254 insertions(+), 205 deletions(-) diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc index 96db11cc1c..98196d8bea 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc @@ -96,3 +96,10 @@ See xref:kafka/connecting.adoc#default-client-id-prefixes[Default client ID pref `getListenerContainersMatching(BiPredicate matcher)` to filter by ID and container properties. See xref:kafka/receiving-messages/kafkalistener-lifecycle.adoc#retrieving-message-listener-containers[`@KafkaListener` Lifecycle Management's API Docs] for more information. + +[[x32-observation]] +== Enhanced observation by providing more tracing tags + +`KafkaTemplateObservation` provides more tracing tags(low cardinality). +`KafkaListenerObservation` provides a new API to find high cardinality key names and more tracing tags(high or low cardinality). 
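As an illustrative sketch only (the `container` variable and the `team`/`alpha` tag are made-up examples, not part of these commits), the new default tags can still be extended through the existing convention hook on the container properties, exactly as the tests in this change do, by subclassing `DefaultKafkaListenerObservationConvention` and appending to the defaults (`KeyValue`/`KeyValues` are the Micrometer commons types):

[source, java]
----
// assumption: 'container' is an existing MessageListenerContainer
container.getContainerProperties().setObservationConvention(
        new KafkaListenerObservation.DefaultKafkaListenerObservationConvention() {

            @Override
            public KeyValues getLowCardinalityKeyValues(KafkaRecordReceiverContext context) {
                // keep the default OTel-style tags added by this change and
                // append one application-specific tag ("team"/"alpha" are illustrative)
                return super.getLowCardinalityKeyValues(context)
                        .and(KeyValue.of("team", "alpha"));
            }

        });
----

A customized `DefaultKafkaTemplateObservationConvention` can be registered on the `KafkaTemplate` in the same way for the producer-side tags.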
+See xref:kafka/micrometer.adoc#observation[Micrometer Observation] diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaListenerObservation.java b/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaListenerObservation.java index c17871f551..f95439fcc5 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaListenerObservation.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaListenerObservation.java @@ -16,6 +16,7 @@ package org.springframework.kafka.support.micrometer; +import org.springframework.lang.NonNull; import org.springframework.util.StringUtils; import io.micrometer.common.KeyValues; @@ -28,6 +29,9 @@ * Spring for Apache Kafka Observation for listeners. * * @author Gary Russell + * @author Christian Mergenthaler + * @author Wang Zhiyang + * * @since 3.0 * */ @@ -44,19 +48,32 @@ public Class> getDefaultConve } @Override + @NonNull public String getPrefix() { return "spring.kafka.listener"; } @Override + @NonNull public KeyName[] getLowCardinalityKeyNames() { return ListenerLowCardinalityTags.values(); } + @Override + @NonNull + public KeyName[] getHighCardinalityKeyNames() { + return ListenerHighCardinalityTags.values(); + } + }; /** * Low cardinality tags. + * + * @author Christian Mergenthaler + * @author Wang Zhiyang + * + * @since 3.2 */ public enum ListenerLowCardinalityTags implements KeyName { @@ -66,6 +83,7 @@ public enum ListenerLowCardinalityTags implements KeyName { LISTENER_ID { @Override + @NonNull public String asString() { return "spring.kafka.listener.id"; } @@ -78,6 +96,7 @@ public String asString() { MESSAGING_SYSTEM { @Override + @NonNull public String asString() { return "messaging.system"; } @@ -90,30 +109,20 @@ public String asString() { MESSAGING_OPERATION { @Override + @NonNull public String asString() { return "messaging.operation"; } }, - /** - * Messaging consumer id. - */ - MESSAGING_CONSUMER_ID { - - @Override - public String asString() { - return "messaging.consumer.id"; - } - - }, - /** * Messaging source name. */ MESSAGING_SOURCE_NAME { @Override + @NonNull public String asString() { return "messaging.source.name"; } @@ -126,6 +135,7 @@ public String asString() { MESSAGING_SOURCE_KIND { @Override + @NonNull public String asString() { return "messaging.source.kind"; } @@ -133,35 +143,63 @@ public String asString() { }, /** - * Messaging consumer group. + * Messaging the consumer group. */ MESSAGING_CONSUMER_GROUP { @Override + @NonNull public String asString() { return "messaging.kafka.consumer.group"; } }, + } + + /** + * High cardinality tags. + * + * @author Wang Zhiyang + * @author Christian Mergenthaler + * + * @since 3.2 + */ + public enum ListenerHighCardinalityTags implements KeyName { + /** * Messaging client id. */ MESSAGING_CLIENT_ID { @Override + @NonNull public String asString() { return "messaging.kafka.client_id"; } }, + /** + * Messaging consumer id (consumer group and client id). + */ + MESSAGING_CONSUMER_ID { + + @Override + @NonNull + public String asString() { + return "messaging.consumer.id"; + } + + }, + /** * Messaging partition. 
*/ MESSAGING_PARTITION { @Override + @NonNull public String asString() { return "messaging.kafka.source.partition"; } @@ -174,11 +212,12 @@ public String asString() { MESSAGING_OFFSET { @Override + @NonNull public String asString() { return "messaging.kafka.message.offset"; } - } + }, } @@ -186,6 +225,9 @@ public String asString() { * Default {@link KafkaListenerObservationConvention} for Kafka listener key values. * * @author Gary Russell + * @author Christian Mergenthaler + * @author Wang Zhiyang + * * @since 3.0 * */ @@ -199,30 +241,31 @@ public static class DefaultKafkaListenerObservationConvention implements KafkaLi @Override public KeyValues getLowCardinalityKeyValues(KafkaRecordReceiverContext context) { - KeyValues keyValues = KeyValues.of( + + return KeyValues.of( ListenerLowCardinalityTags.LISTENER_ID.withValue(context.getListenerId()), - ListenerLowCardinalityTags.MESSAGING_CONSUMER_ID.withValue(getConsumerId(context)), ListenerLowCardinalityTags.MESSAGING_SYSTEM.withValue("kafka"), ListenerLowCardinalityTags.MESSAGING_OPERATION.withValue("receive"), ListenerLowCardinalityTags.MESSAGING_SOURCE_NAME.withValue(context.getSource()), ListenerLowCardinalityTags.MESSAGING_SOURCE_KIND.withValue("topic"), ListenerLowCardinalityTags.MESSAGING_CONSUMER_GROUP.withValue(context.getGroupId()) ); - - if (StringUtils.hasText(context.getClientId())) { - keyValues = keyValues.and(ListenerLowCardinalityTags.MESSAGING_CLIENT_ID.withValue(context.getClientId())); - } - - return keyValues; } @Override + @NonNull public KeyValues getHighCardinalityKeyValues(KafkaRecordReceiverContext context) { KeyValues keyValues = KeyValues.of( - ListenerLowCardinalityTags.MESSAGING_PARTITION.withValue(context.getPartition()), - ListenerLowCardinalityTags.MESSAGING_OFFSET.withValue(context.getOffset()) + ListenerHighCardinalityTags.MESSAGING_PARTITION.withValue(context.getPartition()), + ListenerHighCardinalityTags.MESSAGING_OFFSET.withValue(context.getOffset()) ); + if (StringUtils.hasText(context.getClientId())) { + keyValues = keyValues + .and(ListenerHighCardinalityTags.MESSAGING_CLIENT_ID.withValue(context.getClientId())) + .and(ListenerHighCardinalityTags.MESSAGING_CONSUMER_ID.withValue(getConsumerId(context))); + } + return keyValues; } @@ -231,11 +274,6 @@ public String getContextualName(KafkaRecordReceiverContext context) { return context.getSource() + " receive"; } - @Override - public String getName() { - return "spring.kafka.listener"; - } - private String getConsumerId(KafkaRecordReceiverContext context) { if (StringUtils.hasText(context.getClientId())) { return context.getGroupId() + " - " + context.getClientId(); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaRecordReceiverContext.java b/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaRecordReceiverContext.java index e652f4d617..edec627060 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaRecordReceiverContext.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaRecordReceiverContext.java @@ -29,13 +29,17 @@ * * @author Gary Russell * @author Christian Mergenthaler + * @author Wang Zhiyang + * * @since 3.0 * */ public class KafkaRecordReceiverContext extends ReceiverContext> { private final String listenerId; + private final String clientId; + private final String groupId; private final ConsumerRecord record; @@ -84,7 +88,7 @@ public String getSource() { /** * Return the consumer record. 
- * @return the record the record. + * @return the record. * @since 3.0.6 */ public ConsumerRecord getRecord() { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaRecordSenderContext.java b/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaRecordSenderContext.java index 8e56a79cdc..615e2db727 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaRecordSenderContext.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaRecordSenderContext.java @@ -1,5 +1,5 @@ /* - * Copyright 2022-2023 the original author or authors. + * Copyright 2022-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,6 +28,8 @@ * * @author Gary Russell * @author Christian Mergenthaler + * @author Wang Zhiyang + * * @since 3.0 * */ @@ -65,10 +67,11 @@ public String getDestination() { /** * Return the producer record. - * @return the record the record. + * @return the record. * @since 3.0.6 */ public ProducerRecord getRecord() { return this.record; } + } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaTemplateObservation.java b/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaTemplateObservation.java index 15071ebe76..c233795e66 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaTemplateObservation.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaTemplateObservation.java @@ -1,5 +1,5 @@ /* - * Copyright 2022-2023 the original author or authors. + * Copyright 2022-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,8 @@ package org.springframework.kafka.support.micrometer; +import org.springframework.lang.NonNull; + import io.micrometer.common.KeyValues; import io.micrometer.common.docs.KeyName; import io.micrometer.observation.Observation.Context; @@ -28,6 +30,8 @@ * * @author Gary Russell * @author Christian Mergenthaler + * @author Wang Zhiyang + * * @since 3.0 * */ @@ -44,11 +48,13 @@ public Class> getDefaultConve } @Override + @NonNull public String getPrefix() { return "spring.kafka.template"; } @Override + @NonNull public KeyName[] getLowCardinalityKeyNames() { return TemplateLowCardinalityTags.values(); } @@ -57,6 +63,11 @@ public KeyName[] getLowCardinalityKeyNames() { /** * Low cardinality tags. 
+ * + * @author Christian Mergenthaler + * @author Wang Zhiyang + * + * @since 3.2 */ public enum TemplateLowCardinalityTags implements KeyName { @@ -66,6 +77,7 @@ public enum TemplateLowCardinalityTags implements KeyName { BEAN_NAME { @Override + @NonNull public String asString() { return "spring.kafka.template.name"; } @@ -78,6 +90,7 @@ public String asString() { MESSAGING_SYSTEM { @Override + @NonNull public String asString() { return "messaging.system"; } @@ -90,6 +103,7 @@ public String asString() { MESSAGING_OPERATION { @Override + @NonNull public String asString() { return "messaging.operation"; } @@ -102,6 +116,7 @@ public String asString() { MESSAGING_DESTINATION_NAME { @Override + @NonNull public String asString() { return "messaging.destination.name"; } @@ -114,6 +129,7 @@ public String asString() { MESSAGING_DESTINATION_KIND { @Override + @NonNull public String asString() { return "messaging.destination.kind"; } @@ -126,6 +142,9 @@ public String asString() { * Default {@link KafkaTemplateObservationConvention} for Kafka template key values. * * @author Gary Russell + * @author Christian Mergenthaler + * @author Wang Zhiyang + * * @since 3.0 * */ @@ -153,6 +172,7 @@ public String getContextualName(KafkaRecordSenderContext context) { } @Override + @NonNull public String getName() { return "spring.kafka.template"; } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java index 4589b1796f..57f8520056 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java @@ -56,6 +56,7 @@ import org.springframework.kafka.core.KafkaAdmin; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.listener.MessageListenerContainer; import org.springframework.kafka.support.micrometer.KafkaListenerObservation.DefaultKafkaListenerObservationConvention; import org.springframework.kafka.support.micrometer.KafkaTemplateObservation.DefaultKafkaTemplateObservationConvention; import org.springframework.kafka.test.EmbeddedKafkaBroker; @@ -92,12 +93,18 @@ * @since 3.0 */ @SpringJUnitConfig -@EmbeddedKafka(topics = { "observation.testT1", "observation.testT2", "observation.testT3", - ObservationTests.OBSERVATION_RUNTIME_EXCEPTION, ObservationTests.OBSERVATION_ERROR} - , partitions = 1) +@EmbeddedKafka(topics = { ObservationTests.OBSERVATION_TEST_1, ObservationTests.OBSERVATION_TEST_2, + ObservationTests.OBSERVATION_TEST_3, ObservationTests.OBSERVATION_RUNTIME_EXCEPTION, + ObservationTests.OBSERVATION_ERROR }, partitions = 1) @DirtiesContext public class ObservationTests { + public final static String OBSERVATION_TEST_1 = "observation.testT1"; + + public final static String OBSERVATION_TEST_2 = "observation.testT2"; + + public final static String OBSERVATION_TEST_3 = "observation.testT3"; + public final static String OBSERVATION_RUNTIME_EXCEPTION = "observation.runtime-exception"; public final static String OBSERVATION_ERROR = "observation.error"; @@ -113,72 +120,36 @@ void endToEnd(@Autowired Listener listener, @Autowired KafkaTemplate spanFromCallback = new AtomicReference<>(); - template.send("observation.testT1", "test") + template.send(OBSERVATION_TEST_1, "test") .thenAccept((sendResult) -> spanFromCallback.set(tracer.currentSpan())) 
.get(10, TimeUnit.SECONDS); assertThat(spanFromCallback.get()).isNotNull(); + MessageListenerContainer listenerContainer1 = rler.getListenerContainer("obs1"); + MessageListenerContainer listenerContainer2 = rler.getListenerContainer("obs2"); + assertThat(listenerContainer1).isNotNull(); + assertThat(listenerContainer2).isNotNull(); + // consumer factory broker different to admin + assertThatContainerAdmin(listenerContainer1, admin, + broker.getBrokersAsString() + "," + broker.getBrokersAsString() + "," + + broker.getBrokersAsString()); + // broker override in annotation + assertThatContainerAdmin(listenerContainer2, admin, broker.getBrokersAsString()); assertThat(listener.latch1.await(10, TimeUnit.SECONDS)).isTrue(); + listenerContainer1.stop(); + listenerContainer2.stop(); + assertThat(listener.record).isNotNull(); Headers headers = listener.record.headers(); assertThat(headers.lastHeader("foo")).extracting(Header::value).isEqualTo("some foo value".getBytes()); assertThat(headers.lastHeader("bar")).extracting(Header::value).isEqualTo("some bar value".getBytes()); Deque spans = tracer.getSpans(); assertThat(spans).hasSize(4); - SimpleSpan span = spans.poll(); - assertThat(span.getTags()) - .containsAllEntriesOf(Map.of("spring.kafka.template.name", "template", - "messaging.operation", "publish", - "messaging.system", "kafka", - "messaging.destination.kind", "topic", - "messaging.destination.name", "observation.testT1")); - assertThat(span.getName()).isEqualTo("observation.testT1 send"); - assertThat(span.getRemoteServiceName()).startsWith("Apache Kafka: "); - await().until(() -> spans.peekFirst().getTags().size() == 12); - span = spans.poll(); - assertThat(span.getTags()) - .containsAllEntriesOf( - Map.ofEntries(Map.entry("spring.kafka.listener.id", "obs1-0"), - Map.entry("foo", "some foo value"), - Map.entry("bar", "some bar value"), - Map.entry("messaging.consumer.id", "obs1 - consumer-obs1-3"), - Map.entry("messaging.kafka.client_id", "consumer-obs1-3"), - Map.entry("messaging.kafka.consumer.group", "obs1"), - Map.entry("messaging.kafka.message.offset", "0"), - Map.entry("messaging.kafka.source.partition", "0"), - Map.entry("messaging.operation", "receive"), - Map.entry("messaging.source.kind", "topic"), - Map.entry("messaging.source.name", "observation.testT1"), - Map.entry("messaging.system", "kafka"))); - assertThat(span.getName()).isEqualTo("observation.testT1 receive"); - assertThat(span.getRemoteServiceName()).startsWith("Apache Kafka: "); - await().until(() -> spans.peekFirst().getTags().size() == 5); - span = spans.poll(); - assertThat(span.getTags()) - .containsAllEntriesOf(Map.of("spring.kafka.template.name", "template", - "messaging.operation", "publish", - "messaging.system", "kafka", - "messaging.destination.kind", "topic", - "messaging.destination.name", "observation.testT2")); - assertThat(span.getName()).isEqualTo("observation.testT2 send"); - await().until(() -> spans.peekFirst().getTags().size() == 12); - span = spans.poll(); - assertThat(span.getTags()) - .containsAllEntriesOf( - Map.ofEntries(Map.entry("spring.kafka.listener.id", "obs2-0"), - Map.entry("foo", "some foo value"), - Map.entry("bar", "some bar value"), - Map.entry("messaging.consumer.id", "obs2 - consumer-obs2-2"), - Map.entry("messaging.kafka.client_id", "consumer-obs2-2"), - Map.entry("messaging.kafka.consumer.group", "obs2"), - Map.entry("messaging.kafka.message.offset", "0"), - Map.entry("messaging.kafka.source.partition", "0"), - Map.entry("messaging.operation", "receive"), - 
Map.entry("messaging.source.kind", "topic"), - Map.entry("messaging.source.name", "observation.testT2"), - Map.entry("messaging.system", "kafka"))); - assertThat(span.getName()).isEqualTo("observation.testT2 receive"); + assertThatTemplateSpanTags(spans, 5, OBSERVATION_TEST_1); + assertThatListenerSpanTags(spans, 12, OBSERVATION_TEST_1, "obs1-0", "obs1", "0", "0"); + assertThatTemplateSpanTags(spans, 5, OBSERVATION_TEST_2); + assertThatListenerSpanTags(spans, 12, OBSERVATION_TEST_2, "obs2-0", "obs2", "0", "0"); template.setObservationConvention(new DefaultKafkaTemplateObservationConvention() { @Override @@ -187,7 +158,9 @@ public KeyValues getLowCardinalityKeyValues(KafkaRecordSenderContext context) { } }); - rler.getListenerContainer("obs1").getContainerProperties().setObservationConvention( + template.send(OBSERVATION_TEST_1, "test").get(10, TimeUnit.SECONDS); + + listenerContainer1.getContainerProperties().setObservationConvention( new DefaultKafkaListenerObservationConvention() { @Override @@ -197,130 +170,43 @@ public KeyValues getLowCardinalityKeyValues(KafkaRecordReceiverContext context) }); - rler.getListenerContainer("obs1").stop(); - rler.getListenerContainer("obs1").start(); - template.send("observation.testT1", "test").get(10, TimeUnit.SECONDS); + listenerContainer2.start(); + listenerContainer1.start(); assertThat(listener.latch2.await(10, TimeUnit.SECONDS)).isTrue(); + listenerContainer1.stop(); + listenerContainer2.stop(); + assertThat(listener.record).isNotNull(); headers = listener.record.headers(); assertThat(headers.lastHeader("foo")).extracting(Header::value).isEqualTo("some foo value".getBytes()); assertThat(headers.lastHeader("bar")).extracting(Header::value).isEqualTo("some bar value".getBytes()); assertThat(spans).hasSize(4); - span = spans.poll(); - assertThat(span.getTags()) - .containsAllEntriesOf(Map.of("spring.kafka.template.name", "template", - "foo", "bar", - "messaging.operation", "publish", - "messaging.system", "kafka", - "messaging.destination.kind", "topic", - "messaging.destination.name", "observation.testT1")); - assertThat(span.getName()).isEqualTo("observation.testT1 send"); - await().until(() -> spans.peekFirst().getTags().size() == 13); - span = spans.poll(); - assertThat(span.getTags()) - .containsAllEntriesOf(Map.of("spring.kafka.listener.id", "obs1-0", "foo", "some foo value", "bar", - "some bar value", "baz", "qux")); - assertThat(span.getName()).isEqualTo("observation.testT1 receive"); - await().until(() -> spans.peekFirst().getTags().size() == 6); - span = spans.poll(); - assertThat(span.getTags()) - .containsAllEntriesOf(Map.of("spring.kafka.template.name", "template", - "foo", "bar", - "messaging.operation", "publish", - "messaging.system", "kafka", - "messaging.destination.kind", "topic", - "messaging.destination.name", "observation.testT2")); - assertThat(span.getName()).isEqualTo("observation.testT2 send"); - await().until(() -> spans.peekFirst().getTags().size() == 12); - span = spans.poll(); - assertThat(span.getTags()) - .containsAllEntriesOf( - Map.ofEntries(Map.entry("spring.kafka.listener.id", "obs2-0"), - Map.entry("foo", "some foo value"), - Map.entry("bar", "some bar value"), - Map.entry("messaging.consumer.id", "obs2 - consumer-obs2-2"), - Map.entry("messaging.kafka.client_id", "consumer-obs2-2"), - Map.entry("messaging.kafka.consumer.group", "obs2"), - Map.entry("messaging.kafka.message.offset", "1"), - Map.entry("messaging.kafka.source.partition", "0"), - Map.entry("messaging.operation", "receive"), - 
Map.entry("messaging.source.kind", "topic"), - Map.entry("messaging.source.name", "observation.testT2"), - Map.entry("messaging.system", "kafka"))); + assertThatTemplateSpanTags(spans, 6, OBSERVATION_TEST_1, Map.entry("foo", "bar")); + assertThatListenerSpanTags(spans, 13, OBSERVATION_TEST_1, "obs1-0", "obs1", "1", "0", Map.entry("baz", "qux")); + assertThatTemplateSpanTags(spans, 6, OBSERVATION_TEST_2, Map.entry("foo", "bar")); + SimpleSpan span = assertThatListenerSpanTags(spans, 12, OBSERVATION_TEST_2, "obs2-0", "obs2", "1", "0"); assertThat(span.getTags()).doesNotContainEntry("baz", "qux"); - assertThat(span.getName()).isEqualTo("observation.testT2 receive"); - MeterRegistryAssert.assertThat(meterRegistry) - .hasTimerWithNameAndTags("spring.kafka.template", - KeyValues.of("spring.kafka.template.name", "template", - "messaging.operation", "publish", - "messaging.system", "kafka", - "messaging.destination.kind", "topic", - "messaging.destination.name", "observation.testT1")) - .hasTimerWithNameAndTags("spring.kafka.template", - KeyValues.of("spring.kafka.template.name", "template", "foo", "bar", - "messaging.operation", "publish", - "messaging.system", "kafka", - "messaging.destination.kind", "topic", - "messaging.destination.name", "observation.testT2")) - .hasTimerWithNameAndTags("spring.kafka.listener", - KeyValues.of("spring.kafka.listener.id", "obs1-0", - "messaging.consumer.id", "obs1 - consumer-obs1-3", - "messaging.kafka.client_id", "consumer-obs1-3", - "messaging.kafka.consumer.group", "obs1", - "messaging.operation", "receive", - "messaging.source.kind", "topic", - "messaging.source.name", "observation.testT1", - "messaging.system", "kafka")) - .hasTimerWithNameAndTags("spring.kafka.listener", - KeyValues.of("spring.kafka.listener.id", "obs1-0", - "baz", "qux", - "messaging.consumer.id", "obs1 - consumer-obs1-4", - "messaging.kafka.client_id", "consumer-obs1-4", - "messaging.kafka.consumer.group", "obs1", - "messaging.operation", "receive", - "messaging.source.kind", "topic", - "messaging.source.name", "observation.testT1", - "messaging.system", "kafka")) - .hasTimerWithNameAndTags("spring.kafka.listener", - KeyValues.of("spring.kafka.listener.id", "obs2-0", - "messaging.consumer.id", "obs2 - consumer-obs2-2", - "messaging.kafka.client_id", "consumer-obs2-2", - "messaging.kafka.consumer.group", "obs2", - "messaging.operation", "receive", - "messaging.source.kind", "topic", - "messaging.source.name", "observation.testT2", - "messaging.system", "kafka")); + MeterRegistryAssert meterRegistryAssert = MeterRegistryAssert.assertThat(meterRegistry); + assertThatTemplateHasTimerWithNameAndTags(meterRegistryAssert, OBSERVATION_TEST_1); + assertThatListenerHasTimerWithNameAndTags(meterRegistryAssert, OBSERVATION_TEST_1, "obs1", "obs1-0"); + assertThatTemplateHasTimerWithNameAndTags(meterRegistryAssert, OBSERVATION_TEST_2, "foo", "bar"); + assertThatListenerHasTimerWithNameAndTags(meterRegistryAssert, OBSERVATION_TEST_1, "obs1", "obs1-0", + "baz", "qux"); + assertThatListenerHasTimerWithNameAndTags(meterRegistryAssert, OBSERVATION_TEST_2, "obs2", "obs2-0"); + assertThat(admin.getConfigurationProperties()) .containsEntry(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString()); // producer factory broker different to admin - KafkaAdmin pAdmin = KafkaTestUtils.getPropertyValue(template, "kafkaAdmin", KafkaAdmin.class); - assertThat(pAdmin.getOperationTimeout()).isEqualTo(admin.getOperationTimeout()); - assertThat(pAdmin.getConfigurationProperties()) - 
.containsEntry(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, - broker.getBrokersAsString() + "," + broker.getBrokersAsString()); + assertThatAdmin(template, admin, broker.getBrokersAsString() + "," + broker.getBrokersAsString(), + "kafkaAdmin"); // custom admin assertThat(customTemplate.getKafkaAdmin()).isSameAs(config.mockAdmin); - // consumer factory broker different to admin - Object container = KafkaTestUtils - .getPropertyValue(endpointRegistry.getListenerContainer("obs1"), "containers", List.class).get(0); - KafkaAdmin cAdmin = KafkaTestUtils.getPropertyValue(container, "listenerConsumer.kafkaAdmin", KafkaAdmin.class); - assertThat(cAdmin.getOperationTimeout()).isEqualTo(admin.getOperationTimeout()); - assertThat(cAdmin.getConfigurationProperties()) - .containsEntry(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, - broker.getBrokersAsString() + "," + broker.getBrokersAsString() + "," - + broker.getBrokersAsString()); - // broker override in annotation - container = KafkaTestUtils - .getPropertyValue(endpointRegistry.getListenerContainer("obs2"), "containers", List.class).get(0); - cAdmin = KafkaTestUtils.getPropertyValue(container, "listenerConsumer.kafkaAdmin", KafkaAdmin.class); - assertThat(cAdmin.getOperationTimeout()).isEqualTo(admin.getOperationTimeout()); - assertThat(cAdmin.getConfigurationProperties()) - .containsEntry(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString()); // custom admin - container = KafkaTestUtils - .getPropertyValue(endpointRegistry.getListenerContainer("obs3"), "containers", List.class).get(0); - cAdmin = KafkaTestUtils.getPropertyValue(container, "listenerConsumer.kafkaAdmin", KafkaAdmin.class); + Object container = KafkaTestUtils.getPropertyValue( + endpointRegistry.getListenerContainer("obs3"), "containers", List.class).get(0); + KafkaAdmin cAdmin = KafkaTestUtils.getPropertyValue( + container, "listenerConsumer.kafkaAdmin", KafkaAdmin.class); assertThat(cAdmin).isSameAs(config.mockAdmin); assertThatExceptionOfType(KafkaException.class) @@ -332,6 +218,97 @@ public KeyValues getLowCardinalityKeyValues(KafkaRecordReceiverContext context) .doesNotHaveMeterWithNameAndTags("spring.kafka.template", KeyValues.of("error", "KafkaException")); } + @SafeVarargs + @SuppressWarnings("varargs") + private void assertThatTemplateSpanTags(Deque spans, int tagSize, String destName, + Map.Entry... keyValues) { + + SimpleSpan span = spans.poll(); + assertThat(span).isNotNull(); + await().until(() -> span.getTags().size() == tagSize); + assertThat(span.getTags()).containsAllEntriesOf(Map.of( + "spring.kafka.template.name", "template", + "messaging.operation", "publish", + "messaging.system", "kafka", + "messaging.destination.kind", "topic", + "messaging.destination.name", destName)); + if (keyValues != null && keyValues.length > 0) { + Arrays.stream(keyValues).forEach(entry -> assertThat(span.getTags()).contains(entry)); + } + assertThat(span.getName()).isEqualTo(destName + " send"); + assertThat(span.getRemoteServiceName()).startsWith("Apache Kafka: "); + } + + @SafeVarargs + @SuppressWarnings("varargs") + private SimpleSpan assertThatListenerSpanTags(Deque spans, int tagSize, String sourceName, + String listenerId, String consumerGroup, String offset, String partition, + Map.Entry... 
keyValues) { + + SimpleSpan span = spans.poll(); + assertThat(span).isNotNull(); + await().until(() -> span.getTags().size() == tagSize); + String clientId = span.getTags().get("messaging.kafka.client_id"); + assertThat(span.getTags()) + .containsAllEntriesOf( + Map.ofEntries(Map.entry("spring.kafka.listener.id", listenerId), + Map.entry("foo", "some foo value"), + Map.entry("bar", "some bar value"), + Map.entry("messaging.consumer.id", consumerGroup + " - " + clientId), + Map.entry("messaging.kafka.consumer.group", consumerGroup), + Map.entry("messaging.kafka.message.offset", offset), + Map.entry("messaging.kafka.source.partition", partition), + Map.entry("messaging.operation", "receive"), + Map.entry("messaging.source.kind", "topic"), + Map.entry("messaging.source.name", sourceName), + Map.entry("messaging.system", "kafka"))); + if (keyValues != null && keyValues.length > 0) { + Arrays.stream(keyValues).forEach(entry -> assertThat(span.getTags()).contains(entry)); + } + assertThat(span.getName()).isEqualTo(sourceName + " receive"); + return span; + } + + private void assertThatTemplateHasTimerWithNameAndTags(MeterRegistryAssert meterRegistryAssert, String destName, + String... keyValues) { + + meterRegistryAssert.hasTimerWithNameAndTags("spring.kafka.template", + KeyValues.of("spring.kafka.template.name", "template", + "messaging.operation", "publish", + "messaging.system", "kafka", + "messaging.destination.kind", "topic", + "messaging.destination.name", destName) + .and(keyValues)); + } + + private void assertThatListenerHasTimerWithNameAndTags(MeterRegistryAssert meterRegistryAssert, String destName, + String consumerGroup, String listenerId, String... keyValues) { + + meterRegistryAssert.hasTimerWithNameAndTags("spring.kafka.listener", + KeyValues.of( + "messaging.kafka.consumer.group", consumerGroup, + "messaging.operation", "receive", + "messaging.source.kind", "topic", + "messaging.source.name", destName, + "messaging.system", "kafka", + "spring.kafka.listener.id", listenerId) + .and(keyValues)); + } + + private void assertThatContainerAdmin(MessageListenerContainer listenerContainer, KafkaAdmin admin, + String brokersString) { + + Object container = KafkaTestUtils.getPropertyValue(listenerContainer, "containers", List.class).get(0); + assertThatAdmin(container, admin, brokersString, "listenerConsumer.kafkaAdmin"); + } + + private void assertThatAdmin(Object object, KafkaAdmin admin, String brokersString, String key) { + KafkaAdmin cAdmin = KafkaTestUtils.getPropertyValue(object, key, KafkaAdmin.class); + assertThat(cAdmin.getOperationTimeout()).isEqualTo(admin.getOperationTimeout()); + assertThat(cAdmin.getConfigurationProperties()) + .containsEntry(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokersString); + } + @Test void observationRuntimeException(@Autowired ExceptionListener listener, @Autowired SimpleTracer tracer, @Autowired @Qualifier("throwableTemplate") KafkaTemplate runtimeExceptionTemplate, @@ -524,12 +501,12 @@ public Listener(KafkaTemplate template) { this.template = template; } - @KafkaListener(id = "obs1", topics = "observation.testT1") + @KafkaListener(id = "obs1", topics = OBSERVATION_TEST_1) void listen1(ConsumerRecord in) { - this.template.send("observation.testT2", in.value()); + this.template.send(OBSERVATION_TEST_2, in.value()); } - @KafkaListener(id = "obs2", topics = "observation.testT2", + @KafkaListener(id = "obs2", topics = OBSERVATION_TEST_2, properties = ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + ":" + "#{@embeddedKafka.brokersAsString}") void 
listen2(ConsumerRecord in) {
 		this.record = in;
@@ -537,7 +514,7 @@ void listen2(ConsumerRecord in) {
 		this.latch2.countDown();
 	}
 
-	@KafkaListener(id = "obs3", topics = "observation.testT3")
+	@KafkaListener(id = "obs3", topics = OBSERVATION_TEST_3)
 	void listen3(ConsumerRecord in) {
 	}
 

From 2982214d4f77d3d049889d7eb7f89d71f4de6dcd Mon Sep 17 00:00:00 2001
From: Wzy19930507 <1208931582@qq.com>
Date: Tue, 2 Apr 2024 10:23:47 +0800
Subject: [PATCH 6/7] * Address review comments. * Add Javadoc for the
 `KafkaRecordReceiverContext` constructors.

---
 .../micrometer/KafkaListenerObservation.java  | 16 +++++++++-------
 .../KafkaRecordReceiverContext.java           | 19 +++++++++++++++++++
 2 files changed, 28 insertions(+), 7 deletions(-)

diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaListenerObservation.java b/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaListenerObservation.java
index f95439fcc5..83adb424f0 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaListenerObservation.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaListenerObservation.java
@@ -17,6 +17,7 @@
 package org.springframework.kafka.support.micrometer;
 
 import org.springframework.lang.NonNull;
+import org.springframework.lang.Nullable;
 import org.springframework.util.StringUtils;
 
 import io.micrometer.common.KeyValues;
@@ -255,15 +256,16 @@ public KeyValues getLowCardinalityKeyValues(KafkaRecordReceiverContext context)
 		@Override
 		@NonNull
 		public KeyValues getHighCardinalityKeyValues(KafkaRecordReceiverContext context) {
+			String clientId = context.getClientId();
 			KeyValues keyValues = KeyValues.of(
 					ListenerHighCardinalityTags.MESSAGING_PARTITION.withValue(context.getPartition()),
-					ListenerHighCardinalityTags.MESSAGING_OFFSET.withValue(context.getOffset())
+					ListenerHighCardinalityTags.MESSAGING_OFFSET.withValue(context.getOffset()),
+					ListenerHighCardinalityTags.MESSAGING_CONSUMER_ID.withValue(getConsumerId(context, clientId))
 			);
 
-			if (StringUtils.hasText(context.getClientId())) {
+			if (StringUtils.hasText(clientId)) {
 				keyValues = keyValues
-						.and(ListenerHighCardinalityTags.MESSAGING_CLIENT_ID.withValue(context.getClientId()))
-						.and(ListenerHighCardinalityTags.MESSAGING_CONSUMER_ID.withValue(getConsumerId(context)));
+						.and(ListenerHighCardinalityTags.MESSAGING_CLIENT_ID.withValue(clientId));
 			}
 
 			return keyValues;
@@ -274,9 +276,9 @@ public String getContextualName(KafkaRecordReceiverContext context) {
 			return context.getSource() + " receive";
 		}
 
-		private String getConsumerId(KafkaRecordReceiverContext context) {
-			if (StringUtils.hasText(context.getClientId())) {
-				return context.getGroupId() + " - " + context.getClientId();
+		private String getConsumerId(KafkaRecordReceiverContext context, @Nullable String clientId) {
+			if (StringUtils.hasText(clientId)) {
+				return context.getGroupId() + " - " + clientId;
 			}
 			return context.getGroupId();
 		}
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaRecordReceiverContext.java b/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaRecordReceiverContext.java
index 198dae2ee1..847fd6ba7e 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaRecordReceiverContext.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaRecordReceiverContext.java
@@ -44,6 +44,25 @@ public class KafkaRecordReceiverContext extends ReceiverContext record;
 
+	/**
+	 * Construct a Kafka record receiver context.
+	 * @param record the consumer record.
+	 * @param listenerId the container listener id.
+	 * @param clusterId the Kafka cluster id.
+	 */
+	public KafkaRecordReceiverContext(ConsumerRecord record, String listenerId, Supplier clusterId) {
+		this(record, listenerId, null, null, clusterId);
+	}
+
+	/**
+	 * Construct a Kafka record receiver context.
+	 * @param record the consumer record.
+	 * @param listenerId the container listener id.
+	 * @param clientId the Kafka client id.
+	 * @param groupId the consumer group id.
+	 * @param clusterId the Kafka cluster id.
+	 * @since 3.2
+	 */
 	public KafkaRecordReceiverContext(ConsumerRecord record, String listenerId, String clientId,
 			String groupId, Supplier clusterId) {
 
 		super((carrier, key) -> {

From a382e45656a7b5fddd49deb925a8388e3d7ad6e2 Mon Sep 17 00:00:00 2001
From: Wzy19930507 <1208931582@qq.com>
Date: Wed, 3 Apr 2024 10:49:31 +0800
Subject: [PATCH 7/7] * Polish Javadoc and @since tags; mark the client id as
 nullable.

---
 .../micrometer/KafkaListenerObservation.java  | 23 +++++--------------
 .../KafkaRecordReceiverContext.java           | 13 +++++++++++
 2 files changed, 19 insertions(+), 17 deletions(-)

diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaListenerObservation.java b/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaListenerObservation.java
index 83adb424f0..571c2ab253 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaListenerObservation.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaListenerObservation.java
@@ -70,11 +70,6 @@ public KeyName[] getHighCardinalityKeyNames() {
 
 	/**
 	 * Low cardinality tags.
-	 *
-	 * @author Christian Mergenthaler
-	 * @author Wang Zhiyang
-	 *
-	 * @since 3.2
 	 */
 	public enum ListenerLowCardinalityTags implements KeyName {
 
@@ -93,6 +88,7 @@ public String asString() {
 
 		/**
 		 * Messaging system.
+		 * @since 3.2
 		 */
 		MESSAGING_SYSTEM {
 
@@ -106,6 +102,7 @@ public String asString() {
 
 		/**
 		 * Messaging operation.
+		 * @since 3.2
 		 */
 		MESSAGING_OPERATION {
 
@@ -119,6 +116,7 @@ public String asString() {
 
 		/**
 		 * Messaging source name.
+		 * @since 3.2
 		 */
 		MESSAGING_SOURCE_NAME {
 
@@ -132,6 +130,7 @@ public String asString() {
 
 		/**
 		 * Messaging source kind.
+		 * @since 3.2
 		 */
 		MESSAGING_SOURCE_KIND {
 
@@ -145,6 +144,7 @@ public String asString() {
 
 		/**
 		 * Messaging the consumer group.
+		 * @since 3.2
 		 */
 		MESSAGING_CONSUMER_GROUP {
 
@@ -160,10 +160,6 @@ public String asString() {
 
 	/**
 	 * High cardinality tags.
-	 *
-	 * @author Wang Zhiyang
-	 * @author Christian Mergenthaler
-	 *
 	 * @since 3.2
 	 */
 	public enum ListenerHighCardinalityTags implements KeyName {
 
@@ -224,13 +220,6 @@ public String asString() {
 
 	/**
 	 * Default {@link KafkaListenerObservationConvention} for Kafka listener key values.
- * - * @author Gary Russell - * @author Christian Mergenthaler - * @author Wang Zhiyang - * - * @since 3.0 - * */ public static class DefaultKafkaListenerObservationConvention implements KafkaListenerObservationConvention { @@ -276,7 +265,7 @@ public String getContextualName(KafkaRecordReceiverContext context) { return context.getSource() + " receive"; } - private String getConsumerId(KafkaRecordReceiverContext context, @Nullable String clientId) { + private static String getConsumerId(KafkaRecordReceiverContext context, @Nullable String clientId) { if (StringUtils.hasText(clientId)) { return context.getGroupId() + " - " + clientId; } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaRecordReceiverContext.java b/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaRecordReceiverContext.java index 198dae2ee1..847fd6ba7e 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaRecordReceiverContext.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaRecordReceiverContext.java @@ -22,6 +22,8 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.header.Header; +import org.springframework.lang.Nullable; + import io.micrometer.observation.transport.ReceiverContext; /** @@ -89,10 +91,21 @@ public String getListenerId() { return this.listenerId; } + /** + * Return the consumer group id. + * @return the consumer group id. + * @since 3.2 + */ public String getGroupId() { return this.groupId; } + /** + * Return the client id. + * @return the client id. + * @since 3.2 + */ + @Nullable public String getClientId() { return this.clientId; }
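For illustration, the new accessors make it straightforward for an application to layer its own low-cardinality tag on top of the defaults added in this series. A minimal sketch, assuming the 3.2 API as patched above; the class name and the extra tag key are hypothetical and not part of these commits:

import io.micrometer.common.KeyValues;

import org.springframework.kafka.support.micrometer.KafkaListenerObservation.DefaultKafkaListenerObservationConvention;
import org.springframework.kafka.support.micrometer.KafkaRecordReceiverContext;

// Hypothetical custom convention: keeps the standard tags (messaging.system,
// messaging.kafka.consumer.group, ...) and appends one application-specific tag
// derived from the group id exposed by the new KafkaRecordReceiverContext.getGroupId().
public class GroupPrefixListenerObservationConvention extends DefaultKafkaListenerObservationConvention {

	@Override
	public KeyValues getLowCardinalityKeyValues(KafkaRecordReceiverContext context) {
		KeyValues defaults = super.getLowCardinalityKeyValues(context);
		String groupId = context.getGroupId();
		if (groupId == null) {
			return defaults;
		}
		// Tag with the part of the group id before the first dash, e.g. "orders-us" -> "orders".
		int dash = groupId.indexOf('-');
		String prefix = dash > 0 ? groupId.substring(0, dash) : groupId;
		return defaults.and("app.consumer.group.prefix", prefix);
	}

}

As in the tests above, such a convention would be registered through the container properties (getContainerProperties().setObservationConvention(...)) before the container is started.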