diff --git a/warehouse/oso_dagster/definitions.py b/warehouse/oso_dagster/definitions.py index 3b6803ce7..b19b29800 100644 --- a/warehouse/oso_dagster/definitions.py +++ b/warehouse/oso_dagster/definitions.py @@ -21,7 +21,7 @@ from .cbt import CBTResource from .config import DagsterConfig from .factories import load_all_assets_from_package -from .factories.alerts import setup_alert_sensor +from .factories.alerts import setup_alert_sensors from .resources import ( BigQueryDataTransferResource, ClickhouseResource, @@ -183,8 +183,7 @@ def load_definitions(): global_config.discord_webhook_url ) - alerts = setup_alert_sensor( - "alerts", + alerts = setup_alert_sensors( global_config.alerts_base_url, alert_manager, False, diff --git a/warehouse/oso_dagster/factories/alerts.py b/warehouse/oso_dagster/factories/alerts.py index d6c4bdd1c..6990f0cf0 100644 --- a/warehouse/oso_dagster/factories/alerts.py +++ b/warehouse/oso_dagster/factories/alerts.py @@ -1,35 +1,53 @@ +from datetime import datetime, timedelta +from typing import Mapping + from dagster import ( + AssetSelection, DefaultSensorStatus, + MultiAssetSensorEvaluationContext, OpExecutionContext, RunConfig, RunFailureSensorContext, RunRequest, SkipReason, job, + multi_asset_sensor, op, run_failure_sensor, ) -from ..utils import AlertManager, AlertOpConfig +from ..utils import AlertManager, AlertOpConfig, FreshnessOpConfig from .common import AssetFactoryResponse -def setup_alert_sensor(name: str, base_url: str, alert_manager: AlertManager, enable: bool = True): - @op(name=f"{name}_alert_op") +def setup_alert_sensors( + base_url: str, alert_manager: AlertManager, enable: bool = True +): + @op(name="failure_alert_op") def failure_op(context: OpExecutionContext, config: AlertOpConfig) -> None: alert_manager.failure_op(base_url, context, config) - @job(name=f"{name}_alert_job") + @job(name="failure_alert_job") def failure_job(): failure_op() + @op(name="freshness_alert_op") + def freshness_alert_op( + context: OpExecutionContext, config: FreshnessOpConfig + ) -> None: + alert_manager.freshness_op(base_url, config) + + @job(name="freshness_alert_job") + def freshness_alert_job(): + freshness_alert_op() + if enable: status = DefaultSensorStatus.RUNNING else: status = DefaultSensorStatus.STOPPED @run_failure_sensor( - name=name, default_status=status, request_job=failure_job + name="failure_alert", default_status=status, request_job=failure_job ) def failure_sensor(context: RunFailureSensorContext): if context.failure_event.job_name not in [ @@ -72,7 +90,7 @@ def failure_sensor(context: RunFailureSensorContext): run_key=context.dagster_run.run_id, run_config=RunConfig( ops={ - f"{name}_alert_op": { + "failure_alert_op": { "config": { "run_id": context.dagster_run.run_id, } @@ -81,4 +99,48 @@ def failure_sensor(context: RunFailureSensorContext): ), ) - return AssetFactoryResponse([], sensors=[failure_sensor], jobs=[failure_job]) + # Only validates assets that have materialized at least once successfully + @multi_asset_sensor( + monitored_assets=AssetSelection.all(), + job=freshness_alert_job, + default_status=status, + minimum_interval_seconds=259200, # 3 days + ) + def freshness_check_sensor(context: MultiAssetSensorEvaluationContext): + materialization_records = context.latest_materialization_records_by_key( + context.asset_keys + ) + + stale_assets: Mapping[str, float] = {} + + for asset_key, record in materialization_records.items(): + if record is None: + continue + + context.log.info( + f"{datetime.now().timestamp() - record.event_log_entry.timestamp} - {timedelta(minutes=2).total_seconds()}" + ) + if ( + datetime.now().timestamp() - record.event_log_entry.timestamp + > timedelta(minutes=2).total_seconds() + ): + # Reset the cursor to always check the latest materialization + context.advance_cursor({asset_key: None}) + stale_assets[asset_key.to_user_string()] = ( + record.event_log_entry.timestamp + ) + + if len(stale_assets) == 0: + return SkipReason("No stale assets found") + + return RunRequest( + run_config=RunConfig( + ops={"freshness_alert_op": {"config": {"stale_assets": stale_assets}}} + ), + ) + + return AssetFactoryResponse( + [], + sensors=[failure_sensor, freshness_check_sensor], + jobs=[failure_job, freshness_alert_job], + ) diff --git a/warehouse/oso_dagster/utils/alerts.py b/warehouse/oso_dagster/utils/alerts.py index a57b74144..5fb17da90 100644 --- a/warehouse/oso_dagster/utils/alerts.py +++ b/warehouse/oso_dagster/utils/alerts.py @@ -1,7 +1,7 @@ import io import logging from datetime import datetime -from typing import Optional +from typing import Mapping, Optional import pytz import requests @@ -9,7 +9,7 @@ from dagster import Config, DagsterEvent, DagsterEventType, OpExecutionContext from dagster._core.events import JobFailureData from discord_webhook import DiscordEmbed, DiscordWebhook -from PIL import Image, ImageDraw, ImageFont, ImageFile +from PIL import Image, ImageDraw, ImageFile, ImageFont logger = logging.getLogger(__name__) @@ -18,6 +18,10 @@ class AlertOpConfig(Config): run_id: str +class FreshnessOpConfig(Config): + stale_assets: Mapping[str, float] + + class AlertManager: """Base class for an alert manager""" @@ -29,6 +33,9 @@ def failure_op( ) -> None: raise NotImplementedError() + def freshness_op(self, base_url: str, config: FreshnessOpConfig) -> None: + raise NotImplementedError() + def get_asset_step_events( context: OpExecutionContext, @@ -60,6 +67,16 @@ def failure_op( f"{len(step_failures)} failed steps in run ({base_url}/runs/{config.run_id})" ) + def freshness_op(self, base_url, config): + output_lines: list[str] = [] + + for asset, timestamp in config.stale_assets.items(): + output_lines.append( + f"{asset} last materialized at {datetime.fromtimestamp(timestamp)}" + ) + + self.alert("\n".join(output_lines)) + class LogAlertManager(SimpleAlertManager): def alert(self, message: Optional[str]): @@ -161,9 +178,7 @@ def format_steps(steps: int) -> str: image_data = io.BytesIO() image.save(image_data, format="PNG") - self._webhook.add_file( - file=image_data.getvalue(), filename="dagster_result.png" - ) + return image_data.getvalue() def failure_op( self, base_url: str, context: OpExecutionContext, config: AlertOpConfig @@ -191,7 +206,7 @@ def failure_op( ): job_name = result.event_specific_data.first_step_failure_event.step_key - self._config = CanvasConfig( + canvas_config = CanvasConfig( job_name=job_name, success=result.event_type == DagsterEventType.RUN_SUCCESS, steps_ok=len( @@ -213,23 +228,45 @@ def failure_op( message=result.message or "Unknown error cause", ) - self._run_id = config.run_id - self._base_url = base_url - self.alert() + description = f"Oops! Click [`here`]({base_url}/runs/{config.run_id}) to view the details of this failure." + self.alert_discord( + "Failed Materialization", description, canvas_config=canvas_config + ) + + def freshness_op(self, base_url, config): + output_fields: Mapping[str, str] = {} - def alert(self, message: Optional[str] = None): - if not self._config: - raise ValueError("CanvasConfig is not set") + for asset, timestamp in config.stale_assets.items(): + output_fields[asset] = ( + f"[`Asset`]({base_url}/assets/{asset}) last materialized at {datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S')}" + ) - self.build_image(self._config) + self.alert_discord( + "Asset Freshness Summary", + "The following assets are stale:", + fields=output_fields, + ) + def alert_discord( + self, + title: str, + description: str, + fields: Optional[Mapping[str, str]] = None, + canvas_config: Optional[CanvasConfig] = None, + ): embed = DiscordEmbed( - title="Failed Materialization", - description=f"Oops! Click [`here`]({self._base_url}/runs/{self._run_id}) " - "to view the details of this failure.", + title=title, + description=description, color="ffffff", ) - embed.set_image(url="attachment://dagster_result.png") - + if canvas_config: + self._webhook.add_file( + file=self.build_image(canvas_config), filename="dagster_result.png" + ) + embed.set_image(url="attachment://dagster_result.png") + + if fields: + for name, value in fields.items(): + embed.add_embed_field(name=name, value=value, inline=False) self._webhook.add_embed(embed) self._webhook.execute()