diff --git a/sqlserver/assets/configuration/spec.yaml b/sqlserver/assets/configuration/spec.yaml
index 9053a06f29549..bb37a786382fd 100644
--- a/sqlserver/assets/configuration/spec.yaml
+++ b/sqlserver/assets/configuration/spec.yaml
@@ -885,7 +885,9 @@ files:
         display_default: false
     - name: collect_raw_query_statement
       description: |
-        Configure the collection of raw query statements in query activity and execution plans.
+        Configure the collection of raw query statements in query activity, execution plans, and XE events.
+        To collect raw query statements from XE events, set `xe_collection.query_completions.enabled` and
+        `xe_collection.query_errors.enabled` to `true`.
         Raw query statements and execution plans may contain sensitive information (e.g., passwords)
         or personally identifiable information in query text.
         Enabling this option will allow the collection and ingestion of raw query statements and
@@ -997,6 +999,75 @@ files:
       value:
         example: false
         type: boolean
+    - name: xe_collection
+      description: |
+        Configure the collection of events from XE (Extended Events) sessions. Requires `dbm: true`.
+
+        Set `collect_raw_query_statement.enabled` to `true` to collect the raw query statements for each event.
+      options:
+        - name: debug_sample_events
+          description: |
+            Set the maximum number of XE events to log in debug mode per collection. Used for troubleshooting.
+            This only affects logging when debug mode is enabled. Defaults to 3.
+          hidden: true
+          value:
+            type: integer
+            example: 3
+            display_default: 3
+        - name: query_completions
+          description: |
+            Configure the collection of completed queries from the `datadog_query_completions` XE session.
+
+            Set `query_completions.enabled` to `true` to enable the collection of query completion events.
+
+            Use `query_completions.collection_interval` to set the interval (in seconds) for the collection of
+            query completion events. Defaults to 10 seconds. If you intend to update this value,
+            it is strongly recommended to use a consistent value throughout all SQL Server agent deployments.
+
+            Use `query_completions.max_events` to set the maximum number of query completion events to process
+            per collection. Note that SQL Server's ring buffer holds a maximum of 1000 events per session,
+            so values above 1000 are still capped at 1000 by the database engine. Defaults to 1000.
+          value:
+            type: object
+            properties:
+              - name: enabled
+                type: boolean
+                example: false
+              - name: collection_interval
+                type: number
+                example: 10
+                display_default: 10
+              - name: max_events
+                type: integer
+                example: 1000
+                display_default: 1000
+        - name: query_errors
+          description: |
+            Configure the collection of query errors from the `datadog_query_errors` XE session.
+
+            Set `query_errors.enabled` to `true` to enable the collection of query error events.
+
+            Use `query_errors.collection_interval` to set the interval (in seconds) for the collection of
+            query error events. Defaults to 10 seconds. If you intend to update this value,
+            it is strongly recommended to use a consistent value throughout all SQL Server agent deployments.
+
+            Use `query_errors.max_events` to set the maximum number of query error events to process
+            per collection. Note that SQL Server's ring buffer holds a maximum of 1000 events per session,
+            so values above 1000 are still capped at 1000 by the database engine. Defaults to 1000.
+          value:
+            type: object
+            properties:
+              - name: enabled
+                type: boolean
+                example: false
+              - name: collection_interval
+                type: number
+                example: 10
+                display_default: 10
+              - name: max_events
+                type: integer
+                example: 1000
+                display_default: 1000
     - name: deadlocks_collection
       description: |
         Configure the collection of deadlock data.
diff --git a/sqlserver/changelog.d/20229.added b/sqlserver/changelog.d/20229.added
new file mode 100644
index 0000000000000..30ccae43c6448
--- /dev/null
+++ b/sqlserver/changelog.d/20229.added
@@ -0,0 +1,2 @@
+Added SQL Server Extended Events handlers
+
diff --git a/sqlserver/datadog_checks/sqlserver/config.py b/sqlserver/datadog_checks/sqlserver/config.py
index bd4777fdb57d7..cab0d29794cd6 100644
--- a/sqlserver/datadog_checks/sqlserver/config.py
+++ b/sqlserver/datadog_checks/sqlserver/config.py
@@ -57,6 +57,7 @@ def __init__(self, init_config, instance, log):
         self.activity_config: dict = instance.get('query_activity', {}) or {}
         self.schema_config: dict = instance.get('schemas_collection', {}) or {}
         self.deadlocks_config: dict = instance.get('deadlocks_collection', {}) or {}
+        self.xe_collection_config: dict = instance.get('xe_collection', {}) or {}
         self.cloud_metadata: dict = {}
         aws: dict = instance.get('aws', {}) or {}
         gcp: dict = instance.get('gcp', {}) or {}
diff --git a/sqlserver/datadog_checks/sqlserver/config_models/instance.py b/sqlserver/datadog_checks/sqlserver/config_models/instance.py
index d835fe851070e..22c0b3025593a 100644
--- a/sqlserver/datadog_checks/sqlserver/config_models/instance.py
+++ b/sqlserver/datadog_checks/sqlserver/config_models/instance.py
@@ -347,6 +347,36 @@ class SchemasCollection(BaseModel):
     max_execution_time: Optional[float] = None
 
 
+class QueryCompletions(BaseModel):
+    model_config = ConfigDict(
+        arbitrary_types_allowed=True,
+        frozen=True,
+    )
+    collection_interval: Optional[float] = Field(None, examples=[10])
+    enabled: Optional[bool] = Field(None, examples=[False])
+    max_events: Optional[int] = Field(None, examples=[1000])
+
+
+class QueryErrors(BaseModel):
+    model_config = ConfigDict(
+        arbitrary_types_allowed=True,
+        frozen=True,
+    )
+    collection_interval: Optional[float] = Field(None, examples=[10])
+    enabled: Optional[bool] = Field(None, examples=[False])
+    max_events: Optional[int] = Field(None, examples=[1000])
+
+
+class XeCollection(BaseModel):
+    model_config = ConfigDict(
+        arbitrary_types_allowed=True,
+        frozen=True,
+    )
+    debug_sample_events: Optional[int] = None
+    query_completions: Optional[QueryCompletions] = None
+    query_errors: Optional[QueryErrors] = None
+
+
 class InstanceConfig(BaseModel):
     model_config = ConfigDict(
         validate_default=True,
@@ -406,6 +436,7 @@ class InstanceConfig(BaseModel):
     tags: Optional[tuple[str, ...]] = None
     use_global_custom_queries: Optional[str] = None
     username: Optional[str] = None
+    xe_collection: Optional[XeCollection] = None
 
     @model_validator(mode='before')
     def _initial_validation(cls, values):
diff --git a/sqlserver/datadog_checks/sqlserver/data/conf.yaml.example b/sqlserver/datadog_checks/sqlserver/data/conf.yaml.example
index ad7c37e977b3d..b9fe52fec0550 100644
--- a/sqlserver/datadog_checks/sqlserver/data/conf.yaml.example
+++ b/sqlserver/datadog_checks/sqlserver/data/conf.yaml.example
@@ -643,7 +643,9 @@ instances:
     #
    # keep_identifier_quotation: false
 
-    ## Configure the collection of raw query statements in query activity and execution plans.
+    ## Configure the collection of raw query statements in query activity, execution plans, and XE events.
+    ## To collect raw query statements from XE events, set `xe_collection.query_completions.enabled` and
+    ## `xe_collection.query_errors.enabled` to `true`.
     ## Raw query statements and execution plans may contain sensitive information (e.g., passwords)
     ## or personally identifiable information in query text.
     ## Enabling this option will allow the collection and ingestion of raw query statements and
@@ -797,6 +799,42 @@ instances:
     #
     # propagate_agent_tags: false
 
+    ## Configure the collection of events from XE (Extended Events) sessions. Requires `dbm: true`.
+    ##
+    ## Set `collect_raw_query_statement.enabled` to `true` to collect the raw query statements for each event.
+    #
+    # xe_collection:
+
+    ## @param query_completions - mapping - optional
+    ## Configure the collection of completed queries from the `datadog_query_completions` XE session.
+    ##
+    ## Set `query_completions.enabled` to `true` to enable the collection of query completion events.
+    ##
+    ## Use `query_completions.collection_interval` to set the interval (in seconds) for the collection of
+    ## query completion events. Defaults to 10 seconds. If you intend to update this value,
+    ## it is strongly recommended to use a consistent value throughout all SQL Server agent deployments.
+    ##
+    ## Use `query_completions.max_events` to set the maximum number of query completion events to process
+    ## per collection. Note that SQL Server's ring buffer holds a maximum of 1000 events per session,
+    ## so values above 1000 are still capped at 1000 by the database engine. Defaults to 1000.
+    #
+    # query_completions: {}
+
+    ## @param query_errors - mapping - optional
+    ## Configure the collection of query errors from the `datadog_query_errors` XE session.
+    ##
+    ## Set `query_errors.enabled` to `true` to enable the collection of query error events.
+    ##
+    ## Use `query_errors.collection_interval` to set the interval (in seconds) for the collection of
+    ## query error events. Defaults to 10 seconds. If you intend to update this value,
+    ## it is strongly recommended to use a consistent value throughout all SQL Server agent deployments.
+    ##
+    ## Use `query_errors.max_events` to set the maximum number of query error events to process
+    ## per collection. Note that SQL Server's ring buffer holds a maximum of 1000 events per session,
+    ## so values above 1000 are still capped at 1000 by the database engine. Defaults to 1000.
+    #
+    # query_errors: {}
+
     ## Configure the collection of deadlock data.
     #
     # deadlocks_collection
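For reference, a minimal instance configuration exercising the options added above might look like the following. This is a sketch built from the option names in this patch; the host and credentials are placeholders.

instances:
  - host: 'localhost,1433'
    username: datadog
    password: '<PASSWORD>'
    dbm: true
    collect_raw_query_statement:
      enabled: true
    xe_collection:
      query_completions:
        enabled: true
        collection_interval: 10
        max_events: 1000
      query_errors:
        enabled: true
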
diff --git a/sqlserver/datadog_checks/sqlserver/sqlserver.py b/sqlserver/datadog_checks/sqlserver/sqlserver.py
index 39024c1ea0cea..c5080e6d98dfa 100644
--- a/sqlserver/datadog_checks/sqlserver/sqlserver.py
+++ b/sqlserver/datadog_checks/sqlserver/sqlserver.py
@@ -53,6 +53,7 @@
 from datadog_checks.sqlserver.statements import SqlserverStatementMetrics
 from datadog_checks.sqlserver.stored_procedures import SqlserverProcedureMetrics
 from datadog_checks.sqlserver.utils import Database, construct_use_statement, parse_sqlserver_major_version
+from datadog_checks.sqlserver.xe_collection.registry import get_xe_session_handlers
 
 try:
     import datadog_agent
@@ -157,6 +158,9 @@ def __init__(self, name, init_config, instances):
         self.agent_history = SqlserverAgentHistory(self, self._config)
         self.deadlocks = Deadlocks(self, self._config)
 
+        # XE Session Handlers
+        self.xe_session_handlers = []
+
         # _database_instance_emitted: limit the collection and transmission of the database instance metadata
         self._database_instance_emitted = TTLCache(
             maxsize=1,
@@ -169,6 +173,7 @@ def __init__(self, name, init_config, instances):
         self.check_initializations.append(self.load_static_information)
         self.check_initializations.append(self.config_checks)
         self.check_initializations.append(self.make_metric_list_to_collect)
+        self.check_initializations.append(self.initialize_xe_session_handlers)
 
         # Query declarations
         self._query_manager = None
@@ -177,6 +182,13 @@ def __init__(self, name, init_config, instances):
 
         self._schemas = Schemas(self, self._config)
 
+    def initialize_xe_session_handlers(self):
+        """Initialize the XE session handlers without starting them"""
+        # Initialize XE session handlers if not already initialized
+        if not self.xe_session_handlers:
+            self.xe_session_handlers = get_xe_session_handlers(self, self._config)
+            self.log.debug("Initialized %d XE session handlers", len(self.xe_session_handlers))
+
     def cancel(self):
         self.statement_metrics.cancel()
         self.procedure_metrics.cancel()
@@ -185,6 +197,13 @@ def cancel(self):
         self._schemas.cancel()
         self.deadlocks.cancel()
 
+        # Cancel all XE session handlers
+        for handler in self.xe_session_handlers:
+            try:
+                handler.cancel()
+            except Exception as e:
+                self.log.error("Error canceling XE session handler for %s: %s", handler.session_name, e)
+
     def config_checks(self):
         if self._config.autodiscovery and self.instance.get("database"):
             self.log.warning(
@@ -810,6 +829,13 @@ def check(self, _):
                 self.sql_metadata.run_job_loop(self.tags)
                 self._schemas.run_job_loop(self.tags)
                 self.deadlocks.run_job_loop(self.tags)
+
+                # Run XE session handlers
+                for handler in self.xe_session_handlers:
+                    try:
+                        handler.run_job_loop(self.tags)
+                    except Exception as e:
+                        self.log.error("Error running XE session handler for %s: %s", handler.session_name, e)
         else:
             self.log.debug("Skipping check")
diff --git a/sqlserver/datadog_checks/sqlserver/xe_collection/__init__.py b/sqlserver/datadog_checks/sqlserver/xe_collection/__init__.py
new file mode 100644
index 0000000000000..c9f1f2a9882c7
--- /dev/null
+++ b/sqlserver/datadog_checks/sqlserver/xe_collection/__init__.py
@@ -0,0 +1,3 @@
+# (C) Datadog, Inc. 2025-present
+# All rights reserved
+# Licensed under a 3-clause BSD style license (see LICENSE)
diff --git a/sqlserver/datadog_checks/sqlserver/xe_collection/base.py b/sqlserver/datadog_checks/sqlserver/xe_collection/base.py
new file mode 100644
index 0000000000000..006ae25227a04
--- /dev/null
+++ b/sqlserver/datadog_checks/sqlserver/xe_collection/base.py
@@ -0,0 +1,770 @@
+# (C) Datadog, Inc. 2025-present
+# All rights reserved
+# Licensed under a 3-clause BSD style license (see LICENSE)
+
+import datetime
+import json as json_module
+import logging
+from abc import abstractmethod
+from io import BytesIO
+from time import time
+
+from dateutil import parser
+from lxml import etree
+
+from datadog_checks.base.utils.db.sql import compute_sql_signature
+from datadog_checks.base.utils.db.utils import (
+    DBMAsyncJob,
+    RateLimitingTTLCache,
+    default_json_event_encoding,
+    obfuscate_sql_with_metadata,
+)
+from datadog_checks.base.utils.serialization import json
+from datadog_checks.base.utils.tracking import tracked_method
+from datadog_checks.sqlserver.const import STATIC_INFO_ENGINE_EDITION, STATIC_INFO_VERSION
+from datadog_checks.sqlserver.utils import is_azure_sql_database
+
+try:
+    import datadog_agent
+except ImportError:
+    from datadog_checks.base.stubs import datadog_agent
+
+
+def agent_check_getter(self):
+    return self._check
+
+
+class TimestampHandler:
+    """Utility class for handling timestamps"""
+
+    @staticmethod
+    def format_for_output(timestamp_str):
+        """
+        Format a timestamp for output in a consistent format: YYYY-MM-DDTHH:MM:SS.sssZ
+        This is used only for the output payload, not for filtering.
+
+        Args:
+            timestamp_str: A timestamp string in ISO format
+        Returns:
+            A formatted timestamp string or empty string if parsing fails
+        """
+        if not timestamp_str:
+            return ""
+        try:
+            dt = parser.isoparse(timestamp_str)
+            return dt.isoformat(timespec='milliseconds').replace('+00:00', 'Z')
+        except Exception:
+            return timestamp_str
+
+    @staticmethod
+    def calculate_start_time(end_timestamp, duration_ms):
+        """
+        Calculate start time from end time and duration
+
+        Args:
+            end_timestamp: The end timestamp in ISO format
+            duration_ms: Duration in milliseconds
+
+        Returns:
+            Start timestamp in ISO format or empty string if calculation fails
+        """
+        if not end_timestamp or duration_ms is None:
+            return ""
+        try:
+            end_dt = parser.isoparse(end_timestamp)
+            duration_delta = datetime.timedelta(milliseconds=float(duration_ms))
+            start_dt = end_dt - duration_delta
+            return start_dt.isoformat(timespec='milliseconds').replace('+00:00', 'Z')
+        except Exception:
+            return ""
+
+
+class XESessionBase(DBMAsyncJob):
+    """Base class for all XE session handlers"""
+
+    # Base fields common to most/all event types
+    BASE_NUMERIC_FIELDS = {
+        "duration_ms": 0.0,
+        "session_id": 0,
+        "request_id": 0,
+    }
+
+    BASE_STRING_FIELDS = [
+        "database_name",
+        "client_hostname",
+        "client_app_name",
+        "username",
+        "activity_id",
+    ]
+
+    BASE_SQL_FIELDS = [
+        "statement",
+        "sql_text",
+        "batch_text",
+    ]
+
+    # Fields that should use text representation when available
+    # Both rpc_completed and batch_completed use the result field
+    TEXT_FIELDS = ["result"]
+
+    def __init__(self, check, config, session_name):
+        self.session_name = session_name
+        self.tags = [t for t in check.tags if not t.startswith('dd.internal')]
+        self._check = check
+        self._log = check.log
+        self._config = config
+
+        # Get configuration based on session name
+        xe_config = getattr(self._config, 'xe_collection_config', {})
+        if session_name == "datadog_query_completions":
+            session_config = xe_config.get('query_completions', {})
+        elif session_name == "datadog_query_errors":
+            session_config = xe_config.get('query_errors', {})
+        else:
+            session_config = {}
+
+        # Set collection interval from config or use default
+        self.collection_interval = session_config.get('collection_interval', 10)
+
+        # Set debug sample size from global XE config
+        self.debug_sample_events = xe_config.get('debug_sample_events', 3)
+
+        # Set max events from session-specific config (capped at 1000 by SQL Server)
+        self.max_events = min(session_config.get('max_events', 1000), 1000)
+        self._last_event_timestamp = None  # Initialize timestamp tracking
+
+        # Configuration for raw query text (RQT) events
+        self._collect_raw_query = self._config.collect_raw_query_statement.get("enabled", False)
+
+        self._raw_statement_text_cache = RateLimitingTTLCache(
+            maxsize=self._config.collect_raw_query_statement["cache_max_size"],
+            ttl=60 * 60 / self._config.collect_raw_query_statement["samples_per_hour_per_query"],
+        )
+
+        # Register event handlers - subclasses will override this
+        self._event_handlers = {}
+
+        # We already know it's enabled since the registry only creates enabled handlers
+        self._enabled = True
+
+        # Log configuration details
+        self._log.info(
+            f"Initializing XE session {session_name} with interval={self.collection_interval}s, "
+            f"max_events={self.max_events}, collect_raw_query={self._collect_raw_query}"
+        )
+
+        super(XESessionBase, self).__init__(
+            check,
+            run_sync=True,
+            enabled=True,
+            min_collection_interval=self._config.min_collection_interval,
+            dbms="sqlserver",
+            rate_limit=1 / float(self.collection_interval),
+            job_name=f"xe_{session_name}",
+            shutdown_callback=self._close_db_conn,
+        )
+        self._conn_key_prefix = f"dbm-xe-{session_name}-"
+        self._is_azure_sql_database = False
+        self._check_azure_status()
+
+    # Methods to allow subclasses to extend field definitions
+    def get_numeric_fields(self, event_type=None):
+        """Get numeric fields with defaults for given event type"""
+        return self.BASE_NUMERIC_FIELDS.copy()
+
+    def get_string_fields(self, event_type=None):
+        """Get string fields for given event type"""
+        return self.BASE_STRING_FIELDS.copy()
+
+    def get_sql_fields(self, event_type=None):
+        """Get SQL fields for given event type"""
+        if event_type == "sql_batch_completed":
+            return ["batch_text", "sql_text"]
+        elif event_type == "rpc_completed":
+            return ["statement", "sql_text"]
+        elif event_type == "module_end":
+            return ["statement", "sql_text"]
+        return self.BASE_SQL_FIELDS.copy()
+
+    def register_event_handler(self, event_name, handler_method):
+        """Register a handler method for a specific event type"""
+        self._event_handlers[event_name] = handler_method
+
+    def _check_azure_status(self):
+        """Check if this is Azure SQL Database"""
+        engine_edition = self._check.static_info_cache.get(STATIC_INFO_ENGINE_EDITION, "")
+        self._is_azure_sql_database = is_azure_sql_database(engine_edition)
+
+    def _close_db_conn(self):
+        """Close database connection on shutdown"""
+        pass
+
+    def session_exists(self):
+        """Check if this XE session exists and is running"""
+        with self._check.connection.open_managed_default_connection(key_prefix=self._conn_key_prefix):
+            with self._check.connection.get_managed_cursor(key_prefix=self._conn_key_prefix) as cursor:
+                # For Azure SQL Database support
+                level = ""
+                if self._is_azure_sql_database:
+                    level = "database_"
+
+                # Build the query with proper parameterization
+                query = f"SELECT 1 FROM sys.dm_xe_{level}sessions WHERE name = ?"
+                cursor.execute(query, (self.session_name,))
+
+                return cursor.fetchone() is not None
+
+    @tracked_method(agent_check_getter=agent_check_getter, track_result_length=True)
+    def _query_ring_buffer(self):
+        """
+        Query the ring buffer data and parse the XML on the client side.
+        This avoids expensive server-side XML parsing for better performance.
+        """
+        raw_xml = None
+        with self._check.connection.open_managed_default_connection(key_prefix=self._conn_key_prefix):
+            with self._check.connection.get_managed_cursor(key_prefix=self._conn_key_prefix) as cursor:
+                # For Azure SQL Database support
+                level = ""
+                if self._is_azure_sql_database:
+                    level = "database_"
+
+                # Determine if we need to use CONVERT based on connector type
+                use_convert = False
+                if self._check.connection.connector == "adodbapi":
+                    use_convert = True
+                    self._log.debug("Using CONVERT syntax for Windows/adodbapi compatibility")
+
+                try:
+                    # Choose the appropriate query based on connector type
+                    if use_convert:
+                        query = f"""
+                            SELECT CONVERT(NVARCHAR(MAX), t.target_data) AS target_xml
+                            FROM sys.dm_xe_{level}sessions s
+                            JOIN sys.dm_xe_{level}session_targets t
+                                ON s.address = t.event_session_address
+                            WHERE s.name = ?
+                            AND t.target_name = 'ring_buffer'
+                        """
+                    else:
+                        query = f"""
+                            SELECT CAST(t.target_data AS XML) AS target_xml
+                            FROM sys.dm_xe_{level}sessions s
+                            JOIN sys.dm_xe_{level}session_targets t
+                                ON s.address = t.event_session_address
+                            WHERE s.name = ?
+                            AND t.target_name = 'ring_buffer'
+                        """
+
+                    cursor.execute(query, (self.session_name,))
+                    row = cursor.fetchone()
+                    if row and row[0]:
+                        raw_xml = str(row[0])
+                except Exception as e:
+                    self._log.error(f"Error querying ring buffer: {e}")
+
+        if not raw_xml:
+            return None
+
+        return raw_xml
+
+    @tracked_method(agent_check_getter=agent_check_getter, track_result_length=True)
+    def _process_events(self, xml_data):
+        """
+        Parse and process ring buffer XML data in a single pass using lxml.etree.iterparse.
+        Filters events by timestamp and processes them directly.
+
+        Returns:
+            List of processed event dictionaries
+        """
+        if not xml_data:
+            return []
+
+        processed_events = []
+        try:
+            try:
+                xml_stream = BytesIO(xml_data.encode('utf-8'))
+            except UnicodeEncodeError:
+                self._log.debug("UTF-8 encoding failed, falling back to UTF-16")
+                xml_stream = BytesIO(xml_data.encode('utf-16'))
+
+            # Only parse 'end' events for <event> tags
+            context = etree.iterparse(xml_stream, events=('end',), tag='event')
+
+            for _, elem in context:
+                try:
+                    # Get basic timestamp for filtering
+                    timestamp = elem.get('timestamp')
+
+                    # Filter by timestamp
+                    if not self._last_event_timestamp or (timestamp and timestamp > self._last_event_timestamp):
+                        # Extract event attributes
+                        event_data = {"timestamp": timestamp, "event_name": elem.get('name', '')}
+
+                        # Process the event using appropriate handler
+                        event_name = event_data["event_name"]
+                        if event_name in self._event_handlers:
+                            handler = self._event_handlers[event_name]
+                            if handler(elem, event_data):
+                                processed_events.append(event_data)
+                        else:
+                            self._log.debug(f"No handler for event type: {event_name}")
+                except Exception as e:
+                    self._log.error(f"Error processing event {elem.get('name', 'unknown')}: {e}")
+
+                # Free memory for processed elements
+                elem.clear()
+                while elem.getprevious() is not None:
+                    del elem.getparent()[0]
+
+                # Stop if we've reached the maximum number of events
+                if len(processed_events) >= self.max_events:
+                    self._log.debug(
+                        f"Processed {len(processed_events)} events from ring buffer (limit of {self.max_events})"
+                    )
+                    break
+
+            return processed_events
+
+        except Exception as e:
+            self._log.error(f"Error processing ring buffer events: {e}")
+            return []
+
+    @abstractmethod
+    def _normalize_event_impl(self, event):
+        """
+        Implementation of event normalization - to be overridden by subclasses.
+        This method should apply the specific normalization logic for each event type.
+        """
+        raise NotImplementedError
+
+    def _normalize_event(self, event, custom_numeric_fields=None, custom_string_fields=None):
+        """
+        Generic method to normalize and validate an event data structure.
+
+        Args:
+            event: The raw event data dictionary
+            custom_numeric_fields: Optional override of numeric fields
+            custom_string_fields: Optional override of string fields
+
+        Returns:
+            A normalized event dictionary with consistent types
+        """
+        normalized = {}
+
+        event_type = event.get("event_name", "")
+
+        # Get the field definitions for this event type
+        numeric_fields = custom_numeric_fields or self.get_numeric_fields(event_type)
+        string_fields = custom_string_fields or self.get_string_fields(event_type)
+
+        # Add the XE event type to normalized data
+        normalized["xe_type"] = event.get("event_name", "")
+
+        # Format the event_fire_timestamp (from event's timestamp)
+        raw_timestamp = event.get("timestamp", "")
+        normalized["event_fire_timestamp"] = TimestampHandler.format_for_output(raw_timestamp)
+
+        # Calculate and format query_start if duration_ms is available
+        if raw_timestamp and "duration_ms" in event and event.get("duration_ms") is not None:
+            normalized["query_start"] = TimestampHandler.calculate_start_time(raw_timestamp, event.get("duration_ms"))
+        else:
+            normalized["query_start"] = ""
+
+        # Numeric fields with defaults
+        for field, default in numeric_fields.items():
+            value = event.get(field)
+            if value is None:
+                normalized[field] = default
+            else:
+                try:
+                    normalized[field] = float(value) if field == "duration_ms" else int(value)
+                except (ValueError, TypeError):
+                    normalized[field] = default
+
+        # String fields with defaults
+        for field in string_fields:
+            normalized[field] = str(event.get(field, "") or "")
+
+        # Add SQL fields (statement, sql_text, batch_text)
+        for field in self.get_sql_fields(event_type):
+            if field in event:
+                normalized[field] = event[field]
+
+        # Add query_signature if present
+        if "query_signature" in event:
+            normalized["query_signature"] = event["query_signature"]
+
+        # Add raw_query_signature if present and raw query collection is enabled
+        if self._collect_raw_query and "raw_query_signature" in event:
+            normalized["raw_query_signature"] = event["raw_query_signature"]
+
+        return normalized
+
+    def _determine_dbm_type(self):
+        """
+        Determine the dbm_type based on the session name.
+        Returns the appropriate dbm_type for the current session.
+        """
+
+        if self.session_name == "datadog_query_errors":
+            return "query_error"
+        elif self.session_name == "datadog_query_completions":
+            return "query_completion"
+        else:
+            self._log.warning(f"Unrecognized session name: {self.session_name}, using default dbm_type")
+            return "query_completion"
+
+    def _create_event_payload(self, raw_event):
+        """
+        Create a structured event payload for a single event with consistent format.
+
+        Args:
+            raw_event: The raw event data to normalize
+        Returns:
+            A dictionary with the standard payload structure
+        """
+        # Normalize the event - must be implemented by subclass
+        normalized_event = self._normalize_event_impl(raw_event)
+
+        # Add SQL metadata and signatures to the normalized event
+        if 'query_signature' in raw_event:
+            normalized_event['query_signature'] = raw_event['query_signature']
+
+        return {
+            "host": self._check.resolved_hostname,
+            "database_instance": self._check.database_identifier,
+            "ddagentversion": datadog_agent.get_version(),
+            "ddsource": "sqlserver",
+            "dbm_type": self._determine_dbm_type(),
+            "event_source": self.session_name,
+            "collection_interval": self.collection_interval,
+            "ddtags": self.tags,
+            "timestamp": time() * 1000,
+            "sqlserver_version": self._check.static_info_cache.get(STATIC_INFO_VERSION, ""),
+            "sqlserver_engine_edition": self._check.static_info_cache.get(STATIC_INFO_ENGINE_EDITION, ""),
+            "cloud_metadata": self._config.cloud_metadata,
+            "service": self._config.service,
+            "query_details": normalized_event,
+        }
+
+    @tracked_method(agent_check_getter=agent_check_getter, track_result_length=True)
+    def run_job(self):
+        """Run the XE session collection job"""
+        self._log.info(f"Running job for {self.session_name} session")
+        if not self.session_exists():
+            self._log.warning(f"XE session {self.session_name} not found or not running.")
+            return
+
+        # Get the raw XML data
+        xml_data = self._query_ring_buffer()
+
+        if not xml_data:
+            self._log.debug(f"No data found for session {self.session_name}")
+            return
+
+        # Process the events
+        events = self._process_events(xml_data)
+
+        if not events:
+            self._log.debug(f"No events processed from {self.session_name} session")
+            return
+
+        # Timestamp gap detection - compare the last event timestamp from previous run
+        # with the first event timestamp from this run
+        if events and self._last_event_timestamp and 'timestamp' in events[0]:
+            current_first_timestamp = events[0]['timestamp']
+            try:
+                prev_dt = parser.isoparse(self._last_event_timestamp)
+                curr_dt = parser.isoparse(current_first_timestamp)
+                gap_seconds = (curr_dt - prev_dt).total_seconds()
+            except Exception:
+                gap_seconds = None
+            self._log.debug(
+                f"[{self.session_name}] Timestamp gap: last={self._last_event_timestamp} "
+                f"first={current_first_timestamp}" + (f" gap_seconds={gap_seconds}" if gap_seconds is not None else "")
+            )
+
+        # Update timestamp tracking with the last event's raw timestamp for next run
+        if events and 'timestamp' in events[-1]:
+            self._last_event_timestamp = events[-1]['timestamp']
+            self._log.debug(f"Updated checkpoint to {self._last_event_timestamp}")
+
+        # Log a sample of events (up to max configured limit) for debugging
+        if self._log.isEnabledFor(logging.DEBUG):
+            sample_size = min(self.debug_sample_events, len(events))
+            sample_events = events[:sample_size]
+
+            try:
+                formatted_json = json_module.dumps(sample_events, indent=2, default=str)
+                self._log.debug(
+                    f"Sample events from {self.session_name} session (limit={self.debug_sample_events}):\n"
+                    f"{formatted_json}"
+                )
+            except Exception as e:
+                self._log.error(f"Error formatting events for logging: {e}")
+
+        # Determine the key for the batched events array based on session name
+        batch_key = (
+            "sqlserver_query_errors" if self.session_name == "datadog_query_errors" else "sqlserver_query_completions"
+        )
+
+        # Create a list to collect all query details
+        all_query_details = []
+
+        # Track if we've logged an RQT sample for this batch
+        rqt_sample_logged = False
+
+        # Process all events and collect them for batching
+        for event in events:
+            try:
+                # Obfuscate SQL fields and get the raw statement
+                obfuscated_event, raw_sql_fields = self._obfuscate_sql_fields(event)
+
+                # Create a properly structured payload for the individual event
+                payload = self._create_event_payload(obfuscated_event)
+
+                # Extract query details to add to the batch
+                query_details = payload.get("query_details", {})
+                all_query_details.append({"query_details": query_details})
+
+                # Process RQT events individually
+                if self._collect_raw_query and raw_sql_fields:
+                    # Create RQT event
+                    rqt_event = self._create_rqt_event(obfuscated_event, raw_sql_fields, query_details)
+
+                    if rqt_event:
+                        # Log the first successful RQT event we encounter in this batch
+                        if not rqt_sample_logged and self._log.isEnabledFor(logging.DEBUG):
+                            try:
+                                rqt_payload_json = json_module.dumps(rqt_event, default=str, indent=2)
+                                self._log.debug(f"Sample {self.session_name} RQT event payload:\n{rqt_payload_json}")
+                                rqt_sample_logged = True
+                            except Exception as e:
+                                self._log.error(f"Error serializing RQT payload for logging: {e}")
+
+                        self._log.debug(
+                            f"Created RQT event for query_signature={obfuscated_event.get('query_signature')}"
+                        )
+
+                        rqt_payload = json.dumps(rqt_event, default=default_json_event_encoding)
+                        # Log RQT payload size
+                        self._log.debug(f"RQT event payload size: {len(rqt_payload)} bytes")
+                        self._check.database_monitoring_query_sample(rqt_payload)
+
+            except Exception as e:
+                self._log.error(f"Error processing event: {e}")
+                continue
+
+        # Create a single batched payload for all events if we have any
+        if all_query_details:
+            # Create base payload from the common fields (using the same structure as _create_event_payload)
+            batched_payload = {
+                "host": self._check.resolved_hostname,
+                "database_instance": self._check.database_identifier,
+                "ddagentversion": datadog_agent.get_version(),
+                "ddsource": "sqlserver",
+                "dbm_type": self._determine_dbm_type(),
+                "event_source": self.session_name,
+                "collection_interval": self.collection_interval,
+                "ddtags": self.tags,
+                "timestamp": time() * 1000,
+                "sqlserver_version": self._check.static_info_cache.get(STATIC_INFO_VERSION, ""),
+                "sqlserver_engine_edition": self._check.static_info_cache.get(STATIC_INFO_ENGINE_EDITION, ""),
+                "cloud_metadata": self._config.cloud_metadata,
+                "service": self._config.service,
+                # Add the array of query details with the appropriate key
+                batch_key: all_query_details,
+            }
+
+            # Log the batched payload for debugging
+            if self._log.isEnabledFor(logging.DEBUG):
+                try:
+                    # Only include up to max configured limit events in the log
+                    log_payload = batched_payload.copy()
+                    if len(all_query_details) > self.debug_sample_events:
+                        log_payload[batch_key] = all_query_details[: self.debug_sample_events]
+                        remaining_events = len(all_query_details) - self.debug_sample_events
+                        log_payload[batch_key].append({"truncated": f"...and {remaining_events} more events"})
+
+                    payload_json = json_module.dumps(log_payload, default=str, indent=2)
+                    self._log.debug(
+                        f"Batched {self.session_name} payload with {len(all_query_details)} events "
+                        f"(showing {self.debug_sample_events}):\n{payload_json}"
+                    )
+                except Exception as e:
+                    self._log.error(f"Error serializing batched payload for logging: {e}")
+
+            # Send the batched payload
+            serialized_payload = json.dumps(batched_payload, default=default_json_event_encoding)
+            # Log payload size
+            self._log.debug(f"Batched {self.session_name} payload size: {len(serialized_payload)} bytes")
+            self._check.database_monitoring_query_activity(serialized_payload)
+
+        self._log.info(f"Found {len(events)} events from {self.session_name} session")
+
+    @tracked_method(agent_check_getter=agent_check_getter, track_result_length=True)
+    def _obfuscate_sql_fields(self, event):
+        """SQL field obfuscation and signature creation"""
+        obfuscated_event = event.copy()
+        raw_sql_fields = {}
+
+        # Get SQL fields for this event type
+        sql_fields = self.get_sql_fields(event.get('event_name', ''))
+
+        # Process each SQL field that exists in the event
+        for field in sql_fields:
+            if field in event and event[field]:
+                raw_sql_fields[field] = event[field]
+
+                try:
+                    # Obfuscate the SQL
+                    result = obfuscate_sql_with_metadata(
+                        event[field], self._config.obfuscator_options, replace_null_character=True
+                    )
+
+                    # Store the obfuscated SQL
+                    obfuscated_event[field] = result['query']
+
+                    # Store metadata from the first field with metadata
+                    if 'dd_commands' not in obfuscated_event and result['metadata'].get('commands'):
+                        obfuscated_event['dd_commands'] = result['metadata']['commands']
+                    if 'dd_tables' not in obfuscated_event and result['metadata'].get('tables'):
+                        obfuscated_event['dd_tables'] = result['metadata']['tables']
+                    if result['metadata'].get('comments'):
+                        if 'dd_comments' not in obfuscated_event:
+                            obfuscated_event['dd_comments'] = []
+                        obfuscated_event['dd_comments'].extend(result['metadata']['comments'])
+
+                    # Compute query_signature and raw_query_signature from the primary field
+                    primary_field = self._get_primary_sql_field(event)
+                    if field == primary_field or 'query_signature' not in obfuscated_event:
+                        obfuscated_event['query_signature'] = compute_sql_signature(result['query'])
+                        raw_signature = compute_sql_signature(event[field])
+                        raw_sql_fields['raw_query_signature'] = raw_signature
+                        if self._collect_raw_query:
+                            obfuscated_event['raw_query_signature'] = raw_signature
+
+                except Exception as e:
+                    self._log.debug(f"Error obfuscating {field}: {e}")
+                    obfuscated_event[field] = "ERROR: failed to obfuscate"
+
+        # Deduplicate comments if any
+        if 'dd_comments' in obfuscated_event:
+            obfuscated_event['dd_comments'] = list(set(obfuscated_event['dd_comments']))
+
+        return obfuscated_event, raw_sql_fields if raw_sql_fields else None
+
+    def _get_primary_sql_field(self, event):
+        """
+        Get the primary SQL field for this event type.
+        This is the field that will be used for the main query signature.
+
+        Subclasses should override this method to return their primary field.
+
+        Args:
+            event: The event data dictionary
+
+        Returns:
+            Name of the primary SQL field
+        """
+        # Default implementation - will be overridden by subclasses
+        # Try statement first, then sql_text, then batch_text
+        for field in self.get_sql_fields(event.get('event_name', '')):
+            if field in event and event[field]:
+                return field
+        return None
+
+    @tracked_method(agent_check_getter=agent_check_getter, track_result_length=True)
+    def _create_rqt_event(self, event, raw_sql_fields, query_details):
+        """
+        Create a Raw Query Text (RQT) event for a raw SQL statement.
+
+        Args:
+            event: The event data dictionary with obfuscated SQL fields
+            raw_sql_fields: Dictionary containing the original SQL fields
+            query_details: Dictionary containing normalized query details with timing information
+
+        Returns:
+            Dictionary with the RQT event payload or None if the event should be skipped
+        """
+        if not self._collect_raw_query or not raw_sql_fields:
+            self._log.debug("Skipping RQT event creation: raw query collection disabled or no raw SQL fields")
+            return None
+
+        # Check if we have the necessary signatures
+        query_signature = event.get('query_signature')
+        if not query_signature:
+            self._log.debug("Skipping RQT event creation: Missing query_signature")
+            return None
+
+        # Get the primary SQL field for this event type
+        primary_field = self._get_primary_sql_field(event)
+        if not primary_field or primary_field not in raw_sql_fields:
+            self._log.debug(
+                f"Skipping RQT event creation: Primary SQL field {primary_field} not found in raw_sql_fields"
+            )
+            return None
+
+        # Use rate limiting cache to control how many RQT events we send
+        cache_key = (query_signature, raw_sql_fields['raw_query_signature'])
+        if not self._raw_statement_text_cache.acquire(cache_key):
+            self._log.debug(f"Skipping RQT event creation: Rate limited by cache for signature {query_signature}")
+            return None
+
+        # Create basic db fields structure
+        db_fields = {
+            "instance": event.get('database_name', None),
+            "query_signature": query_signature,
+            "raw_query_signature": raw_sql_fields['raw_query_signature'],
+            "statement": raw_sql_fields[primary_field],  # Primary field becomes the statement
+            "metadata": {
+                "tables": event.get('dd_tables', None),
+                "commands": event.get('dd_commands', None),
+                "comments": event.get('dd_comments', None),
+            },
+        }
+
+        # Create the sqlserver section with appropriate fields based on session type
+        sqlserver_fields = {
+            "session_id": event.get("session_id"),
+            "xe_type": event.get("event_name"),
+            "event_fire_timestamp": query_details.get("event_fire_timestamp"),
+        }
+
+        # Only include duration and query_start for non-error events
+        is_error_event = self.session_name == "datadog_query_errors"
+        if not is_error_event:
+            sqlserver_fields.update(
+                {
+                    "duration_ms": event.get("duration_ms"),
+                    "query_start": query_details.get("query_start"),
+                }
+            )
+        else:
+            # Include error_number and message for error events
+            sqlserver_fields.update(
+                {
+                    "error_number": event.get("error_number"),
+                    "message": event.get("message"),
+                }
+            )
+
+        # Add additional SQL fields to the sqlserver section
+        # but only if they're not the primary field and not empty
+        for field in ["statement", "sql_text", "batch_text"]:
+            if field != primary_field and field in raw_sql_fields and raw_sql_fields[field]:
+                sqlserver_fields[field] = raw_sql_fields[field]
+
+        return {
+            "timestamp": time() * 1000,
+            "host": self._check.resolved_hostname,
+            "database_instance": self._check.database_identifier,
+            "ddagentversion": datadog_agent.get_version(),
+            "ddsource": "sqlserver",
+            "dbm_type": "rqt",
+            "event_source": self.session_name,
+            "ddtags": ",".join(self.tags),
+            'service': self._config.service,
+            "db": db_fields,
+            "sqlserver": sqlserver_fields,
+        }
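A note on the parsing pattern used in `_process_events` above: the handler streams `<event>` nodes with `lxml.etree.iterparse` and clears each element after use so memory stays bounded. The following is a minimal, self-contained sketch of that pattern; the sample XML is illustrative, not real ring-buffer output.

from io import BytesIO

from lxml import etree

SAMPLE_XML = b"""
<RingBufferTarget>
  <event name="sql_batch_completed" timestamp="2025-01-01T00:00:00.000Z">
    <data name="duration"><value>2500000</value></data>
    <data name="batch_text"><value>SELECT 1</value></data>
  </event>
</RingBufferTarget>
"""


def parse_events(xml_bytes):
    events = []
    # React only to closing </event> tags, as the handler does
    for _, elem in etree.iterparse(BytesIO(xml_bytes), events=('end',), tag='event'):
        event = {'event_name': elem.get('name', ''), 'timestamp': elem.get('timestamp')}
        for data in elem.findall('./data'):
            name, value = data.get('name'), data.findtext('./value')
            if name == 'duration' and value is not None:
                event['duration_ms'] = int(value) / 1000  # microseconds -> milliseconds
            elif name:
                event[name] = value
        events.append(event)
        elem.clear()  # release the parsed subtree to keep memory bounded
    return events


print(parse_events(SAMPLE_XML))
# [{'event_name': 'sql_batch_completed', 'timestamp': '2025-01-01T00:00:00.000Z',
#   'duration_ms': 2500.0, 'batch_text': 'SELECT 1'}]
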
diff --git a/sqlserver/datadog_checks/sqlserver/xe_collection/error_events.py b/sqlserver/datadog_checks/sqlserver/xe_collection/error_events.py
new file mode 100644
index 0000000000000..e6271cbea2574
--- /dev/null
+++ b/sqlserver/datadog_checks/sqlserver/xe_collection/error_events.py
@@ -0,0 +1,163 @@
+# (C) Datadog, Inc. 2025-present
+# All rights reserved
+# Licensed under a 3-clause BSD style license (see LICENSE)
+
+from datadog_checks.base.utils.tracking import tracked_method
+
+from .base import XESessionBase, agent_check_getter
+from .xml_tools import (
+    extract_field,
+    extract_int_value,
+    extract_value,
+)
+
+
+class ErrorEventsHandler(XESessionBase):
+    """Handler for Error Events and Attentions"""
+
+    # Event-specific field extensions
+    ERROR_REPORTED_SPECIFIC_NUMERIC_FIELDS = {
+        "error_number": 0,
+        "severity": 0,
+        "state": 0,
+        "category": 0,
+    }
+
+    ERROR_REPORTED_SPECIFIC_STRING_FIELDS = [
+        "message",
+        "is_intercepted",
+        "user_defined",
+        "destination",
+    ]
+
+    ATTENTION_SPECIFIC_NUMERIC_FIELDS = {}
+
+    ATTENTION_SPECIFIC_STRING_FIELDS = []
+
+    def __init__(self, check, config):
+        super(ErrorEventsHandler, self).__init__(check, config, "datadog_query_errors")
+
+        # Register handlers for different event types using the strategy pattern
+        self.register_event_handler('error_reported', self._process_error_reported_event)
+        self.register_event_handler('attention', self._process_attention_event)
+
+    def get_numeric_fields(self, event_type=None):
+        """Get numeric fields with defaults for given event type"""
+        base_fields = super().get_numeric_fields(event_type)
+
+        if event_type == 'error_reported':
+            base_fields.update(self.ERROR_REPORTED_SPECIFIC_NUMERIC_FIELDS)
+        elif event_type == 'attention':
+            base_fields.update(self.ATTENTION_SPECIFIC_NUMERIC_FIELDS)
+
+        return base_fields
+
+    def get_string_fields(self, event_type=None):
+        """Get string fields for given event type"""
+        base_fields = super().get_string_fields(event_type)
+
+        if event_type == 'error_reported':
+            return base_fields + self.ERROR_REPORTED_SPECIFIC_STRING_FIELDS
+        elif event_type == 'attention':
+            return base_fields + self.ATTENTION_SPECIFIC_STRING_FIELDS
+
+        return base_fields
+
+    def get_sql_fields(self, event_type=None):
+        """Get SQL fields for given event type"""
+        return ["sql_text"]
+
+    @tracked_method(agent_check_getter=agent_check_getter)
+    def _process_events(self, xml_data):
+        """Process error events from the XML data using base implementation"""
+        return super()._process_events(xml_data)
+
+    def _process_error_reported_event(self, event, event_data):
+        """Process error_reported event"""
+        # Extract data elements
+        for data in event.findall('./data'):
+            data_name = data.get('name')
+            if not data_name:
+                continue
+
+            # Use field extraction from xml_tools
+            extract_field(
+                data,
+                event_data,
+                data_name,
+                self.get_numeric_fields(event_data.get('event_name')),
+                self.TEXT_FIELDS,
+                self._log,
+            )
+
+        # Extract action elements
+        for action in event.findall('./action'):
+            action_name = action.get('name')
+            if action_name:
+                event_data[action_name] = extract_value(action)
+
+        return True
+
+    def _process_attention_event(self, event, event_data):
+        """Process attention event"""
+        # Process data elements
+        for data in event.findall('./data'):
+            data_name = data.get('name')
+            if not data_name:
+                continue
+
+            # Use unified field extraction
+            extract_field(
+                data,
+                event_data,
+                data_name,
+                self.get_numeric_fields(event_data.get('event_name')),
+                self.TEXT_FIELDS,
+                self._log,
+            )
+
+        # Extract action elements
+        for action in event.findall('./action'):
+            action_name = action.get('name')
+            if not action_name:
+                continue
+
+            if action_name == 'session_id' or action_name == 'request_id':
+                # These are numeric values in the actions
+                value = extract_int_value(action)
+                if value is not None:
+                    event_data[action_name] = value
+            else:
+                event_data[action_name] = extract_value(action)
+
+        return True
+
+    def _normalize_event_impl(self, event):
+        """Normalize error event data based on event type"""
+        # First use the base normalization with type-specific fields
+        normalized = self._normalize_event(event)
+
+        # For error events, remove query_start and duration_ms fields since they're not applicable
+        if 'query_start' in normalized:
+            del normalized['query_start']
+        if 'duration_ms' in normalized:
+            del normalized['duration_ms']
+
+        return normalized
+
+    def _get_primary_sql_field(self, event):
+        """
+        Get the primary SQL field for error events.
+        For error events, sql_text is typically the only SQL field.
+
+        Args:
+            event: The event data dictionary
+
+        Returns:
+            Name of the primary SQL field for this event type
+        """
+        # For most error events, sql_text is the only SQL field
+        if 'sql_text' in event and event['sql_text']:
+            return 'sql_text'
+
+        return None
diff --git a/sqlserver/datadog_checks/sqlserver/xe_collection/query_completion_events.py b/sqlserver/datadog_checks/sqlserver/xe_collection/query_completion_events.py
new file mode 100644
index 0000000000000..c457d8e5131eb
--- /dev/null
+++ b/sqlserver/datadog_checks/sqlserver/xe_collection/query_completion_events.py
@@ -0,0 +1,187 @@
+# (C) Datadog, Inc. 2025-present
+# All rights reserved
+# Licensed under a 3-clause BSD style license (see LICENSE)
+
+from datadog_checks.base.utils.tracking import tracked_method
+
+from .base import XESessionBase, agent_check_getter
+from .xml_tools import (
+    extract_field,
+    extract_value,
+)
+
+
+class QueryCompletionEventsHandler(XESessionBase):
+    """
+    Combined handler for SQL query completion events:
+    - sql_batch_completed - SQL batch completion
+    - rpc_completed - Remote procedure call completion
+    - module_end - Stored procedure, trigger, or function completion
+
+    All events are captured in a single XE session named "datadog_query_completions".
+    """
+
+    # Event-specific field extensions
+    BATCH_SPECIFIC_NUMERIC_FIELDS = {
+        "cpu_time": 0,
+        "page_server_reads": 0,
+        "physical_reads": 0,
+        "logical_reads": 0,
+        "writes": 0,
+        "spills": 0,
+        "row_count": 0,
+    }
+
+    BATCH_SPECIFIC_STRING_FIELDS = [
+        "result",
+    ]
+
+    RPC_SPECIFIC_NUMERIC_FIELDS = {
+        "cpu_time": 0,
+        "page_server_reads": 0,
+        "physical_reads": 0,
+        "logical_reads": 0,
+        "writes": 0,
+        "spills": 0,
+        "row_count": 0,
+        "object_id": 0,
+        "line_number": 0,
+    }
+
+    RPC_SPECIFIC_STRING_FIELDS = [
+        "result",
+        "procedure_name",
+        "data_stream",
+        "connection_reset_option",
+    ]
+
+    MODULE_SPECIFIC_NUMERIC_FIELDS = {
+        "source_database_id": 0,
+        "object_id": 0,
+        "row_count": 0,
+        "line_number": 0,
+        "offset": 0,
+        "offset_end": 0,
+    }
+
+    MODULE_SPECIFIC_STRING_FIELDS = [
+        "object_name",
+        "object_type",
+    ]
+
+    def __init__(self, check, config):
+        super(QueryCompletionEventsHandler, self).__init__(check, config, "datadog_query_completions")
+
+        # Register handlers for different event types using the strategy pattern
+        self.register_event_handler('sql_batch_completed', self._process_query_event)
+        self.register_event_handler('rpc_completed', self._process_query_event)
+        self.register_event_handler('module_end', self._process_query_event)
+
+    def get_numeric_fields(self, event_type=None):
+        """Get numeric fields with defaults for given event type"""
+        base_fields = super().get_numeric_fields(event_type)
+
+        if event_type == 'sql_batch_completed':
+            base_fields.update(self.BATCH_SPECIFIC_NUMERIC_FIELDS)
+        elif event_type == 'rpc_completed':
+            base_fields.update(self.RPC_SPECIFIC_NUMERIC_FIELDS)
+        elif event_type == 'module_end':
+            base_fields.update(self.MODULE_SPECIFIC_NUMERIC_FIELDS)
+
+        return base_fields
+
+    def get_string_fields(self, event_type=None):
+        """Get string fields for given event type"""
+        base_fields = super().get_string_fields(event_type)
+
+        if event_type == 'sql_batch_completed':
+            return base_fields + self.BATCH_SPECIFIC_STRING_FIELDS
+        elif event_type == 'rpc_completed':
+            return base_fields + self.RPC_SPECIFIC_STRING_FIELDS
+        elif event_type == 'module_end':
+            return base_fields + self.MODULE_SPECIFIC_STRING_FIELDS
+
+        return base_fields
+
+    @tracked_method(agent_check_getter=agent_check_getter)
+    def _process_events(self, xml_data):
+        """Process all query completion event types using base implementation"""
+        return super()._process_events(xml_data)
+
+    def _process_query_event(self, event, event_data):
+        """
+        Process any query completion event (batch, RPC, or module).
+        All three event types share the same processing logic.
+
+        Args:
+            event: The XML event element
+            event_data: The event data dictionary to populate
+
+        Returns:
+            True if processing was successful
+        """
+        # Process data elements
+        for data in event.findall('./data'):
+            data_name = data.get('name')
+            if not data_name:
+                continue
+
+            # Use unified field extraction
+            extract_field(
+                data,
+                event_data,
+                data_name,
+                self.get_numeric_fields(event_data.get('event_name')),
+                self.TEXT_FIELDS,
+                self._log,
+            )
+
+        # Process action elements
+        self._process_action_elements(event, event_data)
+
+        return True
+
+    def _process_action_elements(self, event, event_data):
+        """Process common action elements for all event types"""
+        for action in event.findall('./action'):
+            action_name = action.get('name')
+            if action_name:
+                # Add activity_id support
+                if action_name == 'attach_activity_id':
+                    event_data['activity_id'] = extract_value(action)
+                else:
+                    event_data[action_name] = extract_value(action)
+
+    def _normalize_event_impl(self, event):
+        """
+        Implementation of event normalization based on event type.
+        """
+        # All event types can use the base normalization with type-specific fields
+        return self._normalize_event(event)
+
+    def _get_primary_sql_field(self, event):
+        """
+        Get the primary SQL field based on the event type.
+        This is the field that will be used as the main source for query signatures.
+
+        Args:
+            event: The event data dictionary
+
+        Returns:
+            Name of the primary SQL field for this event type
+        """
+        event_name = event.get('event_name', '')
+
+        if event_name == 'sql_batch_completed':
+            return 'batch_text'
+        elif event_name == 'rpc_completed':
+            return 'statement'
+        elif event_name == 'module_end':
+            return 'statement'
+
+        # Default fallback - try fields in priority order
+        for field in self.get_sql_fields(event_name):
+            if field in event and event[field]:
+                return field
+
+        return None
diff --git a/sqlserver/datadog_checks/sqlserver/xe_collection/registry.py b/sqlserver/datadog_checks/sqlserver/xe_collection/registry.py
new file mode 100644
index 0000000000000..8621881feb3b3
--- /dev/null
+++ b/sqlserver/datadog_checks/sqlserver/xe_collection/registry.py
@@ -0,0 +1,29 @@
+# (C) Datadog, Inc. 2025-present
+# All rights reserved
+# Licensed under a 3-clause BSD style license (see LICENSE)
+
+from .error_events import ErrorEventsHandler
+from .query_completion_events import QueryCompletionEventsHandler
+
+
+def get_xe_session_handlers(check, config):
+    """Get the enabled XE session handlers based on configuration"""
+    handlers = []
+
+    # Get the XE collection configuration
+    xe_config = getattr(config, 'xe_collection_config', {})
+
+    # Only create and add query completions handler if enabled
+    query_completions_config = xe_config.get('query_completions', {})
+    if query_completions_config.get('enabled', False):
+        handlers.append(QueryCompletionEventsHandler(check, config))
+        check.log.debug("Query completions XE session handler enabled")
+
+    # Only create and add query errors handler if enabled
+    query_errors_config = xe_config.get('query_errors', {})
+    if query_errors_config.get('enabled', False):
+        handlers.append(ErrorEventsHandler(check, config))
+        check.log.debug("Query errors XE session handler enabled")
+
+    check.log.info("Created %d enabled XE session handlers", len(handlers))
+    return handlers
diff --git a/sqlserver/datadog_checks/sqlserver/xe_collection/xml_tools.py b/sqlserver/datadog_checks/sqlserver/xe_collection/xml_tools.py
new file mode 100644
index 0000000000000..4db8c3f8b4f9e
--- /dev/null
+++ b/sqlserver/datadog_checks/sqlserver/xe_collection/xml_tools.py
@@ -0,0 +1,84 @@
+# (C) Datadog, Inc. 2025-present
+# All rights reserved
+# Licensed under a 3-clause BSD style license (see LICENSE)
+
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+def extract_value(element, default=None):
+    """Helper method to extract values from XML elements with consistent handling"""
+    if element is None:
+        return default
+
+    # Try to get text from value element using XPath
+    try:
+        value_nodes = element.xpath('./value/text()')
+        if value_nodes and value_nodes[0]:
+            return value_nodes[0].strip()
+    except (AttributeError, IndexError):
+        pass
+
+    # Fall back to element's text content
+    if element.text:
+        return element.text.strip()
+
+    return default
+
+
+def extract_int_value(element, default=None):
+    """Helper method to extract integer values with error handling"""
+    value = extract_value(element, default)
+    if value is None:
+        return default
+
+    try:
+        return int(value)
+    except (ValueError, TypeError) as e:
+        logger.warning("Error converting to int: %s", e)
+        return default
+
+
+def extract_text_representation(element, default=None):
+    """Get the text representation when both value and text are available"""
+    if element is None:
+        return default
+
+    # Use XPath to get text from "text" element
+    try:
+        text_nodes = element.xpath('./text/text()')
+        if text_nodes and text_nodes[0]:
+            return text_nodes[0].strip()
+    except (AttributeError, IndexError):
+        pass
+
+    return default
+
+
+def extract_field(data, event_data, field_name, numeric_fields, text_fields, log=None):
+    """Extract field value based on its type"""
+    if field_name == 'duration':
+        extract_duration(data, event_data, log)
+    elif field_name in numeric_fields:
+        value = extract_int_value(data)
+        if value is not None:
+            event_data[field_name] = value
+    elif field_name in text_fields:
+        text_value = extract_text_representation(data)
+        if text_value is not None:
+            event_data[field_name] = text_value
+        else:
+            event_data[field_name] = extract_value(data)
+    else:
+        event_data[field_name] = extract_value(data)
+
+
+def extract_duration(data, event_data, log=None):
+    """Extract duration value and convert to milliseconds"""
+    duration_value = extract_int_value(data)
+    if duration_value is not None:
+        # Convert from microseconds to milliseconds
+        event_data["duration_ms"] = duration_value / 1000
+    else:
+        event_data["duration_ms"] = None
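A quick usage sketch for the helpers above, run against a hand-written XE fragment. The import path assumes the package layout added in this PR, and the field names mirror those used by the handlers.

from lxml import etree

from datadog_checks.sqlserver.xe_collection.xml_tools import extract_field

event = etree.fromstring(
    '<event name="rpc_completed">'
    '<data name="duration"><value>1500000</value></data>'
    '<data name="result"><value>0</value><text>OK</text></data>'
    '</event>'
)
event_data = {}
for data in event.findall('./data'):
    # numeric_fields and text_fields are per-event-type lookups in the handlers
    extract_field(data, event_data, data.get('name'), {"session_id": 0}, ["result"], None)
print(event_data)  # {'duration_ms': 1500.0, 'result': 'OK'}
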
diff --git a/sqlserver/tests/compose-ha/sql/aoag_primary.sql b/sqlserver/tests/compose-ha/sql/aoag_primary.sql
index a7f4cab4ccdeb..ad6e1c9968d39 100644
--- a/sqlserver/tests/compose-ha/sql/aoag_primary.sql
+++ b/sqlserver/tests/compose-ha/sql/aoag_primary.sql
@@ -453,3 +453,110 @@ GO
 ALTER EVENT SESSION datadog ON SERVER STATE = START;
 GO
+
+-- 1. Query completions (grouped)
+-- Includes RPC completions, batch completions, and stored procedure completions
+IF EXISTS (
+    SELECT * FROM sys.server_event_sessions WHERE name = 'datadog_query_completions'
+)
+    DROP EVENT SESSION datadog_query_completions ON SERVER;
+GO
+
+CREATE EVENT SESSION datadog_query_completions ON SERVER
+ADD EVENT sqlserver.rpc_completed (
+    ACTION (
+        sqlserver.sql_text,
+        sqlserver.database_name,
+        sqlserver.username,
+        sqlserver.client_app_name,
+        sqlserver.client_hostname,
+        sqlserver.session_id,
+        sqlserver.request_id
+    )
+    WHERE (
+        sql_text <> '' AND
+        duration > 1000000 -- in microseconds, 1 second
+    )
+),
+ADD EVENT sqlserver.sql_batch_completed(
+    ACTION (
+        sqlserver.sql_text,
+        sqlserver.database_name,
+        sqlserver.username,
+        sqlserver.client_app_name,
+        sqlserver.client_hostname,
+        sqlserver.session_id,
+        sqlserver.request_id
+    )
+    WHERE (
+        sql_text <> '' AND
+        duration > 1000000
+    )
+),
+ADD EVENT sqlserver.module_end(
+    SET collect_statement = (1)
+    ACTION (
+        sqlserver.sql_text,
+        sqlserver.database_name,
+        sqlserver.username,
+        sqlserver.client_app_name,
+        sqlserver.client_hostname,
+        sqlserver.session_id,
+        sqlserver.request_id
+    )
+    WHERE (
+        sql_text <> '' AND
+        duration > 1000000
+    )
+)
+ADD TARGET package0.ring_buffer
+WITH (
+    MAX_MEMORY = 2048 KB,
+    TRACK_CAUSALITY = ON,
+    EVENT_RETENTION_MODE = ALLOW_SINGLE_EVENT_LOSS,
+    MAX_DISPATCH_LATENCY = 3 SECONDS,
+    STARTUP_STATE = ON
+);
+GO
+
+-- 2. Errors and Attentions (grouped)
+IF EXISTS (
+    SELECT * FROM sys.server_event_sessions WHERE name = 'datadog_query_errors'
+)
+    DROP EVENT SESSION datadog_query_errors ON SERVER;
+GO
+CREATE EVENT SESSION datadog_query_errors ON SERVER
+-- Low-frequency events: send to ring_buffer
+ADD EVENT sqlserver.error_reported(
+    ACTION(
+        sqlserver.sql_text,
+        sqlserver.database_name,
+        sqlserver.username,
+        sqlserver.client_app_name,
+        sqlserver.client_hostname,
+        sqlserver.session_id,
+        sqlserver.request_id
+    )
+    WHERE severity >= 11
+),
+ADD EVENT sqlserver.attention(
+    ACTION(
+        sqlserver.sql_text,
+        sqlserver.database_name,
+        sqlserver.username,
+        sqlserver.client_app_name,
+        sqlserver.client_hostname,
+        sqlserver.session_id,
+        sqlserver.request_id
+    )
+)
+ADD TARGET package0.ring_buffer
+WITH (
+    MAX_MEMORY = 2048 KB,
+    EVENT_RETENTION_MODE = ALLOW_SINGLE_EVENT_LOSS,
+    MAX_DISPATCH_LATENCY = 30 SECONDS,
+    STARTUP_STATE = ON
+);
+
+ALTER EVENT SESSION datadog_query_completions ON SERVER STATE = START;
+ALTER EVENT SESSION datadog_query_errors ON SERVER STATE = START;
\ No newline at end of file
diff --git a/sqlserver/tests/compose-ha/sql/aoag_secondary.sql b/sqlserver/tests/compose-ha/sql/aoag_secondary.sql
index ddfa2dc671d9b..9b0fffd9a120a 100644
--- a/sqlserver/tests/compose-ha/sql/aoag_secondary.sql
+++ b/sqlserver/tests/compose-ha/sql/aoag_secondary.sql
@@ -82,3 +82,110 @@ GO
 ALTER EVENT SESSION datadog ON SERVER STATE = START;
 GO
+
+-- 1. Query completions (grouped)
+-- Includes RPC completions, batch completions, and stored procedure completions
+IF EXISTS (
+    SELECT * FROM sys.server_event_sessions WHERE name = 'datadog_query_completions'
+)
+    DROP EVENT SESSION datadog_query_completions ON SERVER;
+GO
+
+CREATE EVENT SESSION datadog_query_completions ON SERVER
+ADD EVENT sqlserver.rpc_completed (
+    ACTION (
+        sqlserver.sql_text,
+        sqlserver.database_name,
+        sqlserver.username,
+        sqlserver.client_app_name,
+        sqlserver.client_hostname,
+        sqlserver.session_id,
+        sqlserver.request_id
+    )
+    WHERE (
+        sql_text <> '' AND
+        duration > 1000000 -- in microseconds, 1 second
+    )
+),
+ADD EVENT sqlserver.sql_batch_completed(
+    ACTION (
+        sqlserver.sql_text,
+        sqlserver.database_name,
+        sqlserver.username,
+        sqlserver.client_app_name,
+        sqlserver.client_hostname,
+        sqlserver.session_id,
+        sqlserver.request_id
+    )
+    WHERE (
+        sql_text <> '' AND
+        duration > 1000000
+    )
+),
+ADD EVENT sqlserver.module_end(
+    SET collect_statement = (1)
+    ACTION (
+        sqlserver.sql_text,
+        sqlserver.database_name,
+        sqlserver.username,
+        sqlserver.client_app_name,
+        sqlserver.client_hostname,
+        sqlserver.session_id,
+        sqlserver.request_id
+    )
+    WHERE (
+        sql_text <> '' AND
+        duration > 1000000
+    )
+)
+ADD TARGET package0.ring_buffer
+WITH (
+    MAX_MEMORY = 2048 KB,
+    TRACK_CAUSALITY = ON,
+    EVENT_RETENTION_MODE = ALLOW_SINGLE_EVENT_LOSS,
+    MAX_DISPATCH_LATENCY = 3 SECONDS,
+    STARTUP_STATE = ON
+);
+GO
+
+-- 2. Errors and Attentions (grouped)
+IF EXISTS (
+    SELECT * FROM sys.server_event_sessions WHERE name = 'datadog_query_errors'
+)
+    DROP EVENT SESSION datadog_query_errors ON SERVER;
+GO
+CREATE EVENT SESSION datadog_query_errors ON SERVER
+-- Low-frequency events: send to ring_buffer
+ADD EVENT sqlserver.error_reported(
+    ACTION(
+        sqlserver.sql_text,
+        sqlserver.database_name,
+        sqlserver.username,
+        sqlserver.client_app_name,
+        sqlserver.client_hostname,
+        sqlserver.session_id,
+        sqlserver.request_id
+    )
+    WHERE severity >= 11
+),
+ADD EVENT sqlserver.attention(
+    ACTION(
+        sqlserver.sql_text,
+        sqlserver.database_name,
+        sqlserver.username,
+        sqlserver.client_app_name,
+        sqlserver.client_hostname,
+        sqlserver.session_id,
+        sqlserver.request_id
+    )
+)
+ADD TARGET package0.ring_buffer
+WITH (
+    MAX_MEMORY = 2048 KB,
+    EVENT_RETENTION_MODE = ALLOW_SINGLE_EVENT_LOSS,
+    MAX_DISPATCH_LATENCY = 30 SECONDS,
+    STARTUP_STATE = ON
+);
+
+ALTER EVENT SESSION datadog_query_completions ON SERVER STATE = START;
+ALTER EVENT SESSION datadog_query_errors ON SERVER STATE = START;
\ No newline at end of file
diff --git a/sqlserver/tests/compose-high-cardinality-windows/setup.sql b/sqlserver/tests/compose-high-cardinality-windows/setup.sql
index 123fed07f797e..be481eb72ec7a 100644
--- a/sqlserver/tests/compose-high-cardinality-windows/setup.sql
+++ b/sqlserver/tests/compose-high-cardinality-windows/setup.sql
@@ -381,3 +381,110 @@ GO
 ALTER EVENT SESSION datadog ON SERVER STATE = START;
 GO
+
+-- 1. Query completions (grouped)
+-- Includes RPC completions, batch completions, and stored procedure completions
+IF EXISTS (
+    SELECT * FROM sys.server_event_sessions WHERE name = 'datadog_query_completions'
+)
+    DROP EVENT SESSION datadog_query_completions ON SERVER;
+GO
+
+CREATE EVENT SESSION datadog_query_completions ON SERVER
+ADD EVENT sqlserver.rpc_completed (
+    ACTION (
+        sqlserver.sql_text,
+        sqlserver.database_name,
+        sqlserver.username,
+        sqlserver.client_app_name,
+        sqlserver.client_hostname,
+        sqlserver.session_id,
+        sqlserver.request_id
+    )
+    WHERE (
+        sql_text <> '' AND
+        duration > 1000000 -- in microseconds, 1 second
+    )
+),
+ADD EVENT sqlserver.sql_batch_completed(
+    ACTION (
+        sqlserver.sql_text,
+        sqlserver.database_name,
+        sqlserver.username,
+        sqlserver.client_app_name,
+        sqlserver.client_hostname,
+        sqlserver.session_id,
+        sqlserver.request_id
+    )
+    WHERE (
+        sql_text <> '' AND
+        duration > 1000000
+    )
+),
+ADD EVENT sqlserver.module_end(
+    SET collect_statement = (1)
+    ACTION (
+        sqlserver.sql_text,
+        sqlserver.database_name,
+        sqlserver.username,
+        sqlserver.client_app_name,
+        sqlserver.client_hostname,
+        sqlserver.session_id,
+        sqlserver.request_id
+    )
+    WHERE (
+        sql_text <> '' AND
+        duration > 1000000
+    )
+)
+ADD TARGET package0.ring_buffer
+WITH (
+    MAX_MEMORY = 2048 KB,
+    TRACK_CAUSALITY = ON,
+    EVENT_RETENTION_MODE = ALLOW_SINGLE_EVENT_LOSS,
+    MAX_DISPATCH_LATENCY = 3 SECONDS,
+    STARTUP_STATE = ON
+);
+GO
+
+-- 2. Errors and Attentions (grouped)
+IF EXISTS (
+    SELECT * FROM sys.server_event_sessions WHERE name = 'datadog_query_errors'
+)
+    DROP EVENT SESSION datadog_query_errors ON SERVER;
+GO
+CREATE EVENT SESSION datadog_query_errors ON SERVER
+-- Low-frequency events: send to ring_buffer
+ADD EVENT sqlserver.error_reported(
+    ACTION(
+        sqlserver.sql_text,
+        sqlserver.database_name,
+        sqlserver.username,
+        sqlserver.client_app_name,
+        sqlserver.client_hostname,
+        sqlserver.session_id,
+        sqlserver.request_id
+    )
+    WHERE severity >= 11
+),
+ADD EVENT sqlserver.attention(
+    ACTION(
+        sqlserver.sql_text,
+        sqlserver.database_name,
+        sqlserver.username,
+        sqlserver.client_app_name,
+        sqlserver.client_hostname,
+        sqlserver.session_id,
+        sqlserver.request_id
+    )
+)
+ADD TARGET package0.ring_buffer
+WITH (
+    MAX_MEMORY = 2048 KB,
+    EVENT_RETENTION_MODE = ALLOW_SINGLE_EVENT_LOSS,
+    MAX_DISPATCH_LATENCY = 30 SECONDS,
+    STARTUP_STATE = ON
+);
+
+ALTER EVENT SESSION datadog_query_completions ON SERVER STATE = START;
+ALTER EVENT SESSION datadog_query_errors ON SERVER STATE = START;
\ No newline at end of file
diff --git a/sqlserver/tests/compose-high-cardinality/setup.sql b/sqlserver/tests/compose-high-cardinality/setup.sql
index fe26ee8a727c8..b20ac90e17c7f 100644
--- a/sqlserver/tests/compose-high-cardinality/setup.sql
+++ b/sqlserver/tests/compose-high-cardinality/setup.sql
@@ -363,3 +363,110 @@ GO
 ALTER EVENT SESSION datadog ON SERVER STATE = START;
 GO
+
+-- 1.
Query completions (grouped) +-- Includes RPC completions, batch completions, and stored procedure completions +IF EXISTS ( + SELECT * FROM sys.server_event_sessions WHERE name = 'datadog_query_completions' +) + DROP EVENT SESSION datadog_query_completions ON SERVER; +GO + +CREATE EVENT SESSION datadog_query_completions ON SERVER +ADD EVENT sqlserver.rpc_completed ( + ACTION ( + sqlserver.sql_text, + sqlserver.database_name, + sqlserver.username, + sqlserver.client_app_name, + sqlserver.client_hostname, + sqlserver.session_id, + sqlserver.request_id + ) + WHERE ( + sql_text <> '' AND + duration > 1000000 -- in microseconds, 1 second + ) +), +ADD EVENT sqlserver.sql_batch_completed( + ACTION ( + sqlserver.sql_text, + sqlserver.database_name, + sqlserver.username, + sqlserver.client_app_name, + sqlserver.client_hostname, + sqlserver.session_id, + sqlserver.request_id + ) + WHERE ( + sql_text <> '' AND + duration > 1000000 + ) +), +ADD EVENT sqlserver.module_end( + SET collect_statement = (1) + ACTION ( + sqlserver.sql_text, + sqlserver.database_name, + sqlserver.username, + sqlserver.client_app_name, + sqlserver.client_hostname, + sqlserver.session_id, + sqlserver.request_id + ) + WHERE ( + sql_text <> '' AND + duration > 1000000 + ) +) +ADD TARGET package0.ring_buffer +WITH ( + MAX_MEMORY = 2048 KB, + TRACK_CAUSALITY = ON, + EVENT_RETENTION_MODE = ALLOW_SINGLE_EVENT_LOSS, + MAX_DISPATCH_LATENCY = 3 SECONDS, + STARTUP_STATE = ON +); +GO + +-- 2. Errors and Attentions (grouped) +IF EXISTS ( + SELECT * FROM sys.server_event_sessions WHERE name = 'datadog_query_errors' +) + DROP EVENT SESSION datadog_query_errors ON SERVER; +GO +CREATE EVENT SESSION datadog_query_errors ON SERVER +-- Low-frequency events: send to ring_buffer +ADD EVENT sqlserver.error_reported( + ACTION( + sqlserver.sql_text, + sqlserver.database_name, + sqlserver.username, + sqlserver.client_app_name, + sqlserver.client_hostname, + sqlserver.session_id, + sqlserver.request_id + ) + WHERE severity >= 11 +), +ADD EVENT sqlserver.attention( + ACTION( + sqlserver.sql_text, + sqlserver.database_name, + sqlserver.username, + sqlserver.client_app_name, + sqlserver.client_hostname, + sqlserver.session_id, + sqlserver.request_id + ) +) +ADD TARGET package0.ring_buffer +WITH ( + MAX_MEMORY = 2048 KB, + EVENT_RETENTION_MODE = ALLOW_SINGLE_EVENT_LOSS, + MAX_DISPATCH_LATENCY = 30 SECONDS, + STARTUP_STATE = ON +); + +ALTER EVENT SESSION datadog_query_completions ON SERVER STATE = START; +ALTER EVENT SESSION datadog_query_errors ON SERVER STATE = START; \ No newline at end of file diff --git a/sqlserver/tests/compose-windows/setup.sql b/sqlserver/tests/compose-windows/setup.sql index 6433deeed5c7a..2bdff586589b4 100644 --- a/sqlserver/tests/compose-windows/setup.sql +++ b/sqlserver/tests/compose-windows/setup.sql @@ -369,3 +369,110 @@ GO ALTER EVENT SESSION datadog ON SERVER STATE = START; GO + +-- 1. 
Query completions (grouped) +-- Includes RPC completions, batch completions, and stored procedure completions +IF EXISTS ( + SELECT * FROM sys.server_event_sessions WHERE name = 'datadog_query_completions' +) + DROP EVENT SESSION datadog_query_completions ON SERVER; +GO + +CREATE EVENT SESSION datadog_query_completions ON SERVER +ADD EVENT sqlserver.rpc_completed ( + ACTION ( + sqlserver.sql_text, + sqlserver.database_name, + sqlserver.username, + sqlserver.client_app_name, + sqlserver.client_hostname, + sqlserver.session_id, + sqlserver.request_id + ) + WHERE ( + sql_text <> '' AND + duration > 1000000 -- in microseconds, 1 second + ) +), +ADD EVENT sqlserver.sql_batch_completed( + ACTION ( + sqlserver.sql_text, + sqlserver.database_name, + sqlserver.username, + sqlserver.client_app_name, + sqlserver.client_hostname, + sqlserver.session_id, + sqlserver.request_id + ) + WHERE ( + sql_text <> '' AND + duration > 1000000 + ) +), +ADD EVENT sqlserver.module_end( + SET collect_statement = (1) + ACTION ( + sqlserver.sql_text, + sqlserver.database_name, + sqlserver.username, + sqlserver.client_app_name, + sqlserver.client_hostname, + sqlserver.session_id, + sqlserver.request_id + ) + WHERE ( + sql_text <> '' AND + duration > 1000000 + ) +) +ADD TARGET package0.ring_buffer +WITH ( + MAX_MEMORY = 2048 KB, + TRACK_CAUSALITY = ON, + EVENT_RETENTION_MODE = ALLOW_SINGLE_EVENT_LOSS, + MAX_DISPATCH_LATENCY = 3 SECONDS, + STARTUP_STATE = ON +); +GO + +-- 2. Errors and Attentions (grouped) +IF EXISTS ( + SELECT * FROM sys.server_event_sessions WHERE name = 'datadog_query_errors' +) + DROP EVENT SESSION datadog_query_errors ON SERVER; +GO +CREATE EVENT SESSION datadog_query_errors ON SERVER +-- Low-frequency events: send to ring_buffer +ADD EVENT sqlserver.error_reported( + ACTION( + sqlserver.sql_text, + sqlserver.database_name, + sqlserver.username, + sqlserver.client_app_name, + sqlserver.client_hostname, + sqlserver.session_id, + sqlserver.request_id + ) + WHERE severity >= 11 +), +ADD EVENT sqlserver.attention( + ACTION( + sqlserver.sql_text, + sqlserver.database_name, + sqlserver.username, + sqlserver.client_app_name, + sqlserver.client_hostname, + sqlserver.session_id, + sqlserver.request_id + ) +) +ADD TARGET package0.ring_buffer +WITH ( + MAX_MEMORY = 2048 KB, + EVENT_RETENTION_MODE = ALLOW_SINGLE_EVENT_LOSS, + MAX_DISPATCH_LATENCY = 30 SECONDS, + STARTUP_STATE = ON +); + +ALTER EVENT SESSION datadog_query_completions ON SERVER STATE = START; +ALTER EVENT SESSION datadog_query_errors ON SERVER STATE = START; \ No newline at end of file diff --git a/sqlserver/tests/compose/setup.sql b/sqlserver/tests/compose/setup.sql index fa04b82c2c7af..f8e280cc45bd1 100644 --- a/sqlserver/tests/compose/setup.sql +++ b/sqlserver/tests/compose/setup.sql @@ -16,6 +16,7 @@ USE master; CREATE LOGIN bob WITH PASSWORD = 'Password12!'; CREATE USER bob FOR LOGIN bob; GRANT CONNECT ANY DATABASE to bob; +GRANT VIEW SERVER STATE TO bob; CREATE LOGIN fred WITH PASSWORD = 'Password12!'; CREATE USER fred FOR LOGIN fred; GRANT CONNECT ANY DATABASE to fred; @@ -338,6 +339,8 @@ GO GRANT EXECUTE on conditionalPlanTest to bob; GO +-- create Extended event (XE) sessions +-- session for deadlock detection CREATE EVENT SESSION datadog ON SERVER ADD EVENT sqlserver.xml_deadlock_report @@ -352,3 +355,109 @@ GO ALTER EVENT SESSION datadog ON SERVER STATE = START; GO +-- 1. 
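Query completions (grouped) -- defined below; an illustrative verification query comes first.
+-- Illustrative check (an assumption, not part of this change): after this script runs, both
+-- Datadog XE sessions should exist and be started. For example:
+--     SELECT s.name,
+--            CASE WHEN r.name IS NULL THEN 'stopped' ELSE 'running' END AS state
+--     FROM sys.server_event_sessions AS s
+--     LEFT JOIN sys.dm_xe_sessions AS r ON r.name = s.name
+--     WHERE s.name IN ('datadog_query_completions', 'datadog_query_errors');
+
+-- 1. 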
Query completions (grouped) +-- Includes RPC completions, batch completions, and stored procedure completions +IF EXISTS ( + SELECT * FROM sys.server_event_sessions WHERE name = 'datadog_query_completions' +) + DROP EVENT SESSION datadog_query_completions ON SERVER; +GO + +CREATE EVENT SESSION datadog_query_completions ON SERVER +ADD EVENT sqlserver.rpc_completed ( + ACTION ( + sqlserver.sql_text, + sqlserver.database_name, + sqlserver.username, + sqlserver.client_app_name, + sqlserver.client_hostname, + sqlserver.session_id, + sqlserver.request_id + ) + WHERE ( + sql_text <> '' AND + duration > 1000000 -- in microseconds, 1 second + ) +), +ADD EVENT sqlserver.sql_batch_completed( + ACTION ( + sqlserver.sql_text, + sqlserver.database_name, + sqlserver.username, + sqlserver.client_app_name, + sqlserver.client_hostname, + sqlserver.session_id, + sqlserver.request_id + ) + WHERE ( + sql_text <> '' AND + duration > 1000000 + ) +), +ADD EVENT sqlserver.module_end( + SET collect_statement = (1) + ACTION ( + sqlserver.sql_text, + sqlserver.database_name, + sqlserver.username, + sqlserver.client_app_name, + sqlserver.client_hostname, + sqlserver.session_id, + sqlserver.request_id + ) + WHERE ( + sql_text <> '' AND + duration > 1000000 + ) +) +ADD TARGET package0.ring_buffer +WITH ( + MAX_MEMORY = 2048 KB, + TRACK_CAUSALITY = ON, + EVENT_RETENTION_MODE = ALLOW_SINGLE_EVENT_LOSS, + MAX_DISPATCH_LATENCY = 3 SECONDS, + STARTUP_STATE = ON +); +GO + +-- 2. Errors and Attentions (grouped) +IF EXISTS ( + SELECT * FROM sys.server_event_sessions WHERE name = 'datadog_query_errors' +) + DROP EVENT SESSION datadog_query_errors ON SERVER; +GO +CREATE EVENT SESSION datadog_query_errors ON SERVER +-- Low-frequency events: send to ring_buffer +ADD EVENT sqlserver.error_reported( + ACTION( + sqlserver.sql_text, + sqlserver.database_name, + sqlserver.username, + sqlserver.client_app_name, + sqlserver.client_hostname, + sqlserver.session_id, + sqlserver.request_id + ) + WHERE severity >= 11 +), +ADD EVENT sqlserver.attention( + ACTION( + sqlserver.sql_text, + sqlserver.database_name, + sqlserver.username, + sqlserver.client_app_name, + sqlserver.client_hostname, + sqlserver.session_id, + sqlserver.request_id + ) +) +ADD TARGET package0.ring_buffer +WITH ( + MAX_MEMORY = 2048 KB, + EVENT_RETENTION_MODE = ALLOW_SINGLE_EVENT_LOSS, + MAX_DISPATCH_LATENCY = 30 SECONDS, + STARTUP_STATE = ON +); + +ALTER EVENT SESSION datadog_query_completions ON SERVER STATE = START; +ALTER EVENT SESSION datadog_query_errors ON SERVER STATE = START; \ No newline at end of file diff --git a/sqlserver/tests/test_integration.py b/sqlserver/tests/test_integration.py index 25983cb94da87..caa2170df09e0 100644 --- a/sqlserver/tests/test_integration.py +++ b/sqlserver/tests/test_integration.py @@ -936,3 +936,126 @@ def test_check_static_information_expire(aggregator, dd_run_check, init_config, assert sqlserver_check.static_info_cache is not None assert len(sqlserver_check.static_info_cache.keys()) == 6 assert sqlserver_check.resolved_hostname == 'stubbed.hostname' + + +@pytest.mark.integration +@pytest.mark.usefixtures('dd_environment') +def test_xe_collection_integration(aggregator, dd_run_check, bob_conn, instance_docker, caplog): + """Test that XE sessions collect and process events properly.""" + # Configure instance to enable XE collection + instance = copy(instance_docker) + instance['dbm'] = True + instance['xe_collection'] = { + 'query_completions': { + 'enabled': True, + 'collection_interval': 0.1, + }, + 'query_errors': { + 'enabled': 
True,
+            'collection_interval': 0.1,
+        },
+    }
+    # Ensure raw query collection is enabled
+    instance['collect_raw_query_statement'] = {"enabled": True, "cache_max_size": 100, "samples_per_hour_per_query": 10}
+
+    check = SQLServer(CHECK_NAME, {}, [instance])
+
+    # Run check once to initialize sessions if needed
+    dd_run_check(check)
+
+    # Execute a query that will be captured (long enough to exceed the threshold)
+    test_query = "WAITFOR DELAY '00:00:02'; SELECT 1;"
+    bob_conn.execute_with_retries(test_query)
+
+    # Execute a query that will generate an error
+    error_query = "SELECT 1/0;"  # Division by zero error
+    try:
+        bob_conn.execute_with_retries(error_query)
+    except Exception:
+        pass  # We expect this to fail
+
+    # Run check again to collect the events
+    dd_run_check(check)
+
+    # Get events from the platform events API
+    dbm_activity = aggregator.get_event_platform_events("dbm-activity")
+
+    # Filter completion events (now each event may contain multiple query details)
+    query_completion_batches = [
+        e
+        for e in dbm_activity
+        if e.get('dbm_type') == 'query_completion' and 'datadog_query_completions' in str(e.get('event_source', ''))
+    ]
+
+    # Filter error events (now each event may contain multiple query details)
+    error_batches = [
+        e
+        for e in dbm_activity
+        if e.get('dbm_type') == 'query_error' and 'datadog_query_errors' in str(e.get('event_source', ''))
+    ]
+
+    # We should have at least one batch of completion events
+    assert len(query_completion_batches) > 0, "No query completion batches collected"
+
+    # We should have at least one batch of error events
+    assert len(error_batches) > 0, "No error event batches collected"
+
+    # Extract all individual completion events from batches
+    query_completion_events = []
+    for batch in query_completion_batches:
+        events = batch.get('sqlserver_query_completions', [])
+        if events:
+            query_completion_events.extend(events)
+
+    # Extract all individual error events from batches
+    error_events = []
+    for batch in error_batches:
+        events = batch.get('sqlserver_query_errors', [])
+        if events:
+            error_events.extend(events)
+
+    # We should have at least one query completion event
+    assert len(query_completion_events) > 0, "No query completion events collected"
+
+    # We should have at least one error event
+    assert len(error_events) > 0, "No error events collected"
+
+    # Verify specific query completion event details
+    found_test_query = False
+    for event in query_completion_events:
+        # Look at the query_details field which contains the XE event info
+        query_details = event.get('query_details', {})
+        sql_text = query_details.get('sql_text', '')
+
+        if "WAITFOR DELAY" in sql_text and "SELECT 1" in sql_text:
+            found_test_query = True
+            # Check for expected properties
+            assert "bob" in query_details.get('username', ''), "Username 'bob' not found in event"
+            assert 'duration_ms' in query_details, "Duration not found in event"
+            # The duration should be at least 2000ms (2 seconds)
+            duration = float(query_details.get('duration_ms', 0))
+            assert duration >= 2000, f"Expected duration >= 2000ms, but got {duration}ms"
+            # Verify raw_query_signature is present when collect_raw_query is enabled
+            assert 'raw_query_signature' in query_details, "raw_query_signature not found in query details"
+            assert query_details.get('raw_query_signature'), "raw_query_signature is empty"
+
+    assert found_test_query, "Could not find our specific test query in the completion events"
+
+    # Verify specific error event details
+    found_error_query = False
+    for event in error_events:
+        # Look at the query_details field which contains the XE event info
+        query_details = event.get('query_details', {})
+        sql_text = query_details.get('sql_text', '')
+
+        if "SELECT 1/0" in sql_text:
+            found_error_query = True
+            # Check for expected properties
+            assert "bob" in query_details.get('username', ''), "Username 'bob' not found in error event"
+            assert "Divide by zero" in query_details.get('message', ''), "Expected error message not found"
+            assert query_details.get('error_number') == 8134, "Expected error number 8134 not found"
+            # Verify raw_query_signature is present when collect_raw_query is enabled
+            assert 'raw_query_signature' in query_details, "raw_query_signature not found in error query details"
+            assert query_details.get('raw_query_signature'), "raw_query_signature is empty"
+
+    assert found_error_query, "Could not find our specific error query in the error events"
diff --git a/sqlserver/tests/test_xe_collection.py b/sqlserver/tests/test_xe_collection.py
new file mode 100644
index 0000000000000..651203f2fe0e2
--- /dev/null
+++ b/sqlserver/tests/test_xe_collection.py
@@ -0,0 +1,1100 @@
+# (C) Datadog, Inc. 2025-present
+# All rights reserved
+# Licensed under a 3-clause BSD style license (see LICENSE)
+
+import logging
+import os
+import sys
+from unittest.mock import Mock, patch
+
+import pytest
+from lxml import etree
+
+from datadog_checks.sqlserver import SQLServer
+from datadog_checks.sqlserver.xe_collection.base import TimestampHandler
+from datadog_checks.sqlserver.xe_collection.error_events import ErrorEventsHandler
+from datadog_checks.sqlserver.xe_collection.query_completion_events import QueryCompletionEventsHandler
+from datadog_checks.sqlserver.xe_collection.xml_tools import (
+    extract_duration,
+    extract_field,
+    extract_int_value,
+    extract_text_representation,
+    extract_value,
+)
+
+CHECK_NAME = 'sqlserver'
+
+# Mock datadog_agent before imports - ensure it's properly patched at module level
+datadog_agent_mock = Mock()
+datadog_agent_mock.get_version.return_value = '7.30.0'
+sys.modules['datadog_agent'] = datadog_agent_mock
+
+
+# Helper functions
+def load_xml_fixture(filename):
+    """Load an XML file from the fixtures directory"""
+    fixtures_dir = os.path.join(os.path.dirname(__file__), 'xml_xe_events')
+    with open(os.path.join(fixtures_dir, filename), 'r') as f:
+        return f.read()
+
+
+def wrap_xml_in_events_tag(event_xml):
+    """Wrap a single event XML in the events tag for testing"""
+    return f"<events>{event_xml}</events>"
+
+
+def assert_event_field_values(event, expected_values):
+    """Assert that event fields match expected values with appropriate type conversion"""
+    for field, expected in expected_values.items():
+        if field in ['session_id', 'request_id', 'error_number', 'severity']:
+            assert int(event[field]) == expected
+        else:
+            assert event[field] == expected
+
+
+def validate_common_payload_fields(payload, expected_source, expected_type):
+    """Validate common fields in event payloads"""
+    assert 'timestamp' in payload
+    assert payload['host'] == 'test-host'
+    assert payload['ddagentversion'] == '7.30.0'
+    assert payload['ddsource'] == 'sqlserver'
+    assert payload['dbm_type'] == expected_type
+    assert payload['event_source'] == expected_source
+    assert 'service' in payload
+
+    # Fields that only exist in regular events (non-RQT)
+    if expected_type != 'rqt':
+        assert 'collection_interval' in payload
+        assert 'sqlserver_version' in payload
+        assert 'sqlserver_engine_edition' in payload
+        assert 'query_details' in payload
+
+    # Fields that only exist in RQT events
+    if 
expected_type == 'rqt': + assert 'db' in payload + assert 'sqlserver' in payload + + +# Fixtures for common test objects +@pytest.fixture +def mock_check(): + """Create a mock check with necessary attributes""" + check = Mock() + check.log = Mock() + + # Setup connection context manager properly + conn_mock = Mock() + cursor_mock = Mock() + conn_context = Mock() + conn_context.__enter__ = Mock(return_value=conn_mock) + conn_context.__exit__ = Mock(return_value=None) + cursor_context = Mock() + cursor_context.__enter__ = Mock(return_value=cursor_mock) + cursor_context.__exit__ = Mock(return_value=None) + + check.connection = Mock() + check.connection.open_managed_default_connection = Mock(return_value=conn_context) + check.connection.get_managed_cursor = Mock(return_value=cursor_context) + + # Make debug_stats_kwargs return an empty dictionary for @tracked_method decorator + check.debug_stats_kwargs.return_value = {} + + check.static_info_cache = {'version': '2019', 'engine_edition': 'Standard Edition'} + check.resolved_hostname = "test-host" + check.tags = ["test:tag"] + check.database_monitoring_query_activity = Mock() + check.database_monitoring_query_sample = Mock() + return check + + +@pytest.fixture +def mock_config(): + """Create a mock configuration""" + config = Mock() + config.collect_raw_query_statement = {"enabled": True, "cache_max_size": 100, "samples_per_hour_per_query": 10} + config.min_collection_interval = 10 + config.obfuscator_options = {'dbms': 'mssql', 'obfuscation_mode': 'replace'} + config.xe_collection_config = { + 'query_completions': {'collection_interval': 10, 'enabled': True}, + 'query_errors': {'collection_interval': 20, 'enabled': True}, + } + config.cloud_metadata = {} + config.service = "sqlserver" + return config + + +# Fixtures for XML data +@pytest.fixture +def sample_sql_batch_event_xml(): + """Load a sample SQL batch completed event XML""" + return load_xml_fixture('sql_batch_completed.xml') + + +@pytest.fixture +def sample_rpc_completed_event_xml(): + """Load a sample RPC completed event XML""" + return load_xml_fixture('rpc_completed.xml') + + +@pytest.fixture +def sample_error_event_xml(): + """Load a sample error event XML""" + return load_xml_fixture('error_reported.xml') + + +@pytest.fixture +def sample_module_end_event_xml(): + """Load a sample module end event XML""" + return load_xml_fixture('module_end.xml') + + +@pytest.fixture +def sample_multiple_events_xml(): + """Load a sample with multiple events XML""" + return load_xml_fixture('multiple_events.xml') + + +@pytest.fixture +def sample_attention_event_xml(): + """Load a sample attention event XML""" + return load_xml_fixture('attention.xml') + + +# Fixtures for expected event values +@pytest.fixture +def sql_batch_expected_values(): + """Expected values for SQL batch completed events""" + return { + 'event_name': 'sql_batch_completed', + 'timestamp': '2025-04-24T20:56:52.809Z', + 'duration_ms': 4829.704, + 'session_id': 123, + 'request_id': 0, + 'database_name': 'master', + 'client_hostname': 'COMP-MX2YQD7P2P', + 'client_app_name': 'azdata', + 'username': 'datadog', + } + + +@pytest.fixture +def rpc_completed_expected_values(): + """Expected values for RPC completed events""" + return { + 'event_name': 'rpc_completed', + 'timestamp': '2025-04-24T20:57:04.937Z', + 'duration_ms': 2699.535, + 'session_id': 203, + 'request_id': 0, + 'database_name': 'msdb', + 'client_hostname': 'EC2AMAZ-ML3E0PH', + 'client_app_name': 'SQLAgent - Job Manager', + 'username': 'NT AUTHORITY\\NETWORK SERVICE', + } 
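+
+# Illustrative aside (an assumption, not part of this change): fixture XML like the files
+# loaded above can be captured from a live server by reading the XE ring buffer target:
+#
+#   SELECT CAST(t.target_data AS XML)
+#   FROM sys.dm_xe_session_targets AS t
+#   JOIN sys.dm_xe_sessions AS s ON t.event_session_address = s.address
+#   WHERE s.name = 'datadog_query_completions' AND t.target_name = 'ring_buffer';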
+
+
+@pytest.fixture
+def error_expected_values():
+    """Expected values for error reported events"""
+    return {
+        'event_name': 'error_reported',
+        'timestamp': '2025-04-24T20:57:17.287Z',
+        'error_number': 195,
+        'severity': 15,
+        'session_id': 81,
+        'request_id': 0,
+        'database_name': 'dbmorders',
+        'client_hostname': 'a05c90468fb8',
+        'client_app_name': 'go-mssqldb',
+        'username': 'shopper_4',
+        'message': "'REPEAT' is not a recognized built-in function name.",
+    }
+
+
+@pytest.fixture
+def module_end_expected_values():
+    """Expected values for module end events"""
+    return {
+        'event_name': 'module_end',
+        'timestamp': '2025-04-24T20:56:25.313Z',
+        'duration_ms': 1239.182,  # 1239182 / 1000
+        'session_id': 115,
+        'request_id': 0,
+        'database_name': 'dbmorders',
+        'client_hostname': 'a05c90468fb8',
+        'client_app_name': 'go-mssqldb',
+        'username': 'shopper_4',
+        'statement': 'EXEC SelectAndProcessOrderItem',
+        'sql_text': "/*dddbs='orders-app',ddps='orders-app',"
+        + "ddh='awbergs-sqlserver2019-test.c7ug0vvtkhqv.us-east-1.rds.amazonaws.com',"
+        + "dddb='dbmorders',ddprs='orders-sqlserver'*/ EXEC SelectAndProcessOrderItem",
+        # Module-specific fields
+        'object_name': 'SelectAndProcessOrderItem',
+        'object_type': 'P',  # P for stored procedure
+        'row_count': 2,
+        'line_number': 1,
+        'offset': 314,
+        'offset_end': 372,
+        'source_database_id': 9,
+        'object_id': 2002300576,
+    }
+
+
+@pytest.fixture
+def attention_expected_values():
+    """Expected values for attention events"""
+    return {
+        'event_name': 'attention',
+        'timestamp': '2025-04-24T20:37:47.978Z',
+        'duration_ms': 328.677,
+        'session_id': 123,
+        'request_id': 0,
+        'database_name': 'master',
+        'client_hostname': 'COMP-MX2YQD7P2P',
+        'client_app_name': 'azdata',
+        'username': 'datadog',
+    }
+
+
+# Fixtures for handler instances
+@pytest.fixture
+def query_completion_handler(mock_check, mock_config):
+    """Create a QueryCompletionEventsHandler instance for testing"""
+    return QueryCompletionEventsHandler(mock_check, mock_config)
+
+
+@pytest.fixture
+def error_events_handler(mock_check, mock_config):
+    """Create an ErrorEventsHandler instance for testing"""
+    return ErrorEventsHandler(mock_check, mock_config)
+
+
+@pytest.fixture
+def mock_handler_log(request):
+    """Mock a handler's log for testing"""
+
+    def _mock_log(handler, mock_check):
+        original_log = handler._log
+        handler._log = mock_check.log
+
+        # Add finalizer to restore log after test
+        def _restore_log():
+            handler._log = original_log
+
+        request.addfinalizer(_restore_log)
+
+        return mock_check.log
+
+    return _mock_log
+
+
+class TestTimestampHandler:
+    """Tests for the TimestampHandler utility class"""
+
+    def test_format_for_output_valid_timestamps(self):
+        """Test timestamp formatting with valid inputs"""
+        # Test with UTC Z suffix
+        assert TimestampHandler.format_for_output("2023-01-01T12:00:00.123Z") == "2023-01-01T12:00:00.123Z"
+
+        # Test with timezone offset
+        assert TimestampHandler.format_for_output("2023-01-01T12:00:00.123+00:00") == "2023-01-01T12:00:00.123Z"
+
+        # Test with more microsecond precision
+        assert TimestampHandler.format_for_output("2023-01-01T12:00:00.123456Z") == "2023-01-01T12:00:00.123Z"
+
+    def test_format_for_output_edge_cases(self):
+        """Test timestamp formatting with edge cases"""
+        # Test with empty input
+        assert TimestampHandler.format_for_output("") == ""
+
+        # Test with None input
+        assert TimestampHandler.format_for_output(None) == ""
+
+        # Test with invalid format
+        assert TimestampHandler.format_for_output("invalid-date") == "invalid-date"
+
+    def test_calculate_start_time_valid_inputs(self):
+        """Test calculation of start time from end time and duration"""
+        # Test with 1 second duration
+        assert TimestampHandler.calculate_start_time("2023-01-01T12:00:01.000Z", 1000) == "2023-01-01T12:00:00.000Z"
+
+        # Test with fractional milliseconds
+        assert TimestampHandler.calculate_start_time("2023-01-01T12:00:00.500Z", 500) == "2023-01-01T12:00:00.000Z"
+
+        # Test with timezone offset
+        assert (
+            TimestampHandler.calculate_start_time("2023-01-01T12:00:00.000+00:00", 1000) == "2023-01-01T11:59:59.000Z"
+        )
+
+    def test_calculate_start_time_edge_cases(self):
+        """Test start time calculation with edge cases"""
+        # Test with empty timestamp
+        assert TimestampHandler.calculate_start_time("", 1000) == ""
+
+        # Test with None timestamp
+        assert TimestampHandler.calculate_start_time(None, 1000) == ""
+
+        # Test with None duration
+        assert TimestampHandler.calculate_start_time("2023-01-01T12:00:00.000Z", None) == ""
+
+        # Test with zero duration
+        assert TimestampHandler.calculate_start_time("2023-01-01T12:00:00.000Z", 0) == "2023-01-01T12:00:00.000Z"
+
+        # Test with invalid timestamp
+        assert TimestampHandler.calculate_start_time("invalid-date", 1000) == ""
+
+
+class TestXESessionHandlersInitialization:
+    """Tests related to handler initialization"""
+
+    def test_initialization(self, mock_check, mock_config):
+        """Test initialization of handlers"""
+        # Test QueryCompletionEventsHandler
+        handler = QueryCompletionEventsHandler(mock_check, mock_config)
+        assert handler.session_name == "datadog_query_completions"
+        assert handler.collection_interval == 10
+        assert handler._enabled is True
+
+        # Test ErrorEventsHandler
+        handler = ErrorEventsHandler(mock_check, mock_config)
+        assert handler.session_name == "datadog_query_errors"
+        assert handler.collection_interval == 20
+        assert handler._enabled is True
+
+    def test_session_exists(self, query_completion_handler, mock_check):
+        """Test session existence checking"""
+        # Set up cursor mock
+        cursor = mock_check.connection.get_managed_cursor.return_value.__enter__.return_value
+
+        # Test when session exists
+        cursor.fetchone.return_value = [1]  # Session exists
+        assert query_completion_handler.session_exists() is True
+
+        # Test when session does not exist
+        cursor.fetchone.return_value = None  # No session
+        assert query_completion_handler.session_exists() is False
+
+    def test_check_azure_status(self, mock_check, mock_config):
+        """Test Azure SQL Database detection"""
+        # Test non-Azure SQL Server
+        mock_check.static_info_cache = {'engine_edition': 'Standard Edition'}
+        handler = QueryCompletionEventsHandler(mock_check, mock_config)
+        assert handler._is_azure_sql_database is False
+
+        # Test Azure SQL Database
+        mock_check.static_info_cache = {'engine_edition': 'Azure SQL Database'}
+
+        with patch(
+            'datadog_checks.sqlserver.xe_collection.base.is_azure_sql_database',
+            side_effect=lambda x: x == 'Azure SQL Database',
+        ):
+            handler = QueryCompletionEventsHandler(mock_check, mock_config)
+            assert handler._is_azure_sql_database is True
+
+
+class TestXESessionHelpers:
+    """Tests for XML parsing tools"""
+
+    def test_extract_value(self):
+        """Test extraction of values from XML elements"""
+        # Test extracting value from an element with a nested value element
+        xml = '<data><value>test_value</value></data>'
+        element = etree.fromstring(xml)
+        assert extract_value(element) == 'test_value'
+
+        # Test extracting value from an element with text only
+        xml = '<data>test_value</data>'
+        element = etree.fromstring(xml)
+        assert extract_value(element) == 'test_value'
+
+        # Test empty element
+        xml = '<data></data>'
+        element = etree.fromstring(xml)
+        assert extract_value(element) is None
+        assert extract_value(element, 'default') == 'default'
+
+        # Test None element
+        assert extract_value(None) is None
+        assert extract_value(None, 'default') == 'default'
+
+    def test_extract_int_value(self):
+        """Test extraction of integer values"""
+        # Test valid integer
+        xml = '<data><value>123</value></data>'
+        element = etree.fromstring(xml)
+        assert extract_int_value(element) == 123
+
+        # Test invalid integer
+        xml = '<data><value>not_a_number</value></data>'
+        element = etree.fromstring(xml)
+        assert extract_int_value(element) is None
+        assert extract_int_value(element, 0) == 0
+
+        # Test empty element
+        xml = '<data></data>'
+        element = etree.fromstring(xml)
+        assert extract_int_value(element) is None
+        assert extract_int_value(element, 0) == 0
+
+    def test_extract_text_representation(self):
+        """Test extraction of text representation"""
+        # Test with text element
+        xml = '<data><value>123</value><text>text_value</text></data>'
+        element = etree.fromstring(xml)
+        assert extract_text_representation(element) == 'text_value'
+
+        # Test without text element
+        xml = '<data><value>123</value></data>'
+        element = etree.fromstring(xml)
+        assert extract_text_representation(element) is None
+        assert extract_text_representation(element, 'default') == 'default'
+
+    def test_extract_duration(self):
+        """Test duration extraction specifically"""
+        # Test with valid duration (microseconds)
+        xml = '<data><value>4829704</value></data>'
+        element = etree.fromstring(xml)
+
+        # Test direct function
+        event_data = {}
+        extract_duration(element, event_data)
+        assert event_data["duration_ms"] == 4829.704
+
+        # Test with invalid duration
+        xml = '<data><value>not_a_number</value></data>'
+        element = etree.fromstring(xml)
+
+        # Test direct function
+        event_data = {}
+        extract_duration(element, event_data)
+        assert event_data["duration_ms"] is None
+
+    def test_extract_field(self, query_completion_handler):
+        """Test field extraction based on its type"""
+        # Get TEXT_FIELDS and numeric_fields for testing
+        text_fields = query_completion_handler.TEXT_FIELDS
+        numeric_fields = query_completion_handler.get_numeric_fields('test_event')
+
+        # For duration field
+        xml = '<data><value>4829704</value></data>'
+        element = etree.fromstring(xml)
+
+        # Test direct function
+        event_data = {'event_name': 'test_event'}
+        extract_field(element, event_data, 'duration', numeric_fields, text_fields)
+        assert event_data["duration_ms"] == 4829.704
+
+        # For numeric field
+        xml = '<data><value>123</value></data>'
+        element = etree.fromstring(xml)
+
+        # Test direct function
+        event_data = {'event_name': 'test_event'}
+        extract_field(element, event_data, 'session_id', numeric_fields, text_fields)
+        assert event_data["session_id"] == 123
+
+        # For text field (create a test logger)
+        log = logging.getLogger('test')
+
+        # Define a test text field
+        test_text_fields = ['result']
+
+        # For text field
+        xml = '<data><value>123</value><text>Success</text></data>'
+        element = etree.fromstring(xml)
+
+        # Test direct function
+        event_data = {'event_name': 'test_event'}
+        extract_field(element, event_data, 'result', numeric_fields, test_text_fields, log)
+        assert event_data["result"] == 'Success'
+
+        # For regular field
+        xml = '<data><value>TestDB</value></data>'
+        element = etree.fromstring(xml)
+
+        # Test direct function
+        event_data = {'event_name': 'test_event'}
+        extract_field(element, event_data, 'database_name', numeric_fields, text_fields, log)
+        assert event_data["database_name"] == 'TestDB'
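+
+    # Added note (illustrative): on a live server each ring-buffer node also carries a name
+    # attribute, e.g. <data name="duration"><value>4829704</value></data>; extract_field()
+    # receives the field name separately, so the minimal <data> fixtures above exercise the
+    # same code paths.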
+
+    def test_determine_dbm_type(self, mock_check, mock_config):
+        """Test determination of DBM type based on session name"""
+        # Test query completion handler
+        handler = QueryCompletionEventsHandler(mock_check, mock_config)
+        assert handler._determine_dbm_type() == "query_completion"
+
+        # Test query error handler
+        handler = ErrorEventsHandler(mock_check, mock_config)
+        assert handler._determine_dbm_type() == "query_error"
+
+    def test_process_events_filtering(self, query_completion_handler):
+        """Test filtering and processing of ring buffer events based on timestamp"""
+        # Create XML with multiple events (timestamps chosen to match the assertions below)
+        xml_data = """
+        <events>
+            <event name="sql_batch_completed" timestamp="2023-01-01T12:00:00.123Z">
+                <data name="duration"><value>10000</value></data>
+            </event>
+            <event name="sql_batch_completed" timestamp="2023-01-01T12:01:00.456Z">
+                <data name="duration"><value>5000</value></data>
+            </event>
+            <event name="sql_batch_completed" timestamp="2023-01-01T12:02:00.789Z">
+                <data name="duration"><value>2000</value></data>
+            </event>
+        </events>
+        """
+
+        # Mock event handler to always return True
+        mock_handler = Mock(return_value=True)
+        query_completion_handler._event_handlers = {'sql_batch_completed': mock_handler}
+
+        # Test with no timestamp filter (first run)
+        processed_events = query_completion_handler._process_events(xml_data)
+        assert len(processed_events) == 3
+        assert mock_handler.call_count == 3
+
+        # Reset mock and set last event timestamp
+        mock_handler.reset_mock()
+        query_completion_handler._last_event_timestamp = "2023-01-01T12:01:00.456Z"
+
+        # Test with timestamp filter (subsequent run)
+        processed_events = query_completion_handler._process_events(xml_data)
+        assert len(processed_events) == 1  # Only the event after 12:01:00.456Z
+        assert processed_events[0]['timestamp'] == "2023-01-01T12:02:00.789Z"
+        assert mock_handler.call_count == 1
+
+    def test_malformed_xml(self, query_completion_handler):
+        """Test handling of malformed XML"""
+        # Malformed XML data
+        xml_data = "Malformed XML"
+
+        # Should return empty list and not raise exception
+        events = query_completion_handler._process_events(xml_data)
+        assert events == []
+
+
+class TestEventProcessing:
+    """Tests for event processing"""
+
+    def test_process_events_sql_batch(
+        self, query_completion_handler, sample_sql_batch_event_xml, sql_batch_expected_values
+    ):
+        """Test processing of SQL batch completed events"""
+        # Wrap the single event in an events tag
+        xml_data = wrap_xml_in_events_tag(sample_sql_batch_event_xml)
+
+        # Process the events
+        events = query_completion_handler._process_events(xml_data)
+
+        # Verify the event was processed correctly
+        assert len(events) == 1
+        event = events[0]
+
+        # Verify expected values
+        assert_event_field_values(event, sql_batch_expected_values)
+
+        # Check for event-specific fields
+        assert 'batch_text' in event
+        assert 'datadog_sp_statement_completed' in event['batch_text']
+        assert 'sql_text' in event
+        assert 'datadog_sp_statement_completed' in event['sql_text']
+
+    def test_process_events_rpc_completed(
+        self, query_completion_handler, sample_rpc_completed_event_xml, rpc_completed_expected_values
+    ):
+        """Test processing of RPC completed events"""
+        # Wrap the single event in an events tag
+        xml_data = wrap_xml_in_events_tag(sample_rpc_completed_event_xml)
+
+        # Process the events
+        events = query_completion_handler._process_events(xml_data)
+
+        # Verify the event was processed correctly
+        assert len(events) == 1
+        event = events[0]
+
+        # Verify expected values
+        assert_event_field_values(event, rpc_completed_expected_values)
+
+        # Check for event-specific fields
+        assert 'statement' in event
+        assert 'sp_executesql' in event['statement']
+        assert 'sql_text' in event
+        assert 'EXECUTE [msdb].[dbo].[sp_agent_log_job_history]' in event['sql_text']
+
+    def test_process_events_error_reported(self, error_events_handler, sample_error_event_xml, error_expected_values):
+        """Test processing of error reported events"""
+        # Wrap the single event in an events tag
+        xml_data = wrap_xml_in_events_tag(sample_error_event_xml)
+
+        # Process the events
+        events = 
error_events_handler._process_events(xml_data) + + # Verify the event was processed correctly + assert len(events) == 1 + event = events[0] + + # Verify expected values + assert_event_field_values(event, error_expected_values) + + # Check for event-specific fields + assert 'sql_text' in event + assert 'SELECT discount_percent' in event['sql_text'] + assert "REPEAT('a', 1000)" in event['sql_text'] + + def test_process_events_module_end( + self, query_completion_handler, sample_module_end_event_xml, module_end_expected_values + ): + """Test processing of module end events""" + # Wrap the single event in an events tag + xml_data = wrap_xml_in_events_tag(sample_module_end_event_xml) + + # Process the events + events = query_completion_handler._process_events(xml_data) + + # Verify the event was processed correctly + assert len(events) == 1 + event = events[0] + + # Verify expected values + assert_event_field_values(event, module_end_expected_values) + + # Check for event-specific fields + assert 'statement' in event + assert 'EXEC SelectAndProcessOrderItem' in event['statement'] + assert 'sql_text' in event + assert 'EXEC SelectAndProcessOrderItem' in event['sql_text'] + assert 'object_name' in event + assert event['object_name'] == 'SelectAndProcessOrderItem' + assert 'object_type' in event + assert event['object_type'] == 'P' # P for stored procedure + assert 'row_count' in event + assert int(event['row_count']) == 2 + + def test_process_events_multiple(self, query_completion_handler, error_events_handler, sample_multiple_events_xml): + """Test processing of multiple events""" + # Process with both handlers + events = [] + events.extend(query_completion_handler._process_events(sample_multiple_events_xml)) + events.extend(error_events_handler._process_events(sample_multiple_events_xml)) + + # Sort and validate event count + events.sort(key=lambda x: x['timestamp']) + assert len(events) == 3 + + # Validate expected event types in order + expected_types = ['sql_batch_completed', 'rpc_completed', 'error_reported'] + expected_sessions = [123, 124, 125] + + for event, exp_type, exp_session in zip(events, expected_types, expected_sessions): + assert event['event_name'] == exp_type + assert int(event['session_id']) == exp_session + + def test_process_events_attention( + self, error_events_handler, sample_attention_event_xml, attention_expected_values + ): + """Test processing of attention events""" + # Wrap the single event in an events tag + xml_data = wrap_xml_in_events_tag(sample_attention_event_xml) + + # Process the events + events = error_events_handler._process_events(xml_data) + + # Verify the event was processed correctly + assert len(events) == 1 + event = events[0] + + # Verify expected values + assert_event_field_values(event, attention_expected_values) + + # Check for event-specific fields + assert 'sql_text' in event + assert 'DECLARE @session_name NVARCHAR(100) = \'datadog_sql_statement\'' in event['sql_text'] + + +class TestPayloadGeneration: + """Tests for event payload generation""" + + @patch('datadog_checks.sqlserver.xe_collection.base.obfuscate_sql_with_metadata') + @patch('datadog_checks.sqlserver.xe_collection.base.compute_sql_signature') + def test_obfuscate_sql_fields(self, mock_compute_signature, mock_obfuscate, query_completion_handler): + """Test SQL field obfuscation and signature creation""" + # Setup mock obfuscator and signature generator + mock_obfuscate.return_value = { + 'query': 'SELECT * FROM Customers WHERE CustomerId = ?', + 'metadata': {'commands': ['SELECT'], 
'tables': ['Customers'], 'comments': []}, + } + mock_compute_signature.return_value = 'abc123' + + # Test event with SQL fields + event = { + 'event_name': 'sql_batch_completed', + 'batch_text': 'SELECT * FROM Customers WHERE CustomerId = 123', + 'sql_text': 'SELECT * FROM Customers WHERE CustomerId = 123', + } + + obfuscated_event, raw_sql_fields = query_completion_handler._obfuscate_sql_fields(event) + + # Verify obfuscated fields + assert obfuscated_event['batch_text'] == 'SELECT * FROM Customers WHERE CustomerId = ?' + assert obfuscated_event['sql_text'] == 'SELECT * FROM Customers WHERE CustomerId = ?' + assert obfuscated_event['dd_commands'] == ['SELECT'] + assert obfuscated_event['dd_tables'] == ['Customers'] + assert obfuscated_event['query_signature'] == 'abc123' + + # Verify raw SQL fields + assert raw_sql_fields['batch_text'] == 'SELECT * FROM Customers WHERE CustomerId = 123' + assert raw_sql_fields['sql_text'] == 'SELECT * FROM Customers WHERE CustomerId = 123' + assert raw_sql_fields['raw_query_signature'] == 'abc123' + + # Verify raw_query_signature is added to the obfuscated event when collect_raw_query is enabled + assert 'raw_query_signature' in obfuscated_event + assert obfuscated_event['raw_query_signature'] == 'abc123' + + def test_normalize_event(self, query_completion_handler): + """Test event normalization""" + # Test event with all fields + event = { + 'event_name': 'sql_batch_completed', + 'timestamp': '2023-01-01T12:00:00.123Z', + 'duration_ms': 10.0, # Already in milliseconds + 'session_id': 123, + 'request_id': 456, + 'database_name': 'TestDB', + 'client_hostname': 'TESTCLIENT', + 'client_app_name': 'TestApp', + 'username': 'TestUser', + 'batch_text': 'SELECT * FROM Customers WHERE CustomerId = 123', + 'sql_text': 'SELECT * FROM Customers WHERE CustomerId = 123', + 'query_signature': 'abc123', + 'raw_query_signature': 'def456', + } + + normalized = query_completion_handler._normalize_event_impl(event) + + # Verify normalized fields + assert normalized['xe_type'] == 'sql_batch_completed' + assert normalized['event_fire_timestamp'] == '2023-01-01T12:00:00.123Z' + assert normalized['duration_ms'] == 10.0 + assert normalized['session_id'] == 123 + assert normalized['request_id'] == 456 + assert normalized['database_name'] == 'TestDB' + assert normalized['client_hostname'] == 'TESTCLIENT' + assert normalized['client_app_name'] == 'TestApp' + assert normalized['username'] == 'TestUser' + assert normalized['batch_text'] == 'SELECT * FROM Customers WHERE CustomerId = 123' + assert normalized['sql_text'] == 'SELECT * FROM Customers WHERE CustomerId = 123' + assert normalized['query_signature'] == 'abc123' + assert normalized['raw_query_signature'] == 'def456' + + def test_normalize_error_event(self, error_events_handler): + """Test error event normalization""" + # Test error event with fields + event = { + 'event_name': 'error_reported', + 'timestamp': '2023-01-01T12:00:00.123Z', + 'error_number': 8134, + 'severity': 15, + 'state': 1, + 'session_id': 123, + 'request_id': 456, + 'database_name': 'TestDB', + 'message': 'Division by zero error', + 'sql_text': 'SELECT 1/0', + } + + normalized = error_events_handler._normalize_event_impl(event) + + # Verify normalized fields + assert normalized['xe_type'] == 'error_reported' + assert normalized['event_fire_timestamp'] == '2023-01-01T12:00:00.123Z' + assert normalized['error_number'] == 8134 + assert normalized['severity'] == 15 + assert normalized['state'] == 1 + assert normalized['session_id'] == 123 + assert 
normalized['request_id'] == 456 + assert normalized['database_name'] == 'TestDB' + assert normalized['message'] == 'Division by zero error' + assert normalized['sql_text'] == 'SELECT 1/0' + + # Verify duration_ms and query_start are removed for error events + assert 'duration_ms' not in normalized + assert 'query_start' not in normalized + + @patch('datadog_checks.sqlserver.xe_collection.base.datadog_agent') + def test_create_event_payload(self, mock_agent, query_completion_handler): + """Test creation of event payload""" + mock_agent.get_version.return_value = '7.30.0' + + # Create a raw event + raw_event = { + 'event_name': 'sql_batch_completed', + 'timestamp': '2023-01-01T12:00:00.123Z', + 'duration_ms': 10.0, + 'session_id': 123, + 'request_id': 456, + 'database_name': 'TestDB', + 'batch_text': 'SELECT * FROM Customers WHERE CustomerId = 123', + 'query_signature': 'abc123', + } + + # Create payload + payload = query_completion_handler._create_event_payload(raw_event) + + # Validate common payload fields + validate_common_payload_fields( + payload, expected_source='datadog_query_completions', expected_type='query_completion' + ) + + # Verify query details + query_details = payload['query_details'] + assert query_details['xe_type'] == 'sql_batch_completed' + assert query_details['duration_ms'] == 10.0 + assert query_details['session_id'] == 123 + assert query_details['request_id'] == 456 + assert query_details['database_name'] == 'TestDB' + assert query_details['query_signature'] == 'abc123' + + @patch('datadog_checks.sqlserver.xe_collection.base.datadog_agent') + def test_create_rqt_event(self, mock_agent, query_completion_handler): + """Test creation of Raw Query Text event""" + mock_agent.get_version.return_value = '7.30.0' + + # Create event with SQL fields + event = { + 'event_name': 'sql_batch_completed', + 'timestamp': '2023-01-01T12:00:00.123Z', + 'duration_ms': 10.0, + 'session_id': 123, + 'database_name': 'TestDB', + 'batch_text': 'SELECT * FROM Customers WHERE CustomerId = ?', + 'query_signature': 'abc123', + } + + # Create raw SQL fields + raw_sql_fields = { + 'batch_text': 'SELECT * FROM Customers WHERE CustomerId = 123', + 'raw_query_signature': 'def456', + } + + # Query details with formatted timestamps + query_details = {'event_fire_timestamp': '2023-01-01T12:00:00.123Z', 'query_start': '2023-01-01T11:59:50.123Z'} + + # Create RQT event + rqt_event = query_completion_handler._create_rqt_event(event, raw_sql_fields, query_details) + + # Validate common payload fields + validate_common_payload_fields(rqt_event, expected_source='datadog_query_completions', expected_type='rqt') + + # Verify DB fields + assert rqt_event['db']['instance'] == 'TestDB' + assert rqt_event['db']['query_signature'] == 'abc123' + assert rqt_event['db']['raw_query_signature'] == 'def456' + assert rqt_event['db']['statement'] == 'SELECT * FROM Customers WHERE CustomerId = 123' + + # Verify sqlserver fields + assert rqt_event['sqlserver']['session_id'] == 123 + assert rqt_event['sqlserver']['xe_type'] == 'sql_batch_completed' + assert rqt_event['sqlserver']['event_fire_timestamp'] == '2023-01-01T12:00:00.123Z' + assert rqt_event['sqlserver']['duration_ms'] == 10.0 + assert rqt_event['sqlserver']['query_start'] == '2023-01-01T11:59:50.123Z' + + def test_create_rqt_event_disabled(self, mock_check, mock_config): + """Test RQT event creation when disabled""" + # Disable raw query collection + mock_config.collect_raw_query_statement["enabled"] = False + + handler = QueryCompletionEventsHandler(mock_check, 
mock_config)
+
+        event = {
+            'event_name': 'sql_batch_completed',
+            'timestamp': '2023-01-01T12:00:00.123Z',
+            'query_signature': 'abc123',  # Add query_signature to avoid assertion failure
+        }
+
+        raw_sql_fields = {
+            'batch_text': 'SELECT * FROM Customers WHERE CustomerId = 123',
+            'raw_query_signature': 'def456',
+        }
+
+        query_details = {
+            'event_fire_timestamp': '2023-01-01T12:00:00.123Z',
+        }
+
+        # Should return None when disabled
+        assert handler._create_rqt_event(event, raw_sql_fields, query_details) is None
+
+    def test_create_rqt_event_missing_signature(self, query_completion_handler):
+        """Test RQT event creation with missing query signature"""
+        # Event without query signature
+        event = {
+            'event_name': 'sql_batch_completed',
+            'timestamp': '2023-01-01T12:00:00.123Z',
+            # No query_signature
+        }
+
+        raw_sql_fields = {
+            'batch_text': 'SELECT * FROM Customers WHERE CustomerId = 123',
+            'raw_query_signature': 'def456',
+        }
+
+        query_details = {
+            'event_fire_timestamp': '2023-01-01T12:00:00.123Z',
+        }
+
+        # Should return None when missing signature
+        assert query_completion_handler._create_rqt_event(event, raw_sql_fields, query_details) is None
+
+
+@pytest.mark.integration
+@pytest.mark.usefixtures('dd_environment')
+def test_xe_session_handlers_creation(init_config, instance_docker_metrics):
+    """Test creation of XE session handlers via the SQLServer class"""
+    # Enable XE collection (the instance key is `xe_collection`, matching the spec)
+    instance = instance_docker_metrics.copy()
+    instance['xe_collection'] = {'query_completions': {'enabled': True}, 'query_errors': {'enabled': True}}
+
+    # Create SQLServer check
+    sqlserver_check = SQLServer(CHECK_NAME, init_config, [instance])
+
+    # Instantiate the handlers directly to test
+    handlers = []
+    handlers.append(QueryCompletionEventsHandler(sqlserver_check, sqlserver_check._config))
+    handlers.append(ErrorEventsHandler(sqlserver_check, sqlserver_check._config))
+
+    # Verify that handlers were created with expected properties
+    assert len(handlers) == 2
+    assert any(h.session_name == 'datadog_query_completions' for h in handlers)
+    assert any(h.session_name == 'datadog_query_errors' for h in handlers)
+
+
+class TestRunJob:
+    """Group run job tests together"""
+
+    def test_last_event_timestamp_updates_correctly(self, query_completion_handler, sample_multiple_events_xml):
+        """Test that the handler correctly updates its last event timestamp after processing events"""
+        # Create modified XML with specific timestamp
+        modified_xml = sample_multiple_events_xml.replace("2023-01-01T12:01:00.456Z", "2023-01-01T12:02:00.789Z")
+
+        with patch.object(query_completion_handler, 'session_exists', return_value=True), patch.object(
+            query_completion_handler, '_query_ring_buffer', return_value=modified_xml
+        ):
+
+            # Process events directly to set timestamp
+            events = query_completion_handler._process_events(modified_xml)
+            if events:
+                query_completion_handler._last_event_timestamp = events[-1]['timestamp']
+
+            # Verify the timestamp was updated
+            assert query_completion_handler._last_event_timestamp == "2023-01-01T12:02:00.789Z"
+
+    def test_run_job_success_path(self, query_completion_handler, sample_multiple_events_xml):
+        """Test the complete happy path of run_job - session exists, events are queried, processed and submitted"""
+
+        # Create a function to capture the payload before serialization
+        original_payload = None
+
+        def capture_payload(payload, **kwargs):
+            nonlocal original_payload
+            original_payload = payload
+            # Return a simple string to avoid serialization issues
+            return '{}'
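+
+        # Added note (commentary): `capture_payload` is patched in below in place of json.dumps
+        # in the handler's module namespace, so the test can inspect the exact payload the
+        # handler builds without depending on JSON serialization.
+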
+ # Mock all necessary methods + with patch.object(query_completion_handler, 'session_exists', return_value=True), patch.object( + query_completion_handler, '_query_ring_buffer', return_value=sample_multiple_events_xml + ), patch.object(query_completion_handler._check, 'database_monitoring_query_activity') as mock_submit, patch( + 'datadog_checks.sqlserver.xe_collection.base.json.dumps', side_effect=capture_payload + ): + # Run the job + query_completion_handler.run_job() + + # Verify exactly one batched event was submitted + assert mock_submit.call_count == 1, "Expected one batched event submission" + + # Now validate the actual payload structure that was going to be serialized + assert original_payload is not None, "Payload was not captured" + + # Check essential payload properties + assert 'ddsource' in original_payload, "Missing 'ddsource' in payload" + assert original_payload['ddsource'] == 'sqlserver', "Incorrect ddsource value" + assert 'dbm_type' in original_payload, "Missing 'dbm_type' in payload" + assert 'timestamp' in original_payload, "Missing 'timestamp' in payload" + + # Check for the new batched array based on session type + if query_completion_handler.session_name == "datadog_query_errors": + batch_key = "sqlserver_query_errors" + else: + batch_key = "sqlserver_query_completions" + + assert batch_key in original_payload, f"Missing '{batch_key}' array in payload" + assert isinstance(original_payload[batch_key], list), f"'{batch_key}' should be a list" + assert len(original_payload[batch_key]) > 0, f"'{batch_key}' list should not be empty" + + # Verify structure of query details objects in the array + for event in original_payload[batch_key]: + assert "query_details" in event, "Missing 'query_details' in event" + query_details = event["query_details"] + assert "xe_type" in query_details, "Missing 'xe_type' in query_details" + + def test_no_session(self, query_completion_handler, mock_check, mock_handler_log): + """Test behavior when session doesn't exist""" + with patch.object(query_completion_handler, 'session_exists', return_value=False): + # Mock the log using the fixture + log = mock_handler_log(query_completion_handler, mock_check) + + # Run the job + query_completion_handler.run_job() + + # Verify warning was logged + log.warning.assert_called_once_with( + f"XE session {query_completion_handler.session_name} not found or not running." 
+ ) + + def test_event_batching(self, query_completion_handler, sample_multiple_events_xml): + """Test that multiple events get properly batched into a single payload""" + + # Create a function to capture the payload before serialization + original_payload = None + + def capture_payload(payload, **kwargs): + nonlocal original_payload + original_payload = payload + # Return a simple string to avoid serialization issues + return '{}' + + # Create a spy on the _create_event_payload method to capture what would be created + # for each individual event before batching + with patch.object( + query_completion_handler, '_create_event_payload', wraps=query_completion_handler._create_event_payload + ) as mock_create_payload, patch.object( + query_completion_handler, 'session_exists', return_value=True + ), patch.object( + query_completion_handler, '_query_ring_buffer', return_value=sample_multiple_events_xml + ), patch.object( + query_completion_handler._check, 'database_monitoring_query_activity' + ) as mock_submit, patch( + 'datadog_checks.sqlserver.xe_collection.base.json.dumps', side_effect=capture_payload + ): + # Run the job + query_completion_handler.run_job() + + # Verify create_event_payload was called multiple times (once per event) + assert mock_create_payload.call_count > 1, "Expected multiple events to be processed" + + # Verify database_monitoring_query_activity was only called once (batched) + assert mock_submit.call_count == 1, "Expected only one batched submission" + + # Validate the actual batched payload + assert original_payload is not None, "Payload was not captured" + + # Determine the appropriate batch key based on the session type + batch_key = ( + "sqlserver_query_errors" + if query_completion_handler.session_name == "datadog_query_errors" + else "sqlserver_query_completions" + ) + + # Verify the batch exists and contains multiple events + assert batch_key in original_payload, f"Missing '{batch_key}' array in payload" + assert len(original_payload[batch_key]) > 1, "Expected multiple events in the batch" diff --git a/sqlserver/tests/xml_xe_events/attention.xml b/sqlserver/tests/xml_xe_events/attention.xml new file mode 100644 index 0000000000000..400cd0ff9daf5 --- /dev/null +++ b/sqlserver/tests/xml_xe_events/attention.xml @@ -0,0 +1,93 @@ + + + + + 328677 + + + + + + 0 + + + + + + 0 + + + + + + 123 + + + + + + COMP-MX2YQD7P2P + + + + + + datadog + + + + + + master + + + + + + azdata + + + + + + -- Set the session name here + DECLARE @session_name NVARCHAR(100) = 'datadog_sql_statement'; + + -- See size of ring buffer + SELECT + DATALENGTH(target_data) / 1024.0 AS ring_buffer_kb + FROM sys.dm_xe_session_targets AS t + JOIN sys.dm_xe_sessions AS s + ON t.event_session_address = s.address + WHERE s.name = @session_name + AND t.target_name = 'ring_buffer'; + + -- -- Minimal polling of session events + -- SELECT + -- event_data.query('.') AS full_event_xml + -- FROM ( + -- SELECT CAST(t.target_data AS XML) AS target_xml + -- FROM sys.dm_xe_session_targets AS t + -- JOIN sys.dm_xe_sessions AS s + -- ON t.event_session_address = s.address + -- WHERE s.name = @session_name + -- AND t.target_name = 'ring_buffer' + -- ) AS src + -- CROSS APPLY target_xml.nodes('//RingBufferTarget/event[position() <= 100]') AS XTbl(event_data); + + SELECT + event_data.value('(event/@timestamp)[1]', 'datetime2') AS event_timestamp, + event_data.query('.') AS full_event_xml + FROM ( + SELECT CAST(t.target_data AS XML) AS target_xml + FROM sys.dm_xe_session_targets AS t + JOIN sys.dm_xe_sessions AS s + ON 
t.event_session_address = s.address + WHERE s.name = @session_name + AND t.target_name = 'ring_buffer' + ) AS src + CROSS APPLY target_xml.nodes('//RingBufferTarget/event[@name="attention"]') AS XTbl(event_data) + ORDER BY event_timestamp; + + + diff --git a/sqlserver/tests/xml_xe_events/error_reported.xml b/sqlserver/tests/xml_xe_events/error_reported.xml new file mode 100644 index 0000000000000..884bed388b8bd --- /dev/null +++ b/sqlserver/tests/xml_xe_events/error_reported.xml @@ -0,0 +1,98 @@ + + + + + 195 + + + + + + 15 + + + + + + 10 + + + + + + false + + + + + + 2 + + + SERVER + + + + + + 0x00000002 + + + USER + + + + + + false + + + + + + 'REPEAT' is not a recognized built-in function name. + + + + + + 0 + + + + + + 81 + + + + + + a05c90468fb8 + + + + + + go-mssqldb + + + + + + shopper_4 + + + + + + dbmorders + + + + + + /*dddbs='orders-app',ddps='orders-app',ddh='awbergs-sqlserver2019-test.c7ug0vvtkhqv.us-east-1.rds.amazonaws.com',dddb='dbmorders',ddprs='orders-sqlserver'*/ SELECT discount_percent, store_name, description, discount_in_currency, dbm_item_id, REPEAT('a', 1000) from discount where id BETWEEN 6117 AND 6127 GROUP by dbm_item_id, store_name, description, discount_in_currency, discount_percent /* date='12%2F31',key='val' */ + + + diff --git a/sqlserver/tests/xml_xe_events/module_end.xml b/sqlserver/tests/xml_xe_events/module_end.xml new file mode 100644 index 0000000000000..38acf6aaa37e5 --- /dev/null +++ b/sqlserver/tests/xml_xe_events/module_end.xml @@ -0,0 +1,110 @@ + + + + + 9 + + + + + + 2002300576 + + + + + + 1239182 + + + + + + 2 + + + + + + 1 + + + + + + 314 + + + + + + 372 + + + + + + P + + + + + + SelectAndProcessOrderItem + + + + + + EXEC SelectAndProcessOrderItem + + + + + + 0 + + + + + + 115 + + + + + + a05c90468fb8 + + + + + + go-mssqldb + + + + + + shopper_4 + + + + + + dbmorders + + + + + + /*dddbs='orders-app',ddps='orders-app',ddh='awbergs-sqlserver2019-test.c7ug0vvtkhqv.us-east-1.rds.amazonaws.com',dddb='dbmorders',ddprs='orders-sqlserver'*/ EXEC SelectAndProcessOrderItem + + + + + + 3C7BB946-DFAE-4953-B678-D440F97A3495-8 + + + diff --git a/sqlserver/tests/xml_xe_events/multiple_events.xml b/sqlserver/tests/xml_xe_events/multiple_events.xml new file mode 100644 index 0000000000000..38dfbb18a0072 --- /dev/null +++ b/sqlserver/tests/xml_xe_events/multiple_events.xml @@ -0,0 +1,47 @@ + + + + 10000 + + + 123 + + + 456 + + + TestDB + + + SELECT * FROM Customers WHERE CustomerId = 123 + + + + + 5000 + + + 124 + + + TestDB + + + EXEC sp_GetCustomerDetails @CustomerId = 123 + + + + + 8134 + + + 125 + + + Divide by zero error encountered. 
+ + + SELECT 1/0 + + + \ No newline at end of file diff --git a/sqlserver/tests/xml_xe_events/rpc_completed.xml b/sqlserver/tests/xml_xe_events/rpc_completed.xml new file mode 100644 index 0000000000000..fa6632094a220 --- /dev/null +++ b/sqlserver/tests/xml_xe_events/rpc_completed.xml @@ -0,0 +1,130 @@ + + + + + 16000 + + + + + + 2699535 + + + + + + 0 + + + + + + 0 + + + + + + 75 + + + + + + 0 + + + + + + 0 + + + OK + + + + + + 24 + + + + + + 0 + + + None + + + + + + sp_executesql + + + + + + exec sp_executesql N'EXECUTE [msdb].[dbo].[sp_agent_log_job_history] @job_id = @P1, @is_system = @P2, @step_id = @P3, @sql_message_id = @P4, @sql_severity = @P5, @run_status = @P6, @run_date = @P7, @run_time = @P8, @run_duration = @P9, @operator_id_emailed = @P10, @operator_id_netsent = @P11, @operator_id_paged = @P12, @retries_attempted = @P13, @session_id = @P14, @message = @P15',N'@P1 uniqueidentifier,@P2 int,@P3 int,@P4 int,@P5 int,@P6 int,@P7 int,@P8 int,@P9 int,@P10 int,@P11 int,@P12 int,@P13 int,@P14 int,@P15 nvarchar(4000)','B3A023D3-F7F8-4D17-8524-59471E098205',0,0,0,0,0,20250424,205701,1,0,0,0,0,3,N'The job failed. The Job was invoked by Schedule 9 (Failing Job Schedule). The last step to run was step 1 (Set database to read only).' + + + + + + + + + + + + + + 0 + + + + + + 203 + + + + + + EC2AMAZ-ML3E0PH + + + + + + SQLAgent - Job Manager + + + + + + NT AUTHORITY\NETWORK SERVICE + + + + + + msdb + + + + + + (@P1 uniqueidentifier,@P2 int,@P3 int,@P4 int,@P5 int,@P6 int,@P7 int,@P8 int,@P9 int,@P10 int,@P11 int,@P12 int,@P13 int,@P14 int,@P15 nvarchar(4000))EXECUTE [msdb].[dbo].[sp_agent_log_job_history] @job_id = @P1, @is_system = @P2, @step_id = @P3, @sql_message_id = @P4, @sql_severity = @P5, @run_status = @P6, @run_date = @P7, @run_time = @P8, @run_duration = @P9, @operator_id_emailed = @P10, @operator_id_netsent = @P11, @operator_id_paged = @P12, @retries_attempted = @P13, @session_id = @P14, @message = @P15 + + + + + + C98F767E-39CD-4F0A-A4A2-5C7B28D2BE90-81 + + + diff --git a/sqlserver/tests/xml_xe_events/sql_batch_completed.xml b/sqlserver/tests/xml_xe_events/sql_batch_completed.xml new file mode 100644 index 0000000000000..d751a2018c9f9 --- /dev/null +++ b/sqlserver/tests/xml_xe_events/sql_batch_completed.xml @@ -0,0 +1,189 @@ + + + + + 2844000 + + + + + + 4829704 + + + + + + 0 + + + + + + 0 + + + + + + 46 + + + + + + 0 + + + + + + 0 + + + + + + 1 + + + + + + 0 + + + OK + + + + + + -- Set the session name here + -- DECLARE @session_name NVARCHAR(100) = 'datadog_query_errors'; + DECLARE @session_name NVARCHAR(100) = 'datadog_sp_statement_completed'; + + -- See size of ring buffer + SELECT + DATALENGTH(target_data) / 1024.0 AS ring_buffer_kb + FROM sys.dm_xe_session_targets AS t + JOIN sys.dm_xe_sessions AS s + ON t.event_session_address = s.address + WHERE s.name = @session_name + AND t.target_name = 'ring_buffer'; + + -- Minimal polling of session events + SELECT + event_data.query('.') AS full_event_xml + FROM ( + SELECT CAST(t.target_data AS XML) AS target_xml + FROM sys.dm_xe_session_targets AS t + JOIN sys.dm_xe_sessions AS s + ON t.event_session_address = s.address + WHERE s.name = @session_name + AND t.target_name = 'ring_buffer' + ) AS src + CROSS APPLY target_xml.nodes('//RingBufferTarget/event[position() <= 100]') AS XTbl(event_data); + + -- SELECT + -- event_data.value('(event/@timestamp)[1]', 'datetime2') AS event_timestamp, + -- event_data.query('.') AS full_event_xml + -- FROM ( + -- SELECT CAST(t.target_data AS XML) AS target_xml + -- FROM sys.dm_xe_session_targets AS t 
+ -- JOIN sys.dm_xe_sessions AS s + -- ON t.event_session_address = s.address + -- WHERE s.name = @session_name + -- AND t.target_name = 'ring_buffer' + -- ) AS src + -- CROSS APPLY target_xml.nodes('//RingBufferTarget/event[@name="attention"]') AS XTbl(event_data) + -- ORDER BY event_timestamp; + + + + + + 0 + + + + + + 123 + + + + + + COMP-MX2YQD7P2P + + + + + + azdata + + + + + + datadog + + + + + + master + + + + + + -- Set the session name here + -- DECLARE @session_name NVARCHAR(100) = 'datadog_query_errors'; + DECLARE @session_name NVARCHAR(100) = 'datadog_sp_statement_completed'; + + -- See size of ring buffer + SELECT + DATALENGTH(target_data) / 1024.0 AS ring_buffer_kb + FROM sys.dm_xe_session_targets AS t + JOIN sys.dm_xe_sessions AS s + ON t.event_session_address = s.address + WHERE s.name = @session_name + AND t.target_name = 'ring_buffer'; + + -- Minimal polling of session events + SELECT + event_data.query('.') AS full_event_xml + FROM ( + SELECT CAST(t.target_data AS XML) AS target_xml + FROM sys.dm_xe_session_targets AS t + JOIN sys.dm_xe_sessions AS s + ON t.event_session_address = s.address + WHERE s.name = @session_name + AND t.target_name = 'ring_buffer' + ) AS src + CROSS APPLY target_xml.nodes('//RingBufferTarget/event[position() <= 100]') AS XTbl(event_data); + + -- SELECT + -- event_data.value('(event/@timestamp)[1]', 'datetime2') AS event_timestamp, + -- event_data.query('.') AS full_event_xml + -- FROM ( + -- SELECT CAST(t.target_data AS XML) AS target_xml + -- FROM sys.dm_xe_session_targets AS t + -- JOIN sys.dm_xe_sessions AS s + -- ON t.event_session_address = s.address + -- WHERE s.name = @session_name + -- AND t.target_name = 'ring_buffer' + -- ) AS src + -- CROSS APPLY target_xml.nodes('//RingBufferTarget/event[@name="attention"]') AS XTbl(event_data) + -- ORDER BY event_timestamp; + + + + + + 30B1539E-E628-4B59-BCCD-1F57D870AD0C-5 + + +
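
Reviewer note on the XML fixtures above: each file captures events in the shape SQL Server's
XE ring_buffer target emits, that is, <event> elements carrying <data> and <action> children
whose values live in nested <value> nodes (map-typed fields such as result also carry a
readable <text> node, which is where values like "OK" and "None" come from). As a point of
reference, below is a minimal sketch of flattening that shape into the plain dicts the handler
tests assert on ('event_name', 'timestamp', plus one key per field). parse_xe_events is a
hypothetical helper for illustration only, not the integration's actual _process_events
implementation.

import xml.etree.ElementTree as ET


def parse_xe_events(ring_buffer_xml):
    """Flatten XE <event> nodes into plain dicts keyed by field name."""
    root = ET.fromstring(ring_buffer_xml)
    events = []
    # iter() also matches the root element itself, so this handles both a
    # single <event> document (like the fixtures) and a <RingBufferTarget>
    # wrapping many events.
    for node in root.iter('event'):
        event = {'event_name': node.get('name'), 'timestamp': node.get('timestamp')}
        for field in node:  # <data> and <action> children
            name = field.get('name')
            value = field.find('value')
            text = field.find('text')  # prefer the readable form for map-typed fields
            if name is not None and value is not None:
                event[name] = text.text if text is not None else value.text
        events.append(event)
    return events

The real handlers additionally normalize timestamps, compute query signatures, and batch
events per session before submission, which is what the TestRunJob cases exercise.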