Skip to content

Commit

Permalink
Freshness check sensor (#20566)
Browse files Browse the repository at this point in the history
Adds a build_freshness_check_sensor API.

Underlying API only will re-run checks once per interval. The interval
is defined by the passed-in parameters to the freshness checks
themselves. If only max_lag_minutes, then once per max_lag_minutes. If a
freshness_cron, then once per tick of the cron.

This PR also includes some changes to the build_freshness_checks APIs to
include parameterization metadata.
Special care is paid to timezones. We store in terms of timestamp, but
evaluate in terms of timezone.

How I tested this
Added unit tests for parameterizations, as well as the run request
object returned for the various parameterizations of checks. Tried to
pay special care to weird timezone cases.
Used direct invocation APIs for testing here, felt sufficient since what
we're really testing here is the business logic of the underlying
sensor.
  • Loading branch information
dpeng817 authored and benpankow committed Mar 21, 2024
1 parent 64cf941 commit 24c6911
Show file tree
Hide file tree
Showing 9 changed files with 611 additions and 7 deletions.
5 changes: 4 additions & 1 deletion docs/sphinx/sections/api/apidocs/asset-checks.rst
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,7 @@ Dagster allows you to define and execute checks on your software-defined assets.

.. autofunction:: build_freshness_checks_for_non_partitioned_assets

.. autofunction:: build_freshness_checks_for_time_window_partitioned_assets
.. autofunction:: build_freshness_checks_for_time_window_partitioned_assets

.. autofunction:: build_sensor_for_freshness_checks

3 changes: 3 additions & 0 deletions python_modules/dagster/dagster/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,9 @@
from dagster._core.definitions.freshness_checks.non_partitioned import (
build_freshness_checks_for_non_partitioned_assets as build_freshness_checks_for_non_partitioned_assets,
)
from dagster._core.definitions.freshness_checks.sensor import (
build_sensor_for_freshness_checks as build_sensor_for_freshness_checks,
)
from dagster._core.definitions.freshness_checks.time_window_partitioned import (
build_freshness_checks_for_time_window_partitioned_assets as build_freshness_checks_for_time_window_partitioned_assets,
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Optional, Sequence, Union
from typing import Any, Dict, Optional, Sequence, Union

import pendulum

Expand All @@ -20,12 +20,17 @@
from .utils import (
DEFAULT_FRESHNESS_CRON_TIMEZONE,
DEFAULT_FRESHNESS_SEVERITY,
FRESHNESS_CRON_METADATA_KEY,
FRESHNESS_CRON_TIMEZONE_METADATA_KEY,
MAXIMUM_LAG_METADATA_KEY,
asset_to_keys_iterable,
ensure_no_duplicate_assets,
get_last_updated_timestamp,
retrieve_latest_record,
)

NON_PARTITIONED_FRESHNESS_PARAMS_METADATA_KEY = "dagster/non_partitioned_freshness_params"


@experimental
def build_freshness_checks_for_non_partitioned_assets(
Expand Down Expand Up @@ -101,13 +106,19 @@ def _build_freshness_check_for_assets(
severity: AssetCheckSeverity,
freshness_cron_timezone: str,
) -> Sequence[AssetChecksDefinition]:
params_metadata: Dict[str, Any] = {MAXIMUM_LAG_METADATA_KEY: maximum_lag_minutes}
if freshness_cron:
params_metadata[FRESHNESS_CRON_METADATA_KEY] = freshness_cron
params_metadata[FRESHNESS_CRON_TIMEZONE_METADATA_KEY] = freshness_cron_timezone
checks = []
for asset_key in asset_to_keys_iterable(asset):

@asset_check(
asset=asset,
description=f"Evaluates freshness for targeted asset. Cron: {freshness_cron}, Maximum lag minutes: {maximum_lag_minutes}.",
description=f"Evaluates freshness for targeted asset. Cron: {freshness_cron}, Maximum "
f"lag minutes: {maximum_lag_minutes}.",
name="freshness_check",
metadata={NON_PARTITIONED_FRESHNESS_PARAMS_METADATA_KEY: params_metadata},
)
def the_check(context: AssetExecutionContext) -> AssetCheckResult:
check.invariant(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
from typing import Any, Mapping, Optional, Sequence

import pendulum
from pydantic import BaseModel

from dagster import _check as check
from dagster._annotations import experimental
from dagster._core.definitions.asset_check_spec import AssetCheckKey, AssetCheckSpec
from dagster._core.definitions.asset_selection import AssetSelection
from dagster._core.definitions.run_request import RunRequest
from dagster._serdes.serdes import (
FieldSerializer,
JsonSerializableValue,
PackableValue,
SerializableNonScalarKeyMapping,
UnpackContext,
WhitelistMap,
deserialize_value,
pack_value,
serialize_value,
unpack_value,
whitelist_for_serdes,
)
from dagster._utils.merger import merge_dicts
from dagster._utils.schedules import get_latest_completed_cron_tick

from ..asset_checks import AssetChecksDefinition
from ..decorators import sensor
from ..sensor_definition import DefaultSensorStatus, SensorDefinition, SensorEvaluationContext
from .non_partitioned import NON_PARTITIONED_FRESHNESS_PARAMS_METADATA_KEY
from .time_window_partitioned import TIME_WINDOW_PARTITIONED_FRESHNESS_PARAMS_METADATA_KEY
from .utils import (
DEFAULT_FRESHNESS_CRON_TIMEZONE,
FRESHNESS_CRON_METADATA_KEY,
FRESHNESS_CRON_TIMEZONE_METADATA_KEY,
MAXIMUM_LAG_METADATA_KEY,
ensure_freshness_checks,
ensure_no_duplicate_asset_checks,
)

DEFAULT_FRESHNESS_SENSOR_NAME = "freshness_checks_sensor"


class EvaluationTimestampsSerializer(FieldSerializer):
def pack(
self,
mapping: Mapping[str, float],
whitelist_map: WhitelistMap,
descent_path: str,
) -> JsonSerializableValue:
return pack_value(SerializableNonScalarKeyMapping(mapping), whitelist_map, descent_path)

def unpack(
self,
unpacked_value: JsonSerializableValue,
whitelist_map: WhitelistMap,
context: UnpackContext,
) -> PackableValue:
return unpack_value(unpacked_value, dict, whitelist_map, context)


@whitelist_for_serdes(
field_serializers={"evaluation_timestamps_by_check_key": EvaluationTimestampsSerializer}
)
class FreshnessCheckSensorCursor(BaseModel):
"""The cursor for the freshness check sensor."""

evaluation_timestamps_by_check_key: Mapping[AssetCheckKey, float]

def get_last_evaluation_timestamp(self, key: AssetCheckKey) -> Optional[float]:
return self.evaluation_timestamps_by_check_key.get(key)

@staticmethod
def empty():
return FreshnessCheckSensorCursor(evaluation_timestamps_by_check_key={})

def with_updated_evaluations(
self, keys: Sequence[AssetCheckKey], evaluation_timestamp: float
) -> "FreshnessCheckSensorCursor":
return FreshnessCheckSensorCursor(
evaluation_timestamps_by_check_key=merge_dicts(
self.evaluation_timestamps_by_check_key,
{key: evaluation_timestamp for key in keys},
)
)


@experimental
def build_sensor_for_freshness_checks(
*,
freshness_checks: Sequence[AssetChecksDefinition],
minimum_interval_seconds: Optional[int] = None,
name: str = DEFAULT_FRESHNESS_SENSOR_NAME,
default_status: DefaultSensorStatus = DefaultSensorStatus.STOPPED,
) -> SensorDefinition:
"""Builds a sensor which kicks off evaluation of freshness checks.
The sensor will introspect from the parameters of the passed-in freshness checks how often to
run them. IE, if the freshness check is based on a cron schedule, the sensor will request one
run of the check per tick of the cron. If the freshness check is based on a maximum lag, the
sensor will request one run of the check per interval specified by the maximum lag.
Args:
freshness_checks (Sequence[AssetChecksDefinition]): The freshness checks to evaluate.
minimum_interval_seconds (Optional[int]): The duration in seconds between evaluations of the sensor.
name (Optional[str]): The name of the sensor. Defaults to "freshness_check_sensor", but a
name may need to be provided in case of multiple calls of this function.
default_status (Optional[DefaultSensorStatus]): The default status of the sensor. Defaults
to stopped.
Returns:
SensorDefinition: The sensor that evaluates the freshness of the assets.
"""
freshness_checks = check.sequence_param(freshness_checks, "freshness_checks")
ensure_no_duplicate_asset_checks(freshness_checks)
ensure_freshness_checks(freshness_checks)
check.invariant(
check.int_param(minimum_interval_seconds, "minimum_interval_seconds") > 0
if minimum_interval_seconds
else True,
"Interval must be a positive integer.",
)
check.str_param(name, "name")

@sensor(
name=name,
minimum_interval_seconds=minimum_interval_seconds,
description="Evaluates the freshness of targeted assets.",
asset_selection=AssetSelection.checks(*freshness_checks),
default_status=default_status,
)
def the_sensor(context: SensorEvaluationContext) -> Optional[RunRequest]:
cursor = (
deserialize_value(context.cursor, FreshnessCheckSensorCursor)
if context.cursor
else FreshnessCheckSensorCursor.empty()
)
current_timestamp = pendulum.now("UTC").timestamp()
check_keys_to_run = []
for asset_check in freshness_checks:
for asset_check_spec in asset_check.check_specs:
prev_evaluation_timestamp = cursor.get_last_evaluation_timestamp(
asset_check_spec.key
)
if should_run_again(asset_check_spec, prev_evaluation_timestamp, current_timestamp):
check_keys_to_run.append(asset_check_spec.key)
context.update_cursor(
serialize_value(cursor.with_updated_evaluations(check_keys_to_run, current_timestamp))
)
if check_keys_to_run:
return RunRequest(asset_check_keys=check_keys_to_run)

return the_sensor


def should_run_again(
check_spec: AssetCheckSpec, prev_evaluation_timestamp: Optional[float], current_timestamp: float
) -> bool:
metadata = check_spec.metadata
if not metadata:
# This should never happen, but type hinting prevents us from using check.not_none
check.assert_never(metadata)

if not prev_evaluation_timestamp:
return True

timezone = get_freshness_cron_timezone(metadata) or DEFAULT_FRESHNESS_CRON_TIMEZONE
current_time = pendulum.from_timestamp(current_timestamp, tz=timezone)
prev_evaluation_time = pendulum.from_timestamp(prev_evaluation_timestamp, tz=timezone)

freshness_cron = get_freshness_cron(metadata)
if freshness_cron:
latest_completed_cron_tick = check.not_none(
get_latest_completed_cron_tick(freshness_cron, current_time, timezone)
)
return latest_completed_cron_tick > prev_evaluation_time

maximum_lag_minutes = check.not_none(get_maximum_lag_minutes(metadata))
return current_time > prev_evaluation_time.add(minutes=maximum_lag_minutes)


def get_metadata(check_spec: AssetCheckSpec) -> Mapping[str, Any]:
if check_spec.metadata:
return check_spec.metadata
check.assert_never(check_spec.metadata)


def get_params_key(metadata: Mapping[str, Any]) -> str:
return (
TIME_WINDOW_PARTITIONED_FRESHNESS_PARAMS_METADATA_KEY
if TIME_WINDOW_PARTITIONED_FRESHNESS_PARAMS_METADATA_KEY in metadata
else NON_PARTITIONED_FRESHNESS_PARAMS_METADATA_KEY
)


def get_freshness_cron(metadata: Mapping[str, Any]) -> Optional[str]:
return metadata[get_params_key(metadata)].get(FRESHNESS_CRON_METADATA_KEY)


def get_freshness_cron_timezone(metadata: Mapping[str, Any]) -> Optional[str]:
return metadata[get_params_key(metadata)].get(FRESHNESS_CRON_TIMEZONE_METADATA_KEY)


def get_maximum_lag_minutes(metadata: Mapping[str, Any]) -> int:
return metadata[NON_PARTITIONED_FRESHNESS_PARAMS_METADATA_KEY].get(MAXIMUM_LAG_METADATA_KEY)
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,18 @@
from .utils import (
DEFAULT_FRESHNESS_CRON_TIMEZONE,
DEFAULT_FRESHNESS_SEVERITY,
FRESHNESS_CRON_METADATA_KEY,
FRESHNESS_CRON_TIMEZONE_METADATA_KEY,
asset_to_keys_iterable,
ensure_no_duplicate_assets,
get_last_updated_timestamp,
retrieve_latest_record,
)

TIME_WINDOW_PARTITIONED_FRESHNESS_PARAMS_METADATA_KEY = (
"dagster/time_window_partitioned_freshness_params"
)


@experimental
def build_freshness_checks_for_time_window_partitioned_assets(
Expand Down Expand Up @@ -89,6 +95,12 @@ def _build_freshness_checks_for_asset(
asset=asset_key,
description="Evaluates freshness for targeted asset.",
name="freshness_check",
metadata={
TIME_WINDOW_PARTITIONED_FRESHNESS_PARAMS_METADATA_KEY: {
FRESHNESS_CRON_METADATA_KEY: freshness_cron,
FRESHNESS_CRON_TIMEZONE_METADATA_KEY: freshness_cron_timezone,
}
},
)
def the_check(context: AssetExecutionContext) -> AssetCheckResult:
current_time = pendulum.now()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,21 @@
from typing import Iterator, Optional, Sequence, Union

from dagster import _check as check
from dagster._core.definitions.asset_check_spec import AssetCheckSeverity
from dagster._core.definitions.data_time import DATA_TIME_METADATA_KEY
from dagster._core.event_api import AssetRecordsFilter, EventLogRecord
from dagster._core.events import DagsterEventType
from dagster._core.instance import DagsterInstance

from ..asset_check_spec import AssetCheckSeverity
from ..asset_checks import AssetChecksDefinition
from ..assets import AssetsDefinition, SourceAsset
from ..data_time import DATA_TIME_METADATA_KEY
from ..events import AssetKey, CoercibleToAssetKey

DEFAULT_FRESHNESS_SEVERITY = AssetCheckSeverity.WARN
DEFAULT_FRESHNESS_CRON_TIMEZONE = "UTC"
MAXIMUM_LAG_METADATA_KEY = "dagster/maximum_lag_minutes"
FRESHNESS_CRON_METADATA_KEY = "dagster/freshness_cron"
FRESHNESS_CRON_TIMEZONE_METADATA_KEY = "dagster/freshness_cron_timezone"


def ensure_no_duplicate_assets(
Expand Down Expand Up @@ -54,6 +58,25 @@ def asset_to_keys_iterable(
yield AssetKey.from_coercible_or_definition(asset)


def ensure_no_duplicate_asset_checks(
asset_checks: Sequence[AssetChecksDefinition],
) -> None:
asset_check_keys = [
asset_check_key
for asset_check in asset_checks
for asset_check_key in asset_check.check_keys
]
duplicate_asset_checks = [
asset_check_key
for asset_check_key in asset_check_keys
if asset_check_keys.count(asset_check_key) > 1
]
check.invariant(
len(duplicate_asset_checks) == 0,
f"Found duplicate asset checks in the provided list of asset checks: {duplicate_asset_checks}. Please ensure that each provided asset check is unique.",
)


def retrieve_latest_record(
instance: DagsterInstance,
asset_key: AssetKey,
Expand Down Expand Up @@ -114,3 +137,19 @@ def get_last_updated_timestamp(record: Optional[EventLogRecord]) -> Optional[flo
return None
else:
check.failed("Expected record to be an observation or materialization")


def ensure_freshness_checks(checks: Sequence[AssetChecksDefinition]) -> None:
from .non_partitioned import NON_PARTITIONED_FRESHNESS_PARAMS_METADATA_KEY
from .time_window_partitioned import TIME_WINDOW_PARTITIONED_FRESHNESS_PARAMS_METADATA_KEY

for asset_check in checks:
for check_spec in asset_check.check_specs:
check.invariant(
check_spec.metadata
and (
NON_PARTITIONED_FRESHNESS_PARAMS_METADATA_KEY in check_spec.metadata
or TIME_WINDOW_PARTITIONED_FRESHNESS_PARAMS_METADATA_KEY in check_spec.metadata
),
f"Asset check {check_spec.key} didn't have expected metadata. Please ensure that the asset check is a freshness check.",
)
Loading

1 comment on commit 24c6911

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deploy preview for dagster-docs ready!

✅ Preview
https://dagster-docs-kn8jyl8jb-elementl.vercel.app
https://release-1-6-12.dagster.dagster-docs.io

Built with commit 24c6911.
This pull request is being automatically deployed with vercel-action

Please sign in to comment.