Skip to content

[ServiceBus] Add support for core tracing #41257

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
175 changes: 122 additions & 53 deletions sdk/servicebus/azure-servicebus/azure/servicebus/_common/tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from enum import Enum
import logging
from typing import (
Any,
Dict,
Iterable,
Iterator,
Expand All @@ -22,6 +23,19 @@
from azure.core import CaseInsensitiveEnumMeta
from azure.core.settings import settings
from azure.core.tracing import SpanKind, Link
from azure.servicebus._version import VERSION

try:
from azure.core.instrumentation import get_tracer
TRACER = get_tracer(
library_name="azure-servicebus",
library_version=VERSION,
attributes={
"az.namespace": "Microsoft.ServiceBus",
},
)
except ImportError:
TRACER = None

if TYPE_CHECKING:
try:
Expand Down Expand Up @@ -62,6 +76,7 @@
_LOGGER = logging.getLogger(__name__)



class TraceAttributes:
TRACE_NAMESPACE_ATTRIBUTE = "az.namespace"
TRACE_NAMESPACE = "Microsoft.ServiceBus"
Expand All @@ -74,9 +89,6 @@ class TraceAttributes:
TRACE_MESSAGING_OPERATION_ATTRIBUTE = "messaging.operation"
TRACE_MESSAGING_BATCH_COUNT_ATTRIBUTE = "messaging.batch.message_count"

LEGACY_TRACE_MESSAGE_BUS_DESTINATION_ATTRIBUTE = "message_bus.destination"
LEGACY_TRACE_PEER_ADDRESS_ATTRIBUTE = "peer.address"


class TraceOperationTypes(str, Enum, metaclass=CaseInsensitiveEnumMeta):
PUBLISH = "publish"
Expand All @@ -85,8 +97,13 @@ class TraceOperationTypes(str, Enum, metaclass=CaseInsensitiveEnumMeta):


def is_tracing_enabled():
span_impl_type = settings.tracing_implementation()
return span_impl_type is not None
if TRACER is None:
# The version of azure-core installed does not support native tracing. Just check
# for the plugin.
span_impl_type = settings.tracing_implementation()
return span_impl_type is not None
# Otherwise, can just check the tracing setting.
return settings.tracing_enabled()


@contextmanager
Expand All @@ -96,6 +113,7 @@ def send_trace_context_manager(
links: Optional[List[Link]] = None,
) -> Iterator[None]:
"""Tracing for sending messages.

:param sender: The sender that is sending the message.
:type sender: ~azure.servicebus.ServiceBusSender or ~azure.servicebus.aio.ServiceBusSenderAsync
:param span_name: The name of the tracing span.
Expand All @@ -106,11 +124,19 @@ def send_trace_context_manager(
:rtype: iterator
"""
span_impl_type: Optional[Type[AbstractSpan]] = settings.tracing_implementation()
links = links or []

if span_impl_type is not None:
links = links or []
with span_impl_type(name=span_name, kind=SpanKind.CLIENT, links=links) as span:
add_span_attributes(span, TraceOperationTypes.PUBLISH, sender, message_count=len(links))
add_plugin_span_attributes(span, TraceOperationTypes.PUBLISH, sender, message_count=len(links))
yield
elif TRACER is not None:
if settings.tracing_enabled():
with TRACER.start_as_current_span(span_name, kind=SpanKind.CLIENT) as span:
attributes = get_span_attributes(TraceOperationTypes.PUBLISH, sender, message_count=len(links))
span.set_attributes(attributes)
yield
else:
yield
else:
yield
Expand All @@ -124,6 +150,7 @@ def receive_trace_context_manager(
start_time: Optional[int] = None,
) -> Iterator[None]:
"""Tracing for receiving messages.

:param receiver: The receiver that is receiving the message.
:type receiver: ~azure.servicebus.ServiceBusReceiver or ~azure.servicebus.aio.ServiceBusReceiverAsync
:param span_name: The name of the tracing span.
Expand All @@ -135,11 +162,30 @@ def receive_trace_context_manager(
:return: An iterator that yields the tracing span.
:rtype: iterator
"""
links = links or []
span_impl_type: Optional[Type[AbstractSpan]] = settings.tracing_implementation()
if span_impl_type is not None:
links = links or []
with span_impl_type(name=span_name, kind=SpanKind.CLIENT, links=links, start_time=start_time) as span:
add_span_attributes(span, TraceOperationTypes.RECEIVE, receiver, message_count=len(links))
with span_impl_type(name=span_name, kind=SpanKind.CLIENT, links=links) as span:
add_plugin_span_attributes(span, TraceOperationTypes.RECEIVE, receiver, message_count=len(links))
yield
elif TRACER is not None:
if settings.tracing_enabled():
# Depending on the azure-core version, start_as_current_span may or may not support start_time as a
# keyword argument. Handle both cases.
try:
with TRACER.start_as_current_span( # type: ignore[call-arg] # pylint: disable=unexpected-keyword-arg
span_name, kind=SpanKind.CLIENT, start_time=start_time, links=links
) as span:
attributes = get_span_attributes(TraceOperationTypes.RECEIVE, receiver, message_count=len(links))
span.set_attributes(attributes)
yield
except TypeError:
# If start_time is not supported, just call without it.
with TRACER.start_as_current_span(span_name, kind=SpanKind.CLIENT, links=links) as span:
attributes = get_span_attributes(TraceOperationTypes.RECEIVE, receiver, message_count=len(links))
span.set_attributes(attributes)
yield
else:
yield
else:
yield
Expand All @@ -150,6 +196,7 @@ def settle_trace_context_manager(
receiver: Union[ServiceBusReceiver, ServiceBusReceiverAsync], operation: str, links: Optional[List[Link]] = None
):
"""Tracing for settling messages.

:param receiver: The receiver that is settling the message.
:type receiver: ~azure.servicebus.ServiceBusReceiver or ~azure.servicebus.aio.ServiceBusReceiver
:param operation: The operation that is being performed on the message.
Expand All @@ -163,10 +210,27 @@ def settle_trace_context_manager(
if span_impl_type is not None:
links = links or []
with span_impl_type(name=f"ServiceBus.{operation}", kind=SpanKind.CLIENT, links=links) as span:
add_span_attributes(span, TraceOperationTypes.SETTLE, receiver)
add_plugin_span_attributes(span, TraceOperationTypes.SETTLE, receiver)
yield
else:
yield
elif TRACER is not None:
if settings.tracing_enabled():
with TRACER.start_as_current_span(f"ServiceBus.{operation}", kind=SpanKind.CLIENT) as span:
attributes = get_span_attributes(TraceOperationTypes.SETTLE, receiver)
span.set_attributes(attributes)
yield
else:
yield


def _update_message_with_trace_context(message, amqp_transport, context):
if "traceparent" in context:
message = amqp_transport.update_message_app_properties(
message, TRACE_DIAGNOSTIC_ID_PROPERTY, context["traceparent"]
)
message = amqp_transport.update_message_app_properties(message, TRACE_PARENT_PROPERTY, context["traceparent"])
if "tracestate" in context:
message = amqp_transport.update_message_app_properties(message, TRACE_STATE_PROPERTY, context["tracestate"])
return message


def trace_message(
Expand All @@ -191,20 +255,8 @@ def trace_message(
span_impl_type: Optional[Type[AbstractSpan]] = settings.tracing_implementation()
if span_impl_type is not None:
with span_impl_type(name=SPAN_NAME_MESSAGE, kind=SpanKind.PRODUCER) as message_span:
headers = message_span.to_header()

if "traceparent" in headers:
message = amqp_transport.update_message_app_properties(
message, TRACE_DIAGNOSTIC_ID_PROPERTY, headers["traceparent"]
)
message = amqp_transport.update_message_app_properties(
message, TRACE_PARENT_PROPERTY, headers["traceparent"]
)

if "tracestate" in headers:
message = amqp_transport.update_message_app_properties(
message, TRACE_STATE_PROPERTY, headers["tracestate"]
)
context = message_span.to_header()
message = _update_message_with_trace_context(message, amqp_transport, context)

message_span.add_attribute(TraceAttributes.TRACE_NAMESPACE_ATTRIBUTE, TraceAttributes.TRACE_NAMESPACE)
message_span.add_attribute(
Expand All @@ -215,6 +267,17 @@ def trace_message(
for key, value in additional_attributes.items():
if value is not None:
message_span.add_attribute(key, value)
elif TRACER is not None:
if settings.tracing_enabled():
with TRACER.start_as_current_span(SPAN_NAME_MESSAGE, kind=SpanKind.PRODUCER) as message_span:
trace_context = TRACER.get_trace_context()
message = _update_message_with_trace_context(message, amqp_transport, trace_context)
attributes = {
TraceAttributes.TRACE_NAMESPACE_ATTRIBUTE: TraceAttributes.TRACE_NAMESPACE,
TraceAttributes.TRACE_MESSAGING_SYSTEM_ATTRIBUTE: TraceAttributes.TRACE_MESSAGING_SYSTEM,
**(additional_attributes or {}),
}
message_span.set_attributes(attributes)

except Exception as exp: # pylint:disable=broad-except
_LOGGER.warning("trace_message had an exception %r", exp)
Expand All @@ -226,11 +289,7 @@ def get_receive_links(messages: Union[ServiceBusReceivedMessage, Iterable[Servic
if not is_tracing_enabled():
return []

trace_messages = (
messages
if isinstance(messages, Iterable)
else (messages,)
)
trace_messages = messages if isinstance(messages, Iterable) else (messages,)

links = []
try:
Expand Down Expand Up @@ -277,7 +336,7 @@ def get_span_links_from_batch(batch: ServiceBusMessageBatch) -> List[Link]:
return links


def get_span_link_from_message(message: Union[uamqp_Message, pyamqp_Message, ServiceBusMessage]) -> Optional[Link]:
def get_span_link_from_message(message: Any) -> Optional[Link]:
"""Create a span link from a message.

This will extract the traceparent and tracestate from the message application properties and create span links
Expand Down Expand Up @@ -309,7 +368,7 @@ def get_span_link_from_message(message: Union[uamqp_Message, pyamqp_Message, Ser
return Link(headers)


def add_span_attributes(
def add_plugin_span_attributes(
span: AbstractSpan,
operation_type: TraceOperationTypes,
handler: Union[BaseHandler, BaseHandlerAsync],
Expand All @@ -322,25 +381,35 @@ def add_span_attributes(
~azure.servicebus.aio._base_handler_async.BaseHandlerAsync handler: The handler that is performing the operation.
:param int message_count: The number of messages being sent or received.
"""
attributes = get_span_attributes(operation_type, handler, message_count)
for key, value in attributes.items():
if value is not None:
span.add_attribute(key, value)

span.add_attribute(TraceAttributes.TRACE_NAMESPACE_ATTRIBUTE, TraceAttributes.TRACE_NAMESPACE)
span.add_attribute(TraceAttributes.TRACE_MESSAGING_SYSTEM_ATTRIBUTE, TraceAttributes.TRACE_MESSAGING_SYSTEM)
span.add_attribute(TraceAttributes.TRACE_MESSAGING_OPERATION_ATTRIBUTE, operation_type)

if message_count > 1:
span.add_attribute(TraceAttributes.TRACE_MESSAGING_BATCH_COUNT_ATTRIBUTE, message_count)

if operation_type in (TraceOperationTypes.PUBLISH, TraceOperationTypes.RECEIVE):
# Maintain legacy attributes for backwards compatibility.
span.add_attribute(
TraceAttributes.LEGACY_TRACE_MESSAGE_BUS_DESTINATION_ATTRIBUTE,
handler._entity_name, # pylint: disable=protected-access
)
span.add_attribute(TraceAttributes.LEGACY_TRACE_PEER_ADDRESS_ATTRIBUTE, handler.fully_qualified_namespace)
def get_span_attributes(
operation_type: TraceOperationTypes,
handler: Union[BaseHandler, BaseHandlerAsync],
message_count: int = 0,
) -> dict:
"""Return a dict of attributes for a span based on the operation type.

elif operation_type == TraceOperationTypes.SETTLE:
span.add_attribute(TraceAttributes.TRACE_NET_PEER_NAME_ATTRIBUTE, handler.fully_qualified_namespace)
span.add_attribute(
TraceAttributes.TRACE_MESSAGING_DESTINATION_ATTRIBUTE,
handler._entity_name, # pylint: disable=protected-access
)
:param TraceOperationTypes operation_type: The operation type.
:param ~azure.servicebus._base_handler.BaseHandler or
~azure.servicebus.aio._base_handler_async.BaseHandlerAsync handler: The handler that is performing the operation.
:param int message_count: The number of messages being sent or received.
:return: Dictionary of span attributes.
:rtype: dict
"""
attributes: Dict[str, Any] = {
TraceAttributes.TRACE_NAMESPACE_ATTRIBUTE: TraceAttributes.TRACE_NAMESPACE,
TraceAttributes.TRACE_MESSAGING_SYSTEM_ATTRIBUTE: TraceAttributes.TRACE_MESSAGING_SYSTEM,
TraceAttributes.TRACE_MESSAGING_OPERATION_ATTRIBUTE: operation_type,
}
if message_count > 1:
attributes[TraceAttributes.TRACE_MESSAGING_BATCH_COUNT_ATTRIBUTE] = message_count
attributes[TraceAttributes.TRACE_NET_PEER_NAME_ATTRIBUTE] = handler.fully_qualified_namespace
attributes[TraceAttributes.TRACE_MESSAGING_DESTINATION_ATTRIBUTE] = (
handler._entity_name # pylint: disable=protected-access
)
return attributes
4 changes: 2 additions & 2 deletions sdk/servicebus/azure-servicebus/dev_requirements.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
-e ../../core/azure-core
../../core/azure-core
azure-identity~=1.17.0
-e ../../../tools/azure-sdk-tools
azure-mgmt-servicebus~=8.0.0
aiohttp>=3.0
websocket-client
azure-mgmt-resource<=16.0.0
azure-mgmt-resource<=16.0.0