From da69a85c3b7e2e3930eeba2358bb3f77e7de6984 Mon Sep 17 00:00:00 2001 From: gibsondan Date: Fri, 13 Sep 2024 16:21:43 -0500 Subject: [PATCH] Correctly account for end_offset when deciding which partition key to look for in a freshness-based asset check (#24486) Summary: The current logic returns the last partition window that ends before the deadline, but I think what we actually want here is the most recent partition *at the time of* the deadline. The former does not correctly account for the end_offset shifting over the last partition window - the latter does. This PR makes that change. Resolves https://github.com/dagster-io/dagster/issues/24394. Test Plan: Adding a new test case now ## Summary & Motivation ## How I Tested These Changes ## Changelog Insert changelog entry or "NOCHANGELOG" here. - [ ] `NEW` _(added new feature or capability)_ - [ ] `BUGFIX` _(fixed a bug)_ - [ ] `DOCS` _(added or updated documentation)_ --- .../freshness_checks/time_partition.py | 2 +- .../test_time_partition_freshness.py | 81 +++++++++++++++++++ 2 files changed, 82 insertions(+), 1 deletion(-) diff --git a/python_modules/dagster/dagster/_core/definitions/asset_check_factories/freshness_checks/time_partition.py b/python_modules/dagster/dagster/_core/definitions/asset_check_factories/freshness_checks/time_partition.py index e62dd02ae5f7a..d7c9663ea6568 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_check_factories/freshness_checks/time_partition.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_check_factories/freshness_checks/time_partition.py @@ -155,7 +155,7 @@ def the_check(context: AssetCheckExecutionContext) -> Iterable[AssetCheckResult] deadline.timestamp(), tz=partitions_def.timezone ) last_completed_time_window = check.not_none( - partitions_def.get_prev_partition_window(deadline_in_partitions_def_tz) + partitions_def.get_last_partition_window(current_time=deadline_in_partitions_def_tz) ) expected_partition_key = partitions_def.get_partition_key_range_for_time_window( last_completed_time_window diff --git a/python_modules/dagster/dagster_tests/definitions_tests/freshness_checks_tests/test_time_partition_freshness.py b/python_modules/dagster/dagster_tests/definitions_tests/freshness_checks_tests/test_time_partition_freshness.py index 303c2e00a1cdb..918cacf279e08 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/freshness_checks_tests/test_time_partition_freshness.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/freshness_checks_tests/test_time_partition_freshness.py @@ -323,6 +323,87 @@ def my_asset(): ) +def test_result_end_offset( + instance: DagsterInstance, +) -> None: + partitions_def = DailyPartitionsDefinition( + start_date=create_datetime(2020, 1, 1, 0, 0, 0), end_offset=1 + ) + + @asset(partitions_def=partitions_def) + def my_asset(): + pass + + start_time = create_datetime(2021, 1, 3, 1, 0, 0) # 2021-01-03 at 01:00:00 + + check = build_time_partition_freshness_checks( + assets=[my_asset], + deadline_cron="0 9 * * *", # 09:00 UTC + timezone="UTC", + )[0] + + freeze_datetime = start_time + with freeze_time(freeze_datetime): + # With no events, check fails. + assert_check_result( + my_asset, + instance, + [check], + AssetCheckSeverity.WARN, + False, + # We expected the asset to arrive between the end of the partition window (2021-01-02) and the current time (2021-01-03). + description_match="The asset has never been observed/materialized.", + metadata_match={ + "dagster/freshness_params": JsonMetadataValue( + { + "timezone": "UTC", + "deadline_cron": "0 9 * * *", + } + ), + "dagster/latest_cron_tick_timestamp": TimestampMetadataValue( + create_datetime(2021, 1, 2, 9, 0, 0).timestamp() + ), + }, + ) + + # Add an event for the end_offset=0 partition. Still fails. + add_new_event(instance, my_asset.key, "2020-01-01") + assert_check_result( + my_asset, + instance, + [check], + AssetCheckSeverity.WARN, + False, + description_match="Asset is overdue", + ) + + # Add an event for the most recent completed partition previous to the + # cron. Now the check passes. + add_new_event(instance, my_asset.key, "2021-01-02") + # Add a bit of time so that the description renders properly. + freeze_datetime = freeze_datetime + datetime.timedelta(seconds=1) + with freeze_time(freeze_datetime): + assert_check_result( + my_asset, + instance, + [check], + AssetCheckSeverity.WARN, + True, + description_match="Asset is currently fresh", + metadata_match={ + "dagster/fresh_until_timestamp": TimestampMetadataValue( + create_datetime(2021, 1, 3, 9, 0, 0).timestamp() + ), + "dagster/freshness_params": JsonMetadataValue( + {"timezone": "UTC", "deadline_cron": "0 9 * * *"} + ), + "dagster/latest_cron_tick_timestamp": TimestampMetadataValue( + create_datetime(2021, 1, 2, 9, 0, 0).timestamp() + ), + }, + ) + + def test_invalid_runtime_assets( instance: DagsterInstance, ) -> None: