From 48e018dee30a09be38a597ef0c85baa173c86183 Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Tue, 23 Apr 2024 13:22:00 -0400 Subject: [PATCH] GH-3210: KafkaTemplate currentSpan tagging issue (#3211) Fixes: #3210 * When adding a tag to the current span during the sending of a kafka message using KafkaTemplate, the tag gets added to another span because KafkaTemplate doesn't open the scope for the started observation. Fixing this issue by wrapping the doSend method call in a proper observation scope. **Auto-cherry-pick to `3.1.x` & `3.0.x`** --- .../kafka/core/KafkaTemplate.java | 4 +- .../support/micrometer/ObservationTests.java | 45 ++++++++++++++++--- 2 files changed, 43 insertions(+), 6 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java index 0b16ead2f5..4ce5f4c236 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java @@ -768,7 +768,9 @@ private CompletableFuture> observeSend(final ProducerRecord spanFromCallback = new AtomicReference<>(); + template.setProducerInterceptor(new ProducerInterceptor<>() { + @Override + public ProducerRecord onSend(ProducerRecord record) { + tracer.currentSpanCustomizer().tag("key", "value"); + return record; + } + + @Override + public void onAcknowledgement(RecordMetadata metadata, Exception exception) { + + } + + @Override + public void close() { + + } + + @Override + public void configure(Map configs) { + + } + }); + template.send(OBSERVATION_TEST_1, "test") .thenAccept((sendResult) -> spanFromCallback.set(tracer.currentSpan())) .get(10, TimeUnit.SECONDS); + Deque spans = tracer.getSpans(); + assertThat(spans).hasSize(1); + + SimpleSpan templateSpan = spans.peek(); + assertThat(templateSpan).isNotNull(); + assertThat(templateSpan.getTags()).containsAllEntriesOf(Map.of( + "key", "value")); + assertThat(spanFromCallback.get()).isNotNull(); MessageListenerContainer listenerContainer1 = rler.getListenerContainer("obs1"); MessageListenerContainer listenerContainer2 = rler.getListenerContainer("obs2"); @@ -144,11 +179,11 @@ void endToEnd(@Autowired Listener listener, @Autowired KafkaTemplate spans = tracer.getSpans(); + spans = tracer.getSpans(); assertThat(spans).hasSize(4); - assertThatTemplateSpanTags(spans, 5, OBSERVATION_TEST_1); + assertThatTemplateSpanTags(spans, 6, OBSERVATION_TEST_1); assertThatListenerSpanTags(spans, 12, OBSERVATION_TEST_1, "obs1-0", "obs1", "0", "0"); - assertThatTemplateSpanTags(spans, 5, OBSERVATION_TEST_2); + assertThatTemplateSpanTags(spans, 6, OBSERVATION_TEST_2); assertThatListenerSpanTags(spans, 12, OBSERVATION_TEST_2, "obs2-0", "obs2", "0", "0"); template.setObservationConvention(new DefaultKafkaTemplateObservationConvention() { @@ -181,9 +216,9 @@ public KeyValues getLowCardinalityKeyValues(KafkaRecordReceiverContext context) assertThat(headers.lastHeader("foo")).extracting(Header::value).isEqualTo("some foo value".getBytes()); assertThat(headers.lastHeader("bar")).extracting(Header::value).isEqualTo("some bar value".getBytes()); assertThat(spans).hasSize(4); - assertThatTemplateSpanTags(spans, 6, OBSERVATION_TEST_1, Map.entry("foo", "bar")); + assertThatTemplateSpanTags(spans, 7, OBSERVATION_TEST_1, Map.entry("foo", "bar")); assertThatListenerSpanTags(spans, 13, OBSERVATION_TEST_1, "obs1-0", "obs1", "1", "0", Map.entry("baz", "qux")); - assertThatTemplateSpanTags(spans, 6, OBSERVATION_TEST_2, Map.entry("foo", "bar")); + assertThatTemplateSpanTags(spans, 7, OBSERVATION_TEST_2, Map.entry("foo", "bar")); SimpleSpan span = assertThatListenerSpanTags(spans, 12, OBSERVATION_TEST_2, "obs2-0", "obs2", "1", "0"); assertThat(span.getTags()).doesNotContainEntry("baz", "qux"); MeterRegistryAssert meterRegistryAssert = MeterRegistryAssert.assertThat(meterRegistry);