diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java index 0b16ead2f5..4ce5f4c236 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java @@ -768,7 +768,9 @@ private CompletableFuture> observeSend(final ProducerRecord spanFromCallback = new AtomicReference<>(); + template.setProducerInterceptor(new ProducerInterceptor<>() { + @Override + public ProducerRecord onSend(ProducerRecord record) { + tracer.currentSpanCustomizer().tag("key", "value"); + return record; + } + + @Override + public void onAcknowledgement(RecordMetadata metadata, Exception exception) { + + } + + @Override + public void close() { + + } + + @Override + public void configure(Map configs) { + + } + }); + template.send(OBSERVATION_TEST_1, "test") .thenAccept((sendResult) -> spanFromCallback.set(tracer.currentSpan())) .get(10, TimeUnit.SECONDS); + Deque spans = tracer.getSpans(); + assertThat(spans).hasSize(1); + + SimpleSpan templateSpan = spans.peek(); + assertThat(templateSpan).isNotNull(); + assertThat(templateSpan.getTags()).containsAllEntriesOf(Map.of( + "key", "value")); + assertThat(spanFromCallback.get()).isNotNull(); MessageListenerContainer listenerContainer1 = rler.getListenerContainer("obs1"); MessageListenerContainer listenerContainer2 = rler.getListenerContainer("obs2"); @@ -144,11 +179,11 @@ void endToEnd(@Autowired Listener listener, @Autowired KafkaTemplate spans = tracer.getSpans(); + spans = tracer.getSpans(); assertThat(spans).hasSize(4); - assertThatTemplateSpanTags(spans, 5, OBSERVATION_TEST_1); + assertThatTemplateSpanTags(spans, 6, OBSERVATION_TEST_1); assertThatListenerSpanTags(spans, 12, OBSERVATION_TEST_1, "obs1-0", "obs1", "0", "0"); - assertThatTemplateSpanTags(spans, 5, OBSERVATION_TEST_2); + assertThatTemplateSpanTags(spans, 6, OBSERVATION_TEST_2); assertThatListenerSpanTags(spans, 12, OBSERVATION_TEST_2, "obs2-0", "obs2", "0", "0"); template.setObservationConvention(new DefaultKafkaTemplateObservationConvention() { @@ -181,9 +216,9 @@ public KeyValues getLowCardinalityKeyValues(KafkaRecordReceiverContext context) assertThat(headers.lastHeader("foo")).extracting(Header::value).isEqualTo("some foo value".getBytes()); assertThat(headers.lastHeader("bar")).extracting(Header::value).isEqualTo("some bar value".getBytes()); assertThat(spans).hasSize(4); - assertThatTemplateSpanTags(spans, 6, OBSERVATION_TEST_1, Map.entry("foo", "bar")); + assertThatTemplateSpanTags(spans, 7, OBSERVATION_TEST_1, Map.entry("foo", "bar")); assertThatListenerSpanTags(spans, 13, OBSERVATION_TEST_1, "obs1-0", "obs1", "1", "0", Map.entry("baz", "qux")); - assertThatTemplateSpanTags(spans, 6, OBSERVATION_TEST_2, Map.entry("foo", "bar")); + assertThatTemplateSpanTags(spans, 7, OBSERVATION_TEST_2, Map.entry("foo", "bar")); SimpleSpan span = assertThatListenerSpanTags(spans, 12, OBSERVATION_TEST_2, "obs2-0", "obs2", "1", "0"); assertThat(span.getTags()).doesNotContainEntry("baz", "qux"); MeterRegistryAssert meterRegistryAssert = MeterRegistryAssert.assertThat(meterRegistry);