From 24c69118fbe6d1a9737c0f627bac17ae11f81089 Mon Sep 17 00:00:00 2001
From: Christopher DeCarolis
Date: Thu, 21 Mar 2024 12:59:06 -0700
Subject: [PATCH] Freshness check sensor (#20566)

Adds a build_sensor_for_freshness_checks API. The underlying sensor will
only request a re-run of each check once per interval, where the interval
is defined by the parameters passed to the freshness checks themselves:
if only maximum_lag_minutes is provided, once per maximum_lag_minutes
window; if a freshness_cron is provided, once per tick of the cron.

This PR also changes the build_freshness_checks APIs to attach
parameterization metadata to each check, which is what the sensor
introspects. Special care is paid to timezones: we store cursor state as
timestamps, but evaluate cron ticks in the configured timezone.

How I tested this

Added unit tests for the parameterizations, as well as for the run
request object returned under the various parameterizations of checks,
paying special attention to tricky timezone cases. The tests use the
direct invocation APIs, which felt sufficient since what we're really
testing is the business logic of the underlying sensor.
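For reviewers, a rough usage sketch of the new API (not part of the diff;
the asset name, cron, and lag values are illustrative, mirroring the unit
tests):

    import dagster as dg

    @dg.asset
    def my_asset(): ...

    # One freshness check per asset: my_asset must be updated within 10
    # minutes of each daily cron tick.
    checks = dg.build_freshness_checks_for_non_partitioned_assets(
        assets=[my_asset],
        freshness_cron="0 0 * * *",
        maximum_lag_minutes=10,
    )

    # The sensor introspects the checks' parameterization metadata and
    # requests one run per completed cron tick (or, absent a cron, one per
    # maximum_lag_minutes window).
    freshness_sensor = dg.build_sensor_for_freshness_checks(freshness_checks=checks)

    defs = dg.Definitions(
        assets=[my_asset], asset_checks=checks, sensors=[freshness_sensor]
    )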
---
 .../sections/api/apidocs/asset-checks.rst     |   5 +-
 python_modules/dagster/dagster/__init__.py    |   3 +
 .../freshness_checks/non_partitioned.py       |  15 +-
 .../definitions/freshness_checks/sensor.py    | 205 ++++++++++++
 .../time_window_partitioned.py                |  12 +
 .../definitions/freshness_checks/utils.py     |  43 ++-
 .../test_non_partitioned.py                   |  20 +-
 .../freshness_checks_tests/test_sensor.py     | 306 ++++++++++++++++++
 .../test_time_window_partitioned.py           |   9 +-
 9 files changed, 611 insertions(+), 7 deletions(-)
 create mode 100644 python_modules/dagster/dagster/_core/definitions/freshness_checks/sensor.py
 create mode 100644 python_modules/dagster/dagster_tests/definitions_tests/freshness_checks_tests/test_sensor.py

diff --git a/docs/sphinx/sections/api/apidocs/asset-checks.rst b/docs/sphinx/sections/api/apidocs/asset-checks.rst
index 33390ea672072..bff383c2d2cfc 100644
--- a/docs/sphinx/sections/api/apidocs/asset-checks.rst
+++ b/docs/sphinx/sections/api/apidocs/asset-checks.rst
@@ -27,4 +27,7 @@ Dagster allows you to define and execute checks on your software-defined assets
 
 .. autofunction:: build_freshness_checks_for_non_partitioned_assets
 
-.. autofunction:: build_freshness_checks_for_time_window_partitioned_assets
\ No newline at end of file
+.. autofunction:: build_freshness_checks_for_time_window_partitioned_assets
+
+.. autofunction:: build_sensor_for_freshness_checks
+
\ No newline at end of file
diff --git a/python_modules/dagster/dagster/__init__.py b/python_modules/dagster/dagster/__init__.py
index 1fd1223cbd238..4266b37824a2c 100644
--- a/python_modules/dagster/dagster/__init__.py
+++ b/python_modules/dagster/dagster/__init__.py
@@ -212,6 +212,9 @@
 from dagster._core.definitions.freshness_checks.non_partitioned import (
     build_freshness_checks_for_non_partitioned_assets as build_freshness_checks_for_non_partitioned_assets,
 )
+from dagster._core.definitions.freshness_checks.sensor import (
+    build_sensor_for_freshness_checks as build_sensor_for_freshness_checks,
+)
 from dagster._core.definitions.freshness_checks.time_window_partitioned import (
     build_freshness_checks_for_time_window_partitioned_assets as build_freshness_checks_for_time_window_partitioned_assets,
 )
diff --git a/python_modules/dagster/dagster/_core/definitions/freshness_checks/non_partitioned.py b/python_modules/dagster/dagster/_core/definitions/freshness_checks/non_partitioned.py
index d9cfc9124917e..01de7affea22c 100644
--- a/python_modules/dagster/dagster/_core/definitions/freshness_checks/non_partitioned.py
+++ b/python_modules/dagster/dagster/_core/definitions/freshness_checks/non_partitioned.py
@@ -1,4 +1,4 @@
-from typing import Optional, Sequence, Union
+from typing import Any, Dict, Optional, Sequence, Union
 
 import pendulum
 
@@ -20,12 +20,17 @@
 from .utils import (
     DEFAULT_FRESHNESS_CRON_TIMEZONE,
     DEFAULT_FRESHNESS_SEVERITY,
+    FRESHNESS_CRON_METADATA_KEY,
+    FRESHNESS_CRON_TIMEZONE_METADATA_KEY,
+    MAXIMUM_LAG_METADATA_KEY,
     asset_to_keys_iterable,
     ensure_no_duplicate_assets,
     get_last_updated_timestamp,
     retrieve_latest_record,
 )
 
+NON_PARTITIONED_FRESHNESS_PARAMS_METADATA_KEY = "dagster/non_partitioned_freshness_params"
+
 
 @experimental
 def build_freshness_checks_for_non_partitioned_assets(
@@ -101,13 +106,19 @@ def _build_freshness_check_for_assets(
     severity: AssetCheckSeverity,
     freshness_cron_timezone: str,
 ) -> Sequence[AssetChecksDefinition]:
+    params_metadata: Dict[str, Any] = {MAXIMUM_LAG_METADATA_KEY: maximum_lag_minutes}
+    if freshness_cron:
+        params_metadata[FRESHNESS_CRON_METADATA_KEY] = freshness_cron
+        params_metadata[FRESHNESS_CRON_TIMEZONE_METADATA_KEY] = freshness_cron_timezone
     checks = []
     for asset_key in asset_to_keys_iterable(asset):
 
         @asset_check(
             asset=asset,
-            description=f"Evaluates freshness for targeted asset. Cron: {freshness_cron}, Maximum lag minutes: {maximum_lag_minutes}.",
+            description=f"Evaluates freshness for targeted asset. 
Cron: {freshness_cron}, Maximum " + f"lag minutes: {maximum_lag_minutes}.", name="freshness_check", + metadata={NON_PARTITIONED_FRESHNESS_PARAMS_METADATA_KEY: params_metadata}, ) def the_check(context: AssetExecutionContext) -> AssetCheckResult: check.invariant( diff --git a/python_modules/dagster/dagster/_core/definitions/freshness_checks/sensor.py b/python_modules/dagster/dagster/_core/definitions/freshness_checks/sensor.py new file mode 100644 index 0000000000000..706d5d2c27d5c --- /dev/null +++ b/python_modules/dagster/dagster/_core/definitions/freshness_checks/sensor.py @@ -0,0 +1,205 @@ +from typing import Any, Mapping, Optional, Sequence + +import pendulum +from pydantic import BaseModel + +from dagster import _check as check +from dagster._annotations import experimental +from dagster._core.definitions.asset_check_spec import AssetCheckKey, AssetCheckSpec +from dagster._core.definitions.asset_selection import AssetSelection +from dagster._core.definitions.run_request import RunRequest +from dagster._serdes.serdes import ( + FieldSerializer, + JsonSerializableValue, + PackableValue, + SerializableNonScalarKeyMapping, + UnpackContext, + WhitelistMap, + deserialize_value, + pack_value, + serialize_value, + unpack_value, + whitelist_for_serdes, +) +from dagster._utils.merger import merge_dicts +from dagster._utils.schedules import get_latest_completed_cron_tick + +from ..asset_checks import AssetChecksDefinition +from ..decorators import sensor +from ..sensor_definition import DefaultSensorStatus, SensorDefinition, SensorEvaluationContext +from .non_partitioned import NON_PARTITIONED_FRESHNESS_PARAMS_METADATA_KEY +from .time_window_partitioned import TIME_WINDOW_PARTITIONED_FRESHNESS_PARAMS_METADATA_KEY +from .utils import ( + DEFAULT_FRESHNESS_CRON_TIMEZONE, + FRESHNESS_CRON_METADATA_KEY, + FRESHNESS_CRON_TIMEZONE_METADATA_KEY, + MAXIMUM_LAG_METADATA_KEY, + ensure_freshness_checks, + ensure_no_duplicate_asset_checks, +) + +DEFAULT_FRESHNESS_SENSOR_NAME = "freshness_checks_sensor" + + +class EvaluationTimestampsSerializer(FieldSerializer): + def pack( + self, + mapping: Mapping[str, float], + whitelist_map: WhitelistMap, + descent_path: str, + ) -> JsonSerializableValue: + return pack_value(SerializableNonScalarKeyMapping(mapping), whitelist_map, descent_path) + + def unpack( + self, + unpacked_value: JsonSerializableValue, + whitelist_map: WhitelistMap, + context: UnpackContext, + ) -> PackableValue: + return unpack_value(unpacked_value, dict, whitelist_map, context) + + +@whitelist_for_serdes( + field_serializers={"evaluation_timestamps_by_check_key": EvaluationTimestampsSerializer} +) +class FreshnessCheckSensorCursor(BaseModel): + """The cursor for the freshness check sensor.""" + + evaluation_timestamps_by_check_key: Mapping[AssetCheckKey, float] + + def get_last_evaluation_timestamp(self, key: AssetCheckKey) -> Optional[float]: + return self.evaluation_timestamps_by_check_key.get(key) + + @staticmethod + def empty(): + return FreshnessCheckSensorCursor(evaluation_timestamps_by_check_key={}) + + def with_updated_evaluations( + self, keys: Sequence[AssetCheckKey], evaluation_timestamp: float + ) -> "FreshnessCheckSensorCursor": + return FreshnessCheckSensorCursor( + evaluation_timestamps_by_check_key=merge_dicts( + self.evaluation_timestamps_by_check_key, + {key: evaluation_timestamp for key in keys}, + ) + ) + + +@experimental +def build_sensor_for_freshness_checks( + *, + freshness_checks: Sequence[AssetChecksDefinition], + minimum_interval_seconds: Optional[int] = None, + name: 
str = DEFAULT_FRESHNESS_SENSOR_NAME,
+    default_status: DefaultSensorStatus = DefaultSensorStatus.STOPPED,
+) -> SensorDefinition:
+    """Builds a sensor which kicks off evaluation of freshness checks.
+
+    The sensor introspects the parameters of the passed-in freshness checks to determine how
+    often to run them. For example, if a freshness check is based on a cron schedule, the sensor
+    will request one run of the check per tick of the cron; if the freshness check is based on a
+    maximum lag, the sensor will request one run of the check per maximum-lag interval.
+
+    Args:
+        freshness_checks (Sequence[AssetChecksDefinition]): The freshness checks to evaluate.
+        minimum_interval_seconds (Optional[int]): The duration in seconds between evaluations of
+            the sensor.
+        name (str): The name of the sensor. Defaults to "freshness_checks_sensor", but a unique
+            name must be provided if this function is called more than once.
+        default_status (DefaultSensorStatus): The default status of the sensor. Defaults to
+            DefaultSensorStatus.STOPPED.
+
+    Returns:
+        SensorDefinition: The sensor that evaluates the freshness of the assets.
+    """
+    freshness_checks = check.sequence_param(freshness_checks, "freshness_checks")
+    ensure_no_duplicate_asset_checks(freshness_checks)
+    ensure_freshness_checks(freshness_checks)
+    check.invariant(
+        check.int_param(minimum_interval_seconds, "minimum_interval_seconds") > 0
+        if minimum_interval_seconds
+        else True,
+        "Interval must be a positive integer.",
+    )
+    check.str_param(name, "name")
+
+    @sensor(
+        name=name,
+        minimum_interval_seconds=minimum_interval_seconds,
+        description="Evaluates the freshness of targeted assets.",
+        asset_selection=AssetSelection.checks(*freshness_checks),
+        default_status=default_status,
+    )
+    def the_sensor(context: SensorEvaluationContext) -> Optional[RunRequest]:
+        cursor = (
+            deserialize_value(context.cursor, FreshnessCheckSensorCursor)
+            if context.cursor
+            else FreshnessCheckSensorCursor.empty()
+        )
+        current_timestamp = pendulum.now("UTC").timestamp()
+        check_keys_to_run = []
+        for asset_check in freshness_checks:
+            for asset_check_spec in asset_check.check_specs:
+                prev_evaluation_timestamp = cursor.get_last_evaluation_timestamp(
+                    asset_check_spec.key
+                )
+                if should_run_again(asset_check_spec, prev_evaluation_timestamp, current_timestamp):
+                    check_keys_to_run.append(asset_check_spec.key)
+        context.update_cursor(
+            serialize_value(cursor.with_updated_evaluations(check_keys_to_run, current_timestamp))
+        )
+        if check_keys_to_run:
+            return RunRequest(asset_check_keys=check_keys_to_run)
+
+    return the_sensor
+
+
+def should_run_again(
+    check_spec: AssetCheckSpec, prev_evaluation_timestamp: Optional[float], current_timestamp: float
+) -> bool:
+    metadata = check_spec.metadata
+    if not metadata:
+        # This should never happen, but type hinting prevents us from using check.not_none
+        check.assert_never(metadata)
+
+    if not prev_evaluation_timestamp:
+        return True
+
+    timezone = get_freshness_cron_timezone(metadata) or DEFAULT_FRESHNESS_CRON_TIMEZONE
+    current_time = pendulum.from_timestamp(current_timestamp, tz=timezone)
+    prev_evaluation_time = pendulum.from_timestamp(prev_evaluation_timestamp, tz=timezone)
+
+    freshness_cron = get_freshness_cron(metadata)
+    if freshness_cron:
+        latest_completed_cron_tick = check.not_none(
+            get_latest_completed_cron_tick(freshness_cron, current_time, timezone)
+        )
+        return latest_completed_cron_tick > prev_evaluation_time
+
+    maximum_lag_minutes = check.not_none(get_maximum_lag_minutes(metadata))
+
return current_time > prev_evaluation_time.add(minutes=maximum_lag_minutes) + + +def get_metadata(check_spec: AssetCheckSpec) -> Mapping[str, Any]: + if check_spec.metadata: + return check_spec.metadata + check.assert_never(check_spec.metadata) + + +def get_params_key(metadata: Mapping[str, Any]) -> str: + return ( + TIME_WINDOW_PARTITIONED_FRESHNESS_PARAMS_METADATA_KEY + if TIME_WINDOW_PARTITIONED_FRESHNESS_PARAMS_METADATA_KEY in metadata + else NON_PARTITIONED_FRESHNESS_PARAMS_METADATA_KEY + ) + + +def get_freshness_cron(metadata: Mapping[str, Any]) -> Optional[str]: + return metadata[get_params_key(metadata)].get(FRESHNESS_CRON_METADATA_KEY) + + +def get_freshness_cron_timezone(metadata: Mapping[str, Any]) -> Optional[str]: + return metadata[get_params_key(metadata)].get(FRESHNESS_CRON_TIMEZONE_METADATA_KEY) + + +def get_maximum_lag_minutes(metadata: Mapping[str, Any]) -> int: + return metadata[NON_PARTITIONED_FRESHNESS_PARAMS_METADATA_KEY].get(MAXIMUM_LAG_METADATA_KEY) diff --git a/python_modules/dagster/dagster/_core/definitions/freshness_checks/time_window_partitioned.py b/python_modules/dagster/dagster/_core/definitions/freshness_checks/time_window_partitioned.py index a21d9ce5f17de..48b79970f4b34 100644 --- a/python_modules/dagster/dagster/_core/definitions/freshness_checks/time_window_partitioned.py +++ b/python_modules/dagster/dagster/_core/definitions/freshness_checks/time_window_partitioned.py @@ -18,12 +18,18 @@ from .utils import ( DEFAULT_FRESHNESS_CRON_TIMEZONE, DEFAULT_FRESHNESS_SEVERITY, + FRESHNESS_CRON_METADATA_KEY, + FRESHNESS_CRON_TIMEZONE_METADATA_KEY, asset_to_keys_iterable, ensure_no_duplicate_assets, get_last_updated_timestamp, retrieve_latest_record, ) +TIME_WINDOW_PARTITIONED_FRESHNESS_PARAMS_METADATA_KEY = ( + "dagster/time_window_partitioned_freshness_params" +) + @experimental def build_freshness_checks_for_time_window_partitioned_assets( @@ -89,6 +95,12 @@ def _build_freshness_checks_for_asset( asset=asset_key, description="Evaluates freshness for targeted asset.", name="freshness_check", + metadata={ + TIME_WINDOW_PARTITIONED_FRESHNESS_PARAMS_METADATA_KEY: { + FRESHNESS_CRON_METADATA_KEY: freshness_cron, + FRESHNESS_CRON_TIMEZONE_METADATA_KEY: freshness_cron_timezone, + } + }, ) def the_check(context: AssetExecutionContext) -> AssetCheckResult: current_time = pendulum.now() diff --git a/python_modules/dagster/dagster/_core/definitions/freshness_checks/utils.py b/python_modules/dagster/dagster/_core/definitions/freshness_checks/utils.py index 9a176054f62d4..811bfe3d46abc 100644 --- a/python_modules/dagster/dagster/_core/definitions/freshness_checks/utils.py +++ b/python_modules/dagster/dagster/_core/definitions/freshness_checks/utils.py @@ -1,17 +1,21 @@ from typing import Iterator, Optional, Sequence, Union from dagster import _check as check -from dagster._core.definitions.asset_check_spec import AssetCheckSeverity -from dagster._core.definitions.data_time import DATA_TIME_METADATA_KEY from dagster._core.event_api import AssetRecordsFilter, EventLogRecord from dagster._core.events import DagsterEventType from dagster._core.instance import DagsterInstance +from ..asset_check_spec import AssetCheckSeverity +from ..asset_checks import AssetChecksDefinition from ..assets import AssetsDefinition, SourceAsset +from ..data_time import DATA_TIME_METADATA_KEY from ..events import AssetKey, CoercibleToAssetKey DEFAULT_FRESHNESS_SEVERITY = AssetCheckSeverity.WARN DEFAULT_FRESHNESS_CRON_TIMEZONE = "UTC" +MAXIMUM_LAG_METADATA_KEY = "dagster/maximum_lag_minutes" 
+FRESHNESS_CRON_METADATA_KEY = "dagster/freshness_cron" +FRESHNESS_CRON_TIMEZONE_METADATA_KEY = "dagster/freshness_cron_timezone" def ensure_no_duplicate_assets( @@ -54,6 +58,25 @@ def asset_to_keys_iterable( yield AssetKey.from_coercible_or_definition(asset) +def ensure_no_duplicate_asset_checks( + asset_checks: Sequence[AssetChecksDefinition], +) -> None: + asset_check_keys = [ + asset_check_key + for asset_check in asset_checks + for asset_check_key in asset_check.check_keys + ] + duplicate_asset_checks = [ + asset_check_key + for asset_check_key in asset_check_keys + if asset_check_keys.count(asset_check_key) > 1 + ] + check.invariant( + len(duplicate_asset_checks) == 0, + f"Found duplicate asset checks in the provided list of asset checks: {duplicate_asset_checks}. Please ensure that each provided asset check is unique.", + ) + + def retrieve_latest_record( instance: DagsterInstance, asset_key: AssetKey, @@ -114,3 +137,19 @@ def get_last_updated_timestamp(record: Optional[EventLogRecord]) -> Optional[flo return None else: check.failed("Expected record to be an observation or materialization") + + +def ensure_freshness_checks(checks: Sequence[AssetChecksDefinition]) -> None: + from .non_partitioned import NON_PARTITIONED_FRESHNESS_PARAMS_METADATA_KEY + from .time_window_partitioned import TIME_WINDOW_PARTITIONED_FRESHNESS_PARAMS_METADATA_KEY + + for asset_check in checks: + for check_spec in asset_check.check_specs: + check.invariant( + check_spec.metadata + and ( + NON_PARTITIONED_FRESHNESS_PARAMS_METADATA_KEY in check_spec.metadata + or TIME_WINDOW_PARTITIONED_FRESHNESS_PARAMS_METADATA_KEY in check_spec.metadata + ), + f"Asset check {check_spec.key} didn't have expected metadata. Please ensure that the asset check is a freshness check.", + ) diff --git a/python_modules/dagster/dagster_tests/definitions_tests/freshness_checks_tests/test_non_partitioned.py b/python_modules/dagster/dagster_tests/definitions_tests/freshness_checks_tests/test_non_partitioned.py index 0a4749f74c7eb..d33b9dec7e271 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/freshness_checks_tests/test_non_partitioned.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/freshness_checks_tests/test_non_partitioned.py @@ -27,7 +27,25 @@ def my_asset(): assets=[my_asset], maximum_lag_minutes=10 ) assert len(result) == 1 - assert next(iter(result[0].check_keys)).asset_key == my_asset.key + check = result[0] + assert next(iter(check.check_keys)).asset_key == my_asset.key + assert next(iter(check.check_specs)).metadata == { + "dagster/non_partitioned_freshness_params": { + "dagster/maximum_lag_minutes": 10, + } + } + + result = build_freshness_checks_for_non_partitioned_assets( + assets=[my_asset], freshness_cron="0 0 * * *", maximum_lag_minutes=10 + ) + check = result[0] + assert next(iter(check.check_specs)).metadata == { + "dagster/non_partitioned_freshness_params": { + "dagster/maximum_lag_minutes": 10, + "dagster/freshness_cron": "0 0 * * *", + "dagster/freshness_cron_timezone": "UTC", + } + } result = build_freshness_checks_for_non_partitioned_assets( assets=[my_asset.key], maximum_lag_minutes=10 diff --git a/python_modules/dagster/dagster_tests/definitions_tests/freshness_checks_tests/test_sensor.py b/python_modules/dagster/dagster_tests/definitions_tests/freshness_checks_tests/test_sensor.py new file mode 100644 index 0000000000000..fa3d82c760326 --- /dev/null +++ b/python_modules/dagster/dagster_tests/definitions_tests/freshness_checks_tests/test_sensor.py @@ -0,0 +1,306 @@ +# pyright: 
reportPrivateImportUsage=false
+
+import pendulum
+import pytest
+from dagster import (
+    AssetCheckKey,
+    AssetKey,
+    asset,
+)
+from dagster._check import CheckError
+from dagster._core.definitions.definitions_class import Definitions
+from dagster._core.definitions.freshness_checks.non_partitioned import (
+    build_freshness_checks_for_non_partitioned_assets,
+)
+from dagster._core.definitions.freshness_checks.sensor import (
+    FreshnessCheckSensorCursor,
+    build_sensor_for_freshness_checks,
+)
+from dagster._core.definitions.run_request import RunRequest
+from dagster._core.definitions.sensor_definition import build_sensor_context
+from dagster._serdes.serdes import deserialize_value
+from dagster._seven.compat.pendulum import pendulum_freeze_time
+
+
+def test_params() -> None:
+    """Test the resulting sensor / error from different parameterizations of the builder function."""
+
+    @asset
+    def my_asset():
+        pass
+
+    checks = build_freshness_checks_for_non_partitioned_assets(
+        assets=[my_asset], maximum_lag_minutes=10
+    )
+
+    # Only essential params
+    result = build_sensor_for_freshness_checks(
+        freshness_checks=checks,
+    )
+    assert result.name == "freshness_checks_sensor"
+
+    # All params (valid)
+    result = build_sensor_for_freshness_checks(
+        freshness_checks=checks, minimum_interval_seconds=10, name="my_sensor"
+    )
+
+    # Duplicate checks
+    with pytest.raises(CheckError, match="duplicate asset checks"):
+        build_sensor_for_freshness_checks(
+            freshness_checks=[*checks, *checks],
+        )
+
+    # Invalid interval
+    with pytest.raises(CheckError, match="Interval must be a positive integer"):
+        build_sensor_for_freshness_checks(
+            freshness_checks=checks,
+            minimum_interval_seconds=-1,
+        )
+
+
+def test_maximum_lag_minutes() -> None:
+    """Test the case where we have an asset parameterized with a maximum lag. Ensure that the sensor
+    provides expected run requests in all cases.
+ """ + + @asset + def my_asset(): + pass + + @asset + def my_other_asset(): + pass + + ten_minute_checks = build_freshness_checks_for_non_partitioned_assets( + assets=[my_asset], maximum_lag_minutes=10 + ) + twenty_minute_checks = build_freshness_checks_for_non_partitioned_assets( + assets=[my_other_asset], maximum_lag_minutes=20 + ) + + sensor = build_sensor_for_freshness_checks( + freshness_checks=[*ten_minute_checks, *twenty_minute_checks], + ) + + defs = Definitions( + sensors=[sensor], + assets=[my_asset, my_other_asset], + asset_checks=[*ten_minute_checks, *twenty_minute_checks], + ) + + context = build_sensor_context( + definitions=defs, + ) + + # First evaluation, we should get run requests for both checks + freeze_datetime = pendulum.now("UTC") + with pendulum_freeze_time(freeze_datetime): + run_request = sensor(context) + assert isinstance(run_request, RunRequest) + assert run_request.asset_check_keys == [ + AssetCheckKey(AssetKey("my_asset"), "freshness_check"), + AssetCheckKey(AssetKey("my_other_asset"), "freshness_check"), + ] + assert context.cursor + cursor = deserialize_value(context.cursor, FreshnessCheckSensorCursor) + assert cursor == FreshnessCheckSensorCursor( + evaluation_timestamps_by_check_key={ + AssetCheckKey(AssetKey("my_asset"), "freshness_check"): freeze_datetime.timestamp(), + AssetCheckKey( + AssetKey("my_other_asset"), "freshness_check" + ): freeze_datetime.timestamp(), + } + ) + + # Run the sensor again, and ensure that we do not get a run request + result = sensor(context) + assert result is None + assert context.cursor + cursor = deserialize_value(context.cursor, FreshnessCheckSensorCursor) + assert cursor == FreshnessCheckSensorCursor( + evaluation_timestamps_by_check_key={ + AssetCheckKey(AssetKey("my_asset"), "freshness_check"): freeze_datetime.timestamp(), + AssetCheckKey( + AssetKey("my_other_asset"), "freshness_check" + ): freeze_datetime.timestamp(), + } + ) + + # Advance time within the maximum_lag_minutes window, and ensure we don't get a run_request. + freeze_datetime = freeze_datetime.add(minutes=5) + with pendulum_freeze_time(freeze_datetime): + result = sensor(context) + assert result is None + assert context.cursor + cursor = deserialize_value(context.cursor, FreshnessCheckSensorCursor) + assert cursor == FreshnessCheckSensorCursor( + evaluation_timestamps_by_check_key={ + AssetCheckKey(AssetKey("my_asset"), "freshness_check"): freeze_datetime.subtract( + minutes=5 + ).timestamp(), + AssetCheckKey( + AssetKey("my_other_asset"), "freshness_check" + ): freeze_datetime.subtract(minutes=5).timestamp(), + } + ) + + # Advance time past the maximum_lag_minutes window for the first asset, and ensure we get a run_request. 
+ freeze_datetime = freeze_datetime.add(minutes=6) + with pendulum_freeze_time(freeze_datetime): + result = sensor(context) + assert isinstance(result, RunRequest) + assert result.asset_check_keys == [AssetCheckKey(AssetKey("my_asset"), "freshness_check")] + assert context.cursor + cursor = deserialize_value(context.cursor, FreshnessCheckSensorCursor) + assert cursor == FreshnessCheckSensorCursor( + evaluation_timestamps_by_check_key={ + AssetCheckKey(AssetKey("my_asset"), "freshness_check"): freeze_datetime.timestamp(), + AssetCheckKey( + AssetKey("my_other_asset"), "freshness_check" + ): freeze_datetime.subtract(minutes=11).timestamp(), + } + ) + + +def test_freshness_cron() -> None: + """Test the case where we have a freshness cron and we have not executed, not passed the + threshold time-wise, and then we have passed the threshold. + """ + + @asset + def my_behind_asset(): + pass + + @asset + def my_utc_asset(): + pass + + @asset + def my_ahead_asset(): + pass + + utc_checks = build_freshness_checks_for_non_partitioned_assets( + assets=[my_utc_asset], freshness_cron="0 0 * * *", maximum_lag_minutes=10 + ) + behind_checks = build_freshness_checks_for_non_partitioned_assets( + assets=[my_behind_asset], + freshness_cron="0 0 * * *", + maximum_lag_minutes=10, + freshness_cron_timezone="Etc/GMT-5", + ) + ahead_checks = build_freshness_checks_for_non_partitioned_assets( + assets=[my_ahead_asset], + freshness_cron="0 0 * * *", + maximum_lag_minutes=10, + freshness_cron_timezone="Etc/GMT+5", + ) + + sensor = build_sensor_for_freshness_checks( + freshness_checks=[*utc_checks, *behind_checks, *ahead_checks], + ) + + defs = Definitions( + sensors=[sensor], + assets=[my_utc_asset, my_behind_asset, my_ahead_asset], + asset_checks=[*utc_checks, *behind_checks, *ahead_checks], + ) + + context = build_sensor_context( + definitions=defs, + ) + + # First evaluation, we should get a run request + freeze_datetime = pendulum.datetime(2022, 1, 1, 0, 0, 0, tz="UTC") + with pendulum_freeze_time(freeze_datetime): + run_request = sensor(context) + assert isinstance(run_request, RunRequest) + assert run_request.asset_check_keys == [ + AssetCheckKey(AssetKey("my_utc_asset"), "freshness_check"), + AssetCheckKey(AssetKey("my_behind_asset"), "freshness_check"), + AssetCheckKey(AssetKey("my_ahead_asset"), "freshness_check"), + ] + assert context.cursor + cursor = deserialize_value(context.cursor, FreshnessCheckSensorCursor) + assert cursor == FreshnessCheckSensorCursor( + evaluation_timestamps_by_check_key={ + AssetCheckKey( + AssetKey("my_utc_asset"), "freshness_check" + ): freeze_datetime.timestamp(), + AssetCheckKey( + AssetKey("my_behind_asset"), "freshness_check" + ): freeze_datetime.timestamp(), + AssetCheckKey( + AssetKey("my_ahead_asset"), "freshness_check" + ): freeze_datetime.timestamp(), + } + ) + + # Run the sensor again, and ensure that we do not get a run request + result = sensor(context) + assert result is None + assert context.cursor + cursor = deserialize_value(context.cursor, FreshnessCheckSensorCursor) + assert cursor == FreshnessCheckSensorCursor( + evaluation_timestamps_by_check_key={ + AssetCheckKey( + AssetKey("my_utc_asset"), "freshness_check" + ): freeze_datetime.timestamp(), + AssetCheckKey( + AssetKey("my_behind_asset"), "freshness_check" + ): freeze_datetime.timestamp(), + AssetCheckKey( + AssetKey("my_ahead_asset"), "freshness_check" + ): freeze_datetime.timestamp(), + } + ) + + # Advance time past the freshness cron in GMT+5 and GMT-5. 
In both cases, we should have moved to a new tick of the cron. + # Advance time 20 hours, since the cron is currently at 0:00 UTC (5:00 GMT+5). + freeze_datetime = freeze_datetime.add(hours=20) + with pendulum_freeze_time(freeze_datetime): + result = sensor(context) + assert isinstance(result, RunRequest) + assert result.asset_check_keys == [ + AssetCheckKey(AssetKey("my_behind_asset"), "freshness_check"), + AssetCheckKey(AssetKey("my_ahead_asset"), "freshness_check"), + ] + assert context.cursor + cursor = deserialize_value(context.cursor, FreshnessCheckSensorCursor) + assert cursor == FreshnessCheckSensorCursor( + evaluation_timestamps_by_check_key={ + AssetCheckKey( + AssetKey("my_utc_asset"), "freshness_check" + ): freeze_datetime.subtract(hours=20).timestamp(), + AssetCheckKey( + AssetKey("my_behind_asset"), "freshness_check" + ): freeze_datetime.timestamp(), + AssetCheckKey( + AssetKey("my_ahead_asset"), "freshness_check" + ): freeze_datetime.timestamp(), + } + ) + + # Advance time past the freshness cron in UTC. Ensure we get a new run request only for the UTC + # asset. + freeze_datetime = freeze_datetime.add(hours=5) + with pendulum_freeze_time(freeze_datetime): + result = sensor(context) + assert isinstance(result, RunRequest) + assert result.asset_check_keys == [ + AssetCheckKey(AssetKey("my_utc_asset"), "freshness_check") + ] + assert context.cursor + cursor = deserialize_value(context.cursor, FreshnessCheckSensorCursor) + assert cursor == FreshnessCheckSensorCursor( + evaluation_timestamps_by_check_key={ + AssetCheckKey( + AssetKey("my_utc_asset"), "freshness_check" + ): freeze_datetime.timestamp(), + AssetCheckKey( + AssetKey("my_behind_asset"), "freshness_check" + ): freeze_datetime.subtract(hours=5).timestamp(), + AssetCheckKey( + AssetKey("my_ahead_asset"), "freshness_check" + ): freeze_datetime.subtract(hours=5).timestamp(), + } + ) diff --git a/python_modules/dagster/dagster_tests/definitions_tests/freshness_checks_tests/test_time_window_partitioned.py b/python_modules/dagster/dagster_tests/definitions_tests/freshness_checks_tests/test_time_window_partitioned.py index b0c7f70b32543..7d0e48b8c7106 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/freshness_checks_tests/test_time_window_partitioned.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/freshness_checks_tests/test_time_window_partitioned.py @@ -39,7 +39,14 @@ def my_partitioned_asset(): assets=[my_partitioned_asset], freshness_cron="0 0 * * *" ) assert len(result) == 1 - assert next(iter(result[0].check_keys)).asset_key == my_partitioned_asset.key + check = result[0] + assert next(iter(check.check_keys)).asset_key == my_partitioned_asset.key + assert next(iter(check.check_specs)).metadata == { + "dagster/time_window_partitioned_freshness_params": { + "dagster/freshness_cron": "0 0 * * *", + "dagster/freshness_cron_timezone": "UTC", + } + } result = build_freshness_checks_for_time_window_partitioned_assets( assets=[my_partitioned_asset.key], freshness_cron="0 0 * * *", freshness_cron_timezone="UTC"
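Postscript for reviewers (not part of the diff): a minimal sketch of how the
sensor cursor round-trips through serdes, using only the APIs added above;
the check key and timestamp values are illustrative.

    from dagster import AssetCheckKey, AssetKey
    from dagster._core.definitions.freshness_checks.sensor import FreshnessCheckSensorCursor
    from dagster._serdes.serdes import deserialize_value, serialize_value

    cursor = FreshnessCheckSensorCursor.empty().with_updated_evaluations(
        keys=[AssetCheckKey(AssetKey("my_asset"), "freshness_check")],
        evaluation_timestamp=1711047546.0,
    )
    # AssetCheckKey is not a scalar, so the cursor's field serializer wraps the
    # mapping in SerializableNonScalarKeyMapping before packing it to JSON.
    assert deserialize_value(serialize_value(cursor), FreshnessCheckSensorCursor) == cursor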