Skip to content

Commit

Permalink
Correctly account for end_offset when deciding which partition key to…
Browse files Browse the repository at this point in the history
… look for in a freshness-based asset check (#24486)

Summary:
The current logic returns the last partition window that ends before the
deadline, but I think what we actually want here is the most recent
partition *at the time of* the deadline. The former does not correctly
account for the end_offset shifting over the last partition window - the
latter does. This PR makes that change.

Resolves #24394.

Test Plan: Adding a new test case now

## Summary & Motivation

## How I Tested These Changes

## Changelog

Insert changelog entry or "NOCHANGELOG" here.

- [ ] `NEW` _(added new feature or capability)_
- [ ] `BUGFIX` _(fixed a bug)_
- [ ] `DOCS` _(added or updated documentation)_
  • Loading branch information
gibsondan authored Sep 13, 2024
1 parent 4cb5897 commit da69a85
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ def the_check(context: AssetCheckExecutionContext) -> Iterable[AssetCheckResult]
deadline.timestamp(), tz=partitions_def.timezone
)
last_completed_time_window = check.not_none(
partitions_def.get_prev_partition_window(deadline_in_partitions_def_tz)
partitions_def.get_last_partition_window(current_time=deadline_in_partitions_def_tz)
)
expected_partition_key = partitions_def.get_partition_key_range_for_time_window(
last_completed_time_window
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,87 @@ def my_asset():
)


def test_result_end_offset(
instance: DagsterInstance,
) -> None:
partitions_def = DailyPartitionsDefinition(
start_date=create_datetime(2020, 1, 1, 0, 0, 0), end_offset=1
)

@asset(partitions_def=partitions_def)
def my_asset():
pass

start_time = create_datetime(2021, 1, 3, 1, 0, 0) # 2021-01-03 at 01:00:00

check = build_time_partition_freshness_checks(
assets=[my_asset],
deadline_cron="0 9 * * *", # 09:00 UTC
timezone="UTC",
)[0]

freeze_datetime = start_time
with freeze_time(freeze_datetime):
# With no events, check fails.
assert_check_result(
my_asset,
instance,
[check],
AssetCheckSeverity.WARN,
False,
# We expected the asset to arrive between the end of the partition window (2021-01-02) and the current time (2021-01-03).
description_match="The asset has never been observed/materialized.",
metadata_match={
"dagster/freshness_params": JsonMetadataValue(
{
"timezone": "UTC",
"deadline_cron": "0 9 * * *",
}
),
"dagster/latest_cron_tick_timestamp": TimestampMetadataValue(
create_datetime(2021, 1, 2, 9, 0, 0).timestamp()
),
},
)

# Add an event for the end_offset=0 partition. Still fails.
add_new_event(instance, my_asset.key, "2020-01-01")
assert_check_result(
my_asset,
instance,
[check],
AssetCheckSeverity.WARN,
False,
description_match="Asset is overdue",
)

# Add an event for the most recent completed partition previous to the
# cron. Now the check passes.
add_new_event(instance, my_asset.key, "2021-01-02")
# Add a bit of time so that the description renders properly.
freeze_datetime = freeze_datetime + datetime.timedelta(seconds=1)
with freeze_time(freeze_datetime):
assert_check_result(
my_asset,
instance,
[check],
AssetCheckSeverity.WARN,
True,
description_match="Asset is currently fresh",
metadata_match={
"dagster/fresh_until_timestamp": TimestampMetadataValue(
create_datetime(2021, 1, 3, 9, 0, 0).timestamp()
),
"dagster/freshness_params": JsonMetadataValue(
{"timezone": "UTC", "deadline_cron": "0 9 * * *"}
),
"dagster/latest_cron_tick_timestamp": TimestampMetadataValue(
create_datetime(2021, 1, 2, 9, 0, 0).timestamp()
),
},
)


def test_invalid_runtime_assets(
instance: DagsterInstance,
) -> None:
Expand Down

0 comments on commit da69a85

Please sign in to comment.