diff --git a/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py b/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py index f5455a5a59ca2..d5f799aa09ae2 100644 --- a/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py +++ b/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py @@ -1,9 +1,16 @@ from datetime import datetime +from functools import cached_property from typing import TYPE_CHECKING, AbstractSet, Iterable, Mapping, NamedTuple, NewType, Optional from dagster import _check as check from dagster._core.definitions.asset_subset import AssetSubset, ValidAssetSubset from dagster._core.definitions.events import AssetKey, AssetKeyPartitionKey +from dagster._core.definitions.partition import StaticPartitionsDefinition +from dagster._core.definitions.time_window_partitions import ( + TimeWindow, + TimeWindowPartitionsDefinition, + TimeWindowPartitionsSubset, +) from dagster._utils.cached_method import cached_method if TYPE_CHECKING: @@ -198,6 +205,73 @@ def only_partition_keys(self, partition_keys: AbstractSet[str]) -> "AssetSlice": ), ) + @cached_property + def latest_time_window_slice(self) -> "AssetSlice": + """Returns the latest time window for the asset slice. + + If the asset slice is time-window-partitiond (e.g. daily, hourly), + this will return a slice for latest complete time window, + relative to the current effective time. + + If the asset slice is not time-windowed, this will return the same asset slice. + + TODO: add language for multi-dimensional partitioning when we support it + TODO: add language for dynamic partitioning when we support it + """ + if self._partitions_def is None: + return self + + if isinstance(self._partitions_def, StaticPartitionsDefinition): + return self + + if isinstance(self._partitions_def, TimeWindowPartitionsDefinition): + time_window = self._partitions_def.get_last_partition_window( + self._asset_graph_view.effective_dt + ) + + return ( + self._asset_graph_view.create_from_time_window(self.asset_key, time_window) + if time_window + else self._asset_graph_view.create_empty_slice(self.asset_key) + ) + + # Need to handle dynamic and multi-dimensional partitioning + check.failed(f"Unsupported partitions_def: {self._partitions_def}") + + @cached_property + def latest_time_window(self) -> TimeWindow: + """If the underlying asset is time-window partitioned, this will return the latest complete + time window relative to the effective date. For example if it is daily partitioned starting + at midnight every day. If the effective date is before the start of the partition definition, this will + return the empty time window (where both start and end are datetime.max). + + If the underlying asset is unpartitioned or static partitioned and it is not empty, + this will return a time window from the beginning of time to the effective date. If + it is empty it will return the empty time window. + + TODO: add language for multi-dimensional partitioning when we support it + TODO: add language for dynamic partitioning when we support it + """ + if isinstance(self._partitions_def, TimeWindowPartitionsDefinition): + tw = self._partitions_def.get_last_partition_window(self._asset_graph_view.effective_dt) + return tw if tw else TimeWindow.empty() + + if self._partitions_def is None or isinstance( + self._partitions_def, StaticPartitionsDefinition + ): + return ( + TimeWindow.empty() + if self.is_empty + else TimeWindow(datetime.min, self._asset_graph_view.effective_dt) + ) + + # Need to handle dynamic and multi-dimensional partitioning + check.failed(f"Unsupported partitions_def: {self._partitions_def}") + + @property + def is_empty(self) -> bool: + return self._compatible_subset.size == 0 + class AssetGraphView: """The Asset Graph View. It is a view of the asset graph from the perspective of a specific @@ -326,3 +400,29 @@ def compute_child_asset_slice( parent_asset_subset=asset_slice.convert_to_valid_asset_subset(), ), ) + + def create_from_time_window(self, asset_key: AssetKey, time_window: TimeWindow) -> AssetSlice: + partitions_def = self._get_partitions_def(asset_key) + check.inst( + partitions_def, + TimeWindowPartitionsDefinition, + "Must be a time-windowed partition definition", + ) + assert isinstance(partitions_def, TimeWindowPartitionsDefinition) # appease type checker + return _slice_from_subset( + self, + AssetSubset( + asset_key=asset_key, + value=TimeWindowPartitionsSubset( + partitions_def=partitions_def, + num_partitions=None, + included_time_windows=[time_window], + ), + ), + ) + + def create_empty_slice(self, asset_key: AssetKey) -> AssetSlice: + return _slice_from_subset( + self, + AssetSubset.empty(asset_key, self._get_partitions_def(asset_key)), + ) diff --git a/python_modules/dagster/dagster/_core/definitions/time_window_partitions.py b/python_modules/dagster/dagster/_core/definitions/time_window_partitions.py index 906c64ead9e7f..ce7c4bc54eb1b 100644 --- a/python_modules/dagster/dagster/_core/definitions/time_window_partitions.py +++ b/python_modules/dagster/dagster/_core/definitions/time_window_partitions.py @@ -234,6 +234,14 @@ class TimeWindow(NamedTuple): start: PublicAttr[datetime] end: PublicAttr[datetime] + @property + def is_empty(self) -> bool: + return self.start == self.end + + @staticmethod + def empty() -> "TimeWindow": + return TimeWindow(start=datetime.max, end=datetime.max) + @whitelist_for_serdes( field_serializers={"start": DatetimeFieldSerializer, "end": DatetimeFieldSerializer}, diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/asset_graph_view_tests/test_latest_time_window.py b/python_modules/dagster/dagster_tests/asset_defs_tests/asset_graph_view_tests/test_latest_time_window.py new file mode 100644 index 0000000000000..062ace34a3be8 --- /dev/null +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/asset_graph_view_tests/test_latest_time_window.py @@ -0,0 +1,167 @@ +import pendulum +from dagster import Definitions, asset +from dagster._core.asset_graph_view.asset_graph_view import AssetGraphView +from dagster._core.definitions.partition import StaticPartitionsDefinition +from dagster._core.definitions.time_window_partitions import DailyPartitionsDefinition +from dagster._core.instance import DagsterInstance + + +def test_latest_time_slice_no_end() -> None: + # starts at 2020-02-01 + no_end_daily = DailyPartitionsDefinition(pendulum.datetime(2020, 2, 1)) + + @asset(partitions_def=no_end_daily) + def daily() -> None: ... + + partition_key_list = [ + "2020-02-01", + "2020-02-02", + "2020-02-03", + ] + + defs = Definitions([daily]) + instance = DagsterInstance.ephemeral() + + # effective date is 2020-2-4 + + asset_graph_view_on_2_4 = AssetGraphView.for_test( + defs, instance, effective_dt=pendulum.datetime(2020, 2, 4) + ) + + assert asset_graph_view_on_2_4.get_asset_slice(daily.key).compute_partition_keys() == set( + partition_key_list + ) + + assert asset_graph_view_on_2_4.get_asset_slice( + daily.key + ).latest_time_window_slice.compute_partition_keys() == {"2020-02-03"} + + assert asset_graph_view_on_2_4.get_asset_slice( + daily.key + ).latest_time_window.start == pendulum.datetime(2020, 2, 3) + assert asset_graph_view_on_2_4.get_asset_slice( + daily.key + ).latest_time_window.end == pendulum.datetime(2020, 2, 4) + + # effective date is 2020-2-5. Ensure one more date + + asset_graph_view_on_2_5 = AssetGraphView.for_test( + defs, instance, effective_dt=pendulum.datetime(2020, 2, 5) + ) + + assert asset_graph_view_on_2_5.get_asset_slice(daily.key).compute_partition_keys() == set( + partition_key_list + ["2020-02-04"] + ) + + assert asset_graph_view_on_2_5.get_asset_slice( + daily.key + ).latest_time_window_slice.compute_partition_keys() == {"2020-02-04"} + + # in the past + + asset_graph_view_on_1_1 = AssetGraphView.for_test( + defs, instance, effective_dt=pendulum.datetime(2020, 1, 1) + ) + + assert asset_graph_view_on_1_1.get_asset_slice(daily.key).compute_partition_keys() == set() + + assert ( + asset_graph_view_on_1_1.get_asset_slice( + daily.key + ).latest_time_window_slice.compute_partition_keys() + == set() + ) + + assert asset_graph_view_on_1_1.get_asset_slice(daily.key).latest_time_window_slice.is_empty + + # effective datetime is in the middle of 02-02, it means the latest + # complete time window is 02-01 -> 02-02, so the partition key should be 02-01 + + asset_graph_view_on_2_2_plus_1_min = AssetGraphView.for_test( + defs, instance, effective_dt=pendulum.datetime(2020, 2, 2, minute=1) + ) + assert asset_graph_view_on_2_2_plus_1_min.get_asset_slice( + daily.key + ).compute_partition_keys() == set(["2020-02-01"]) + tw = asset_graph_view_on_2_2_plus_1_min.get_asset_slice(daily.key).latest_time_window + + assert tw.start == pendulum.datetime(2020, 2, 1) + assert tw.end == pendulum.datetime(2020, 2, 2) + + +def test_latest_time_slice_with_end() -> None: + # starts at 2020-02-01 + daily_partitions_def = DailyPartitionsDefinition( + start_date=pendulum.datetime(2020, 1, 1), end_date=pendulum.datetime(2020, 2, 1) + ) + + @asset(partitions_def=daily_partitions_def) + def daily() -> None: ... + + defs = Definitions([daily]) + instance = DagsterInstance.ephemeral() + + asset_graph_view_before_start = AssetGraphView.for_test( + defs, instance, effective_dt=pendulum.datetime(2019, 12, 31) + ) + assert ( + asset_graph_view_before_start.get_asset_slice( + daily.key + ).latest_time_window_slice.compute_partition_keys() + == set() + ) + + asset_graph_view_at_start = AssetGraphView.for_test( + defs, instance, effective_dt=pendulum.datetime(2020, 1, 1) + ) + assert ( + asset_graph_view_at_start.get_asset_slice( + daily.key + ).latest_time_window_slice.compute_partition_keys() + == set() + ) + + asset_graph_view_after_start_before_end = AssetGraphView.for_test( + defs, instance, effective_dt=pendulum.datetime(2020, 1, 3) + ) + assert asset_graph_view_after_start_before_end.get_asset_slice( + daily.key + ).latest_time_window_slice.compute_partition_keys() == set(["2020-01-02"]) + + asset_graph_view_after_end = AssetGraphView.for_test( + defs, instance, effective_dt=pendulum.datetime(2020, 2, 5) + ) + assert asset_graph_view_after_end.get_asset_slice( + daily.key + ).latest_time_window_slice.compute_partition_keys() == set(["2020-01-31"]) + + +def test_latest_time_slice_unpartitioned() -> None: + @asset + def unpartitioned() -> None: ... + + defs = Definitions([unpartitioned]) + instance = DagsterInstance.ephemeral() + + asset_graph_view = AssetGraphView.for_test(defs, instance) + assert not asset_graph_view.get_asset_slice(unpartitioned.key).latest_time_window_slice.is_empty + assert not asset_graph_view.get_asset_slice(unpartitioned.key).latest_time_window.is_empty + assert asset_graph_view.create_empty_slice(unpartitioned.key).latest_time_window_slice.is_empty + assert asset_graph_view.create_empty_slice(unpartitioned.key).latest_time_window.is_empty + + +def test_latest_time_slice_static_partitioned() -> None: + number_keys = {"1", "2", "3"} + number_static_partitions_def = StaticPartitionsDefinition(list(number_keys)) + + @asset(partitions_def=number_static_partitions_def) + def up_numbers() -> None: ... + + defs = Definitions([up_numbers]) + instance = DagsterInstance.ephemeral() + + asset_graph_view = AssetGraphView.for_test(defs, instance) + up_slice = asset_graph_view.get_asset_slice(up_numbers.key) + assert up_slice.latest_time_window_slice.compute_partition_keys() == number_keys + assert not up_slice.is_empty + assert asset_graph_view.create_empty_slice(up_numbers.key).latest_time_window_slice.is_empty