Skip to content

Commit

Permalink
feat: add freshness alert sensor and job for assets to dagster
Browse files Browse the repository at this point in the history
  • Loading branch information
IcaroG committed Feb 6, 2025
1 parent 4cb25a2 commit bd33249
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 28 deletions.
5 changes: 2 additions & 3 deletions warehouse/oso_dagster/definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from .cbt import CBTResource
from .config import DagsterConfig
from .factories import load_all_assets_from_package
from .factories.alerts import setup_alert_sensor
from .factories.alerts import setup_alert_sensors
from .resources import (
BigQueryDataTransferResource,
ClickhouseResource,
Expand Down Expand Up @@ -183,8 +183,7 @@ def load_definitions():
global_config.discord_webhook_url
)

alerts = setup_alert_sensor(
"alerts",
alerts = setup_alert_sensors(
global_config.alerts_base_url,
alert_manager,
False,
Expand Down
76 changes: 69 additions & 7 deletions warehouse/oso_dagster/factories/alerts.py
Original file line number Diff line number Diff line change
@@ -1,35 +1,53 @@
from datetime import datetime, timedelta
from typing import Mapping

from dagster import (
AssetSelection,
DefaultSensorStatus,
MultiAssetSensorEvaluationContext,
OpExecutionContext,
RunConfig,
RunFailureSensorContext,
RunRequest,
SkipReason,
job,
multi_asset_sensor,
op,
run_failure_sensor,
)

from ..utils import AlertManager, AlertOpConfig
from ..utils import AlertManager, AlertOpConfig, FreshnessOpConfig
from .common import AssetFactoryResponse


def setup_alert_sensor(name: str, base_url: str, alert_manager: AlertManager, enable: bool = True):
@op(name=f"{name}_alert_op")
def setup_alert_sensors(
base_url: str, alert_manager: AlertManager, enable: bool = True
):
@op(name="failure_alert_op")
def failure_op(context: OpExecutionContext, config: AlertOpConfig) -> None:
alert_manager.failure_op(base_url, context, config)

@job(name=f"{name}_alert_job")
@job(name="failure_alert_job")
def failure_job():
failure_op()

@op(name="freshness_alert_op")
def freshness_alert_op(
context: OpExecutionContext, config: FreshnessOpConfig
) -> None:
alert_manager.freshness_op(base_url, config)

@job(name="freshness_alert_job")
def freshness_alert_job():
freshness_alert_op()

if enable:
status = DefaultSensorStatus.RUNNING
else:
status = DefaultSensorStatus.STOPPED

@run_failure_sensor(
name=name, default_status=status, request_job=failure_job
name="failure_alert", default_status=status, request_job=failure_job
)
def failure_sensor(context: RunFailureSensorContext):
if context.failure_event.job_name not in [
Expand Down Expand Up @@ -72,7 +90,7 @@ def failure_sensor(context: RunFailureSensorContext):
run_key=context.dagster_run.run_id,
run_config=RunConfig(
ops={
f"{name}_alert_op": {
"failure_alert_op": {
"config": {
"run_id": context.dagster_run.run_id,
}
Expand All @@ -81,4 +99,48 @@ def failure_sensor(context: RunFailureSensorContext):
),
)

return AssetFactoryResponse([], sensors=[failure_sensor], jobs=[failure_job])
# Only validates assets that have materialized at least once successfully
@multi_asset_sensor(
monitored_assets=AssetSelection.all(),
job=freshness_alert_job,
default_status=status,
minimum_interval_seconds=259200, # 3 days
)
def freshness_check_sensor(context: MultiAssetSensorEvaluationContext):
materialization_records = context.latest_materialization_records_by_key(
context.asset_keys
)

stale_assets: Mapping[str, float] = {}

for asset_key, record in materialization_records.items():
if record is None:
continue

context.log.info(
f"{datetime.now().timestamp() - record.event_log_entry.timestamp} - {timedelta(minutes=2).total_seconds()}"
)
if (
datetime.now().timestamp() - record.event_log_entry.timestamp
> timedelta(minutes=2).total_seconds()
):
# Reset the cursor to always check the latest materialization
context.advance_cursor({asset_key: None})
stale_assets[asset_key.to_user_string()] = (
record.event_log_entry.timestamp
)

if len(stale_assets) == 0:
return SkipReason("No stale assets found")

return RunRequest(
run_config=RunConfig(
ops={"freshness_alert_op": {"config": {"stale_assets": stale_assets}}}
),
)

return AssetFactoryResponse(
[],
sensors=[failure_sensor, freshness_check_sensor],
jobs=[failure_job, freshness_alert_job],
)
73 changes: 55 additions & 18 deletions warehouse/oso_dagster/utils/alerts.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
import io
import logging
from datetime import datetime
from typing import Optional
from typing import Mapping, Optional

import pytz
import requests
from attr import dataclass
from dagster import Config, DagsterEvent, DagsterEventType, OpExecutionContext
from dagster._core.events import JobFailureData
from discord_webhook import DiscordEmbed, DiscordWebhook
from PIL import Image, ImageDraw, ImageFont, ImageFile
from PIL import Image, ImageDraw, ImageFile, ImageFont

logger = logging.getLogger(__name__)

Expand All @@ -18,6 +18,10 @@ class AlertOpConfig(Config):
run_id: str


class FreshnessOpConfig(Config):
stale_assets: Mapping[str, float]


class AlertManager:
"""Base class for an alert manager"""

Expand All @@ -29,6 +33,9 @@ def failure_op(
) -> None:
raise NotImplementedError()

def freshness_op(self, base_url: str, config: FreshnessOpConfig) -> None:
raise NotImplementedError()


def get_asset_step_events(
context: OpExecutionContext,
Expand Down Expand Up @@ -60,6 +67,16 @@ def failure_op(
f"{len(step_failures)} failed steps in run ({base_url}/runs/{config.run_id})"
)

def freshness_op(self, base_url, config):
output_lines: list[str] = []

for asset, timestamp in config.stale_assets.items():
output_lines.append(
f"{asset} last materialized at {datetime.fromtimestamp(timestamp)}"
)

self.alert("\n".join(output_lines))


class LogAlertManager(SimpleAlertManager):
def alert(self, message: Optional[str]):
Expand Down Expand Up @@ -161,9 +178,7 @@ def format_steps(steps: int) -> str:
image_data = io.BytesIO()
image.save(image_data, format="PNG")

self._webhook.add_file(
file=image_data.getvalue(), filename="dagster_result.png"
)
return image_data.getvalue()

def failure_op(
self, base_url: str, context: OpExecutionContext, config: AlertOpConfig
Expand Down Expand Up @@ -191,7 +206,7 @@ def failure_op(
):
job_name = result.event_specific_data.first_step_failure_event.step_key

self._config = CanvasConfig(
canvas_config = CanvasConfig(
job_name=job_name,
success=result.event_type == DagsterEventType.RUN_SUCCESS,
steps_ok=len(
Expand All @@ -213,23 +228,45 @@ def failure_op(
message=result.message or "Unknown error cause",
)

self._run_id = config.run_id
self._base_url = base_url
self.alert()
description = f"Oops! Click [`here`]({base_url}/runs/{config.run_id}) to view the details of this failure."
self.alert_discord(
"Failed Materialization", description, canvas_config=canvas_config
)

def freshness_op(self, base_url, config):
output_fields: Mapping[str, str] = {}

def alert(self, message: Optional[str] = None):
if not self._config:
raise ValueError("CanvasConfig is not set")
for asset, timestamp in config.stale_assets.items():
output_fields[asset] = (
f"[`Asset`]({base_url}/assets/{asset}) last materialized at {datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S')}"
)

self.build_image(self._config)
self.alert_discord(
"Asset Freshness Summary",
"The following assets are stale:",
fields=output_fields,
)

def alert_discord(
self,
title: str,
description: str,
fields: Optional[Mapping[str, str]] = None,
canvas_config: Optional[CanvasConfig] = None,
):
embed = DiscordEmbed(
title="Failed Materialization",
description=f"Oops! Click [`here`]({self._base_url}/runs/{self._run_id}) "
"to view the details of this failure.",
title=title,
description=description,
color="ffffff",
)
embed.set_image(url="attachment://dagster_result.png")

if canvas_config:
self._webhook.add_file(
file=self.build_image(canvas_config), filename="dagster_result.png"
)
embed.set_image(url="attachment://dagster_result.png")

if fields:
for name, value in fields.items():
embed.add_embed_field(name=name, value=value, inline=False)
self._webhook.add_embed(embed)
self._webhook.execute()

0 comments on commit bd33249

Please sign in to comment.