Skip to content

Commit

Permalink
GH-3210: KafkaTemplate currentSpan tagging issue (#3211)
Browse files Browse the repository at this point in the history
Fixes: #3210

* When adding a tag to the current span during the sending of a kafka message
  using KafkaTemplate, the tag gets added to another span because KafkaTemplate
  doesn't open the scope for the started observation. Fixing this issue by wrapping
  the doSend method call in a proper observation scope.

**Auto-cherry-pick to `3.1.x` & `3.0.x`**
  • Loading branch information
sobychacko authored Apr 23, 2024
1 parent 96b2791 commit 48e018d
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -768,7 +768,9 @@ private CompletableFuture<SendResult<K, V>> observeSend(final ProducerRecord<K,
this.observationRegistry);
try {
observation.start();
return doSend(producerRecord, observation);
try (Observation.Scope ignored = observation.openScope()) {
return doSend(producerRecord, observation);
}
}
catch (RuntimeException ex) {
// The error is added from org.apache.kafka.clients.producer.Callback
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
Expand Down Expand Up @@ -89,6 +92,7 @@
* @author Artem Bilan
* @author Wang Zhiyang
* @author Christian Mergenthaler
* @author Soby Chacko
*
* @since 3.0
*/
Expand Down Expand Up @@ -120,10 +124,41 @@ void endToEnd(@Autowired Listener listener, @Autowired KafkaTemplate<Integer, St

AtomicReference<SimpleSpan> spanFromCallback = new AtomicReference<>();

template.setProducerInterceptor(new ProducerInterceptor<>() {
@Override
public ProducerRecord<Integer, String> onSend(ProducerRecord<Integer, String> record) {
tracer.currentSpanCustomizer().tag("key", "value");
return record;
}

@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {

}

@Override
public void close() {

}

@Override
public void configure(Map<String, ?> configs) {

}
});

template.send(OBSERVATION_TEST_1, "test")
.thenAccept((sendResult) -> spanFromCallback.set(tracer.currentSpan()))
.get(10, TimeUnit.SECONDS);

Deque<SimpleSpan> spans = tracer.getSpans();
assertThat(spans).hasSize(1);

SimpleSpan templateSpan = spans.peek();
assertThat(templateSpan).isNotNull();
assertThat(templateSpan.getTags()).containsAllEntriesOf(Map.of(
"key", "value"));

assertThat(spanFromCallback.get()).isNotNull();
MessageListenerContainer listenerContainer1 = rler.getListenerContainer("obs1");
MessageListenerContainer listenerContainer2 = rler.getListenerContainer("obs2");
Expand All @@ -144,11 +179,11 @@ void endToEnd(@Autowired Listener listener, @Autowired KafkaTemplate<Integer, St
Headers headers = listener.record.headers();
assertThat(headers.lastHeader("foo")).extracting(Header::value).isEqualTo("some foo value".getBytes());
assertThat(headers.lastHeader("bar")).extracting(Header::value).isEqualTo("some bar value".getBytes());
Deque<SimpleSpan> spans = tracer.getSpans();
spans = tracer.getSpans();
assertThat(spans).hasSize(4);
assertThatTemplateSpanTags(spans, 5, OBSERVATION_TEST_1);
assertThatTemplateSpanTags(spans, 6, OBSERVATION_TEST_1);
assertThatListenerSpanTags(spans, 12, OBSERVATION_TEST_1, "obs1-0", "obs1", "0", "0");
assertThatTemplateSpanTags(spans, 5, OBSERVATION_TEST_2);
assertThatTemplateSpanTags(spans, 6, OBSERVATION_TEST_2);
assertThatListenerSpanTags(spans, 12, OBSERVATION_TEST_2, "obs2-0", "obs2", "0", "0");
template.setObservationConvention(new DefaultKafkaTemplateObservationConvention() {

Expand Down Expand Up @@ -181,9 +216,9 @@ public KeyValues getLowCardinalityKeyValues(KafkaRecordReceiverContext context)
assertThat(headers.lastHeader("foo")).extracting(Header::value).isEqualTo("some foo value".getBytes());
assertThat(headers.lastHeader("bar")).extracting(Header::value).isEqualTo("some bar value".getBytes());
assertThat(spans).hasSize(4);
assertThatTemplateSpanTags(spans, 6, OBSERVATION_TEST_1, Map.entry("foo", "bar"));
assertThatTemplateSpanTags(spans, 7, OBSERVATION_TEST_1, Map.entry("foo", "bar"));
assertThatListenerSpanTags(spans, 13, OBSERVATION_TEST_1, "obs1-0", "obs1", "1", "0", Map.entry("baz", "qux"));
assertThatTemplateSpanTags(spans, 6, OBSERVATION_TEST_2, Map.entry("foo", "bar"));
assertThatTemplateSpanTags(spans, 7, OBSERVATION_TEST_2, Map.entry("foo", "bar"));
SimpleSpan span = assertThatListenerSpanTags(spans, 12, OBSERVATION_TEST_2, "obs2-0", "obs2", "1", "0");
assertThat(span.getTags()).doesNotContainEntry("baz", "qux");
MeterRegistryAssert meterRegistryAssert = MeterRegistryAssert.assertThat(meterRegistry);
Expand Down

0 comments on commit 48e018d

Please sign in to comment.