Skip to content

Commit 92ec06e

Browse files
committed
refactor: don't use deprecated SpanAttributes in kafka
1 parent 4a1e0ce commit 92ec06e

File tree

2 files changed

+51
-58
lines changed

2 files changed

+51
-58
lines changed

instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py

Lines changed: 15 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,14 @@
33

44
from opentelemetry import context, propagate
55
from opentelemetry.propagators import textmap
6-
from opentelemetry.semconv.trace import (
7-
MessagingDestinationKindValues,
8-
MessagingOperationValues,
9-
SpanAttributes,
6+
from opentelemetry.semconv._incubating.attributes.messaging_attributes import (
7+
MESSAGING_DESTINATION_NAME,
8+
MESSAGING_DESTINATION_TEMPORARY,
9+
MESSAGING_KAFKA_DESTINATION_PARTITION,
10+
MESSAGING_MESSAGE_ID,
11+
MESSAGING_OPERATION,
12+
MESSAGING_SYSTEM,
13+
MessagingOperationTypeValues,
1014
)
1115
from opentelemetry.trace import Link, SpanKind
1216

@@ -114,32 +118,25 @@ def _enrich_span(
114118
topic,
115119
partition: Optional[int] = None,
116120
offset: Optional[int] = None,
117-
operation: Optional[MessagingOperationValues] = None,
121+
operation: Optional[MessagingOperationTypeValues] = None,
118122
):
119123
if not span.is_recording():
120124
return
121125

122-
span.set_attribute(SpanAttributes.MESSAGING_SYSTEM, "kafka")
123-
span.set_attribute(SpanAttributes.MESSAGING_DESTINATION, topic)
124-
126+
span.set_attribute(MESSAGING_SYSTEM, "kafka")
127+
span.set_attribute(MESSAGING_DESTINATION_NAME, topic)
125128
if partition is not None:
126-
span.set_attribute(SpanAttributes.MESSAGING_KAFKA_PARTITION, partition)
127-
128-
span.set_attribute(
129-
SpanAttributes.MESSAGING_DESTINATION_KIND,
130-
MessagingDestinationKindValues.QUEUE.value,
131-
)
132-
129+
span.set_attribute(MESSAGING_KAFKA_DESTINATION_PARTITION, partition)
133130
if operation:
134-
span.set_attribute(SpanAttributes.MESSAGING_OPERATION, operation.value)
131+
span.set_attribute(MESSAGING_OPERATION, operation.value)
135132
else:
136-
span.set_attribute(SpanAttributes.MESSAGING_TEMP_DESTINATION, True)
133+
span.set_attribute(MESSAGING_DESTINATION_TEMPORARY, True)
137134

138135
# https://stackoverflow.com/questions/65935155/identify-and-find-specific-message-in-kafka-topic
139136
# A message within Kafka is uniquely defined by its topic name, topic partition and offset.
140137
if partition is not None and offset is not None and topic:
141138
span.set_attribute(
142-
SpanAttributes.MESSAGING_MESSAGE_ID,
139+
MESSAGING_MESSAGE_ID,
143140
f"{topic}.{partition}.{offset}",
144141
)
145142

instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py

Lines changed: 36 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,12 @@
2525
KafkaContextGetter,
2626
KafkaContextSetter,
2727
)
28-
from opentelemetry.semconv.trace import (
29-
MessagingDestinationKindValues,
30-
SpanAttributes,
28+
from opentelemetry.semconv._incubating.attributes.messaging_attributes import (
29+
MESSAGING_DESTINATION_NAME,
30+
MESSAGING_KAFKA_DESTINATION_PARTITION,
31+
MESSAGING_MESSAGE_ID,
32+
MESSAGING_OPERATION,
33+
MESSAGING_SYSTEM,
3134
)
3235
from opentelemetry.test.test_base import TestBase
3336

@@ -122,36 +125,33 @@ def test_poll(self) -> None:
122125
{
123126
"name": "topic-10 process",
124127
"attributes": {
125-
SpanAttributes.MESSAGING_OPERATION: "process",
126-
SpanAttributes.MESSAGING_KAFKA_PARTITION: 0,
127-
SpanAttributes.MESSAGING_SYSTEM: "kafka",
128-
SpanAttributes.MESSAGING_DESTINATION: "topic-10",
129-
SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value,
130-
SpanAttributes.MESSAGING_MESSAGE_ID: "topic-10.0.0",
128+
MESSAGING_OPERATION: "process",
129+
MESSAGING_KAFKA_DESTINATION_PARTITION: 0,
130+
MESSAGING_SYSTEM: "kafka",
131+
MESSAGING_DESTINATION_NAME: "topic-10",
132+
MESSAGING_MESSAGE_ID: "topic-10.0.0",
131133
},
132134
},
133135
{"name": "recv", "attributes": {}},
134136
{
135137
"name": "topic-20 process",
136138
"attributes": {
137-
SpanAttributes.MESSAGING_OPERATION: "process",
138-
SpanAttributes.MESSAGING_KAFKA_PARTITION: 2,
139-
SpanAttributes.MESSAGING_SYSTEM: "kafka",
140-
SpanAttributes.MESSAGING_DESTINATION: "topic-20",
141-
SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value,
142-
SpanAttributes.MESSAGING_MESSAGE_ID: "topic-20.2.4",
139+
MESSAGING_OPERATION: "process",
140+
MESSAGING_KAFKA_DESTINATION_PARTITION: 2,
141+
MESSAGING_SYSTEM: "kafka",
142+
MESSAGING_DESTINATION_NAME: "topic-20",
143+
MESSAGING_MESSAGE_ID: "topic-20.2.4",
143144
},
144145
},
145146
{"name": "recv", "attributes": {}},
146147
{
147148
"name": "topic-30 process",
148149
"attributes": {
149-
SpanAttributes.MESSAGING_OPERATION: "process",
150-
SpanAttributes.MESSAGING_KAFKA_PARTITION: 1,
151-
SpanAttributes.MESSAGING_SYSTEM: "kafka",
152-
SpanAttributes.MESSAGING_DESTINATION: "topic-30",
153-
SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value,
154-
SpanAttributes.MESSAGING_MESSAGE_ID: "topic-30.1.3",
150+
MESSAGING_OPERATION: "process",
151+
MESSAGING_KAFKA_DESTINATION_PARTITION: 1,
152+
MESSAGING_SYSTEM: "kafka",
153+
MESSAGING_DESTINATION_NAME: "topic-30",
154+
MESSAGING_MESSAGE_ID: "topic-30.1.3",
155155
},
156156
},
157157
{"name": "recv", "attributes": {}},
@@ -190,30 +190,27 @@ def test_consume(self) -> None:
190190
{
191191
"name": "topic-1 process",
192192
"attributes": {
193-
SpanAttributes.MESSAGING_OPERATION: "process",
194-
SpanAttributes.MESSAGING_SYSTEM: "kafka",
195-
SpanAttributes.MESSAGING_DESTINATION: "topic-1",
196-
SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value,
193+
MESSAGING_OPERATION: "process",
194+
MESSAGING_SYSTEM: "kafka",
195+
MESSAGING_DESTINATION_NAME: "topic-1",
197196
},
198197
},
199198
{"name": "recv", "attributes": {}},
200199
{
201200
"name": "topic-2 process",
202201
"attributes": {
203-
SpanAttributes.MESSAGING_OPERATION: "process",
204-
SpanAttributes.MESSAGING_SYSTEM: "kafka",
205-
SpanAttributes.MESSAGING_DESTINATION: "topic-2",
206-
SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value,
202+
MESSAGING_OPERATION: "process",
203+
MESSAGING_SYSTEM: "kafka",
204+
MESSAGING_DESTINATION_NAME: "topic-2",
207205
},
208206
},
209207
{"name": "recv", "attributes": {}},
210208
{
211209
"name": "topic-3 process",
212210
"attributes": {
213-
SpanAttributes.MESSAGING_OPERATION: "process",
214-
SpanAttributes.MESSAGING_SYSTEM: "kafka",
215-
SpanAttributes.MESSAGING_DESTINATION: "topic-3",
216-
SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value,
211+
MESSAGING_OPERATION: "process",
212+
MESSAGING_SYSTEM: "kafka",
213+
MESSAGING_DESTINATION_NAME: "topic-3",
217214
},
218215
},
219216
{"name": "recv", "attributes": {}},
@@ -247,12 +244,11 @@ def test_close(self) -> None:
247244
{
248245
"name": "topic-a process",
249246
"attributes": {
250-
SpanAttributes.MESSAGING_OPERATION: "process",
251-
SpanAttributes.MESSAGING_KAFKA_PARTITION: 0,
252-
SpanAttributes.MESSAGING_SYSTEM: "kafka",
253-
SpanAttributes.MESSAGING_DESTINATION: "topic-a",
254-
SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value,
255-
SpanAttributes.MESSAGING_MESSAGE_ID: "topic-a.0.0",
247+
MESSAGING_OPERATION: "process",
248+
MESSAGING_KAFKA_DESTINATION_PARTITION: 0,
249+
MESSAGING_SYSTEM: "kafka",
250+
MESSAGING_DESTINATION_NAME: "topic-a",
251+
MESSAGING_MESSAGE_ID: "topic-a.0.0",
256252
},
257253
},
258254
]
@@ -286,7 +282,7 @@ def _compare_spans(self, spans, expected_spans):
286282

287283
def _assert_topic(self, span, expected_topic: str) -> None:
288284
self.assertEqual(
289-
span.attributes[SpanAttributes.MESSAGING_DESTINATION],
285+
span.attributes[MESSAGING_DESTINATION_NAME],
290286
expected_topic,
291287
)
292288

0 commit comments

Comments
 (0)