diff --git a/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py b/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py index cba83faadd225..e4368e5270d90 100644 --- a/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py +++ b/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py @@ -1,9 +1,24 @@ from datetime import datetime -from typing import TYPE_CHECKING, AbstractSet, Mapping, NamedTuple, NewType, Optional +from typing import ( + TYPE_CHECKING, + AbstractSet, + Mapping, + NamedTuple, + NewType, + Optional, + Sequence, + cast, +) from dagster import _check as check from dagster._core.definitions.asset_subset import AssetSubset, ValidAssetSubset from dagster._core.definitions.events import AssetKey +from dagster._core.definitions.partition import AllPartitionsSubset, StaticPartitionsDefinition +from dagster._core.definitions.time_window_partitions import ( + TimeWindow, + TimeWindowPartitionsDefinition, + TimeWindowPartitionsSubset, +) from dagster._utils.cached_method import cached_method if TYPE_CHECKING: @@ -142,6 +157,26 @@ def compute_intersection_with_partition_keys( """Return a new AssetSlice with only the given partition keys if they are in the slice.""" return self._asset_graph_view.compute_intersection_with_partition_keys(partition_keys, self) + @property + def time_windows(self) -> Sequence[TimeWindow]: + """Get the time windows for the asset slice. Only supports explicitly time-windowed partitions for now.""" + # Only supports explicitly time-windows partitions for now + tw_partitions_def = _required_tw_partitions_def(self._partitions_def) + + if isinstance(self._compatible_subset.subset_value, TimeWindowPartitionsSubset): + return self._compatible_subset.subset_value.included_time_windows + elif isinstance(self._compatible_subset.subset_value, AllPartitionsSubset): + last_tw = tw_partitions_def.get_last_partition_window( + self._asset_graph_view.effective_dt + ) + return [TimeWindow(datetime.min, last_tw.end)] if last_tw else [] + else: + check.failed(f"Unsupported subset value: {self._compatible_subset.subset_value}") + + @property + def is_empty(self) -> bool: + return self._compatible_subset.size == 0 + class AssetGraphView: """The Asset Graph View. It is a view of the asset graph from the perspective of a specific @@ -295,3 +330,62 @@ def compute_intersection_with_partition_keys( asset_slice.asset_key, partitions_def, partition_keys ), ) + + def create_from_time_window(self, asset_key: AssetKey, time_window: TimeWindow) -> AssetSlice: + return _slice_from_subset( + self, + AssetSubset( + asset_key=asset_key, + value=TimeWindowPartitionsSubset( + partitions_def=_required_tw_partitions_def(self._get_partitions_def(asset_key)), + num_partitions=None, + included_time_windows=[time_window], + ), + ), + ) + + def create_latest_time_window_slice(self, asset_key: AssetKey) -> AssetSlice: + """If the underlying asset is time-window partitioned, this will return the latest complete + time window relative to the effective date. For example if it is daily partitioned starting + at midnight every day. If the effective date is before the start of the partition definition, this will + return the empty time window (where both start and end are datetime.max). + + If the underlying asset is unpartitioned or static partitioned and it is not empty, + this will return a time window from the beginning of time to the effective date. If + it is empty it will return the empty time window. + + TODO: add language for multi-dimensional partitioning when we support it + TODO: add language for dynamic partitioning when we support it + """ + partitions_def = self._get_partitions_def(asset_key) + if partitions_def is None: + return self.get_asset_slice(asset_key) + + if isinstance(partitions_def, StaticPartitionsDefinition): + return self.get_asset_slice(asset_key) + + if isinstance(partitions_def, TimeWindowPartitionsDefinition): + time_window = partitions_def.get_last_partition_window(self.effective_dt) + return ( + self.create_from_time_window(asset_key, time_window) + if time_window + else self.create_empty_slice(asset_key) + ) + + # Need to handle dynamic and multi-dimensional partitioning + check.failed(f"Unsupported partitions_def: {partitions_def}") + + def create_empty_slice(self, asset_key: AssetKey) -> AssetSlice: + return _slice_from_subset( + self, + AssetSubset.empty(asset_key, self._get_partitions_def(asset_key)), + ) + + +def _required_tw_partitions_def( + partitions_def: Optional["PartitionsDefinition"], +) -> TimeWindowPartitionsDefinition: + return cast( + TimeWindowPartitionsDefinition, + check.inst(partitions_def, TimeWindowPartitionsDefinition, "Must be time windowed."), + ) diff --git a/python_modules/dagster/dagster/_core/definitions/time_window_partitions.py b/python_modules/dagster/dagster/_core/definitions/time_window_partitions.py index 906c64ead9e7f..ce7c4bc54eb1b 100644 --- a/python_modules/dagster/dagster/_core/definitions/time_window_partitions.py +++ b/python_modules/dagster/dagster/_core/definitions/time_window_partitions.py @@ -234,6 +234,14 @@ class TimeWindow(NamedTuple): start: PublicAttr[datetime] end: PublicAttr[datetime] + @property + def is_empty(self) -> bool: + return self.start == self.end + + @staticmethod + def empty() -> "TimeWindow": + return TimeWindow(start=datetime.max, end=datetime.max) + @whitelist_for_serdes( field_serializers={"start": DatetimeFieldSerializer, "end": DatetimeFieldSerializer}, diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/asset_graph_view_tests/test_latest_time_window.py b/python_modules/dagster/dagster_tests/asset_defs_tests/asset_graph_view_tests/test_latest_time_window.py new file mode 100644 index 0000000000000..74462c43dcb9a --- /dev/null +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/asset_graph_view_tests/test_latest_time_window.py @@ -0,0 +1,175 @@ +from datetime import datetime + +import pendulum +from dagster import ( + Definitions, + _check as check, + asset, +) +from dagster._core.asset_graph_view.asset_graph_view import AssetGraphView, AssetSlice +from dagster._core.definitions.partition import StaticPartitionsDefinition +from dagster._core.definitions.time_window_partitions import DailyPartitionsDefinition, TimeWindow +from dagster._core.instance import DagsterInstance + + +def _tw(asset_slice: AssetSlice) -> TimeWindow: + tws = asset_slice.time_windows + check.invariant(len(tws) == 1) + return next(iter(tws)) + + +def test_latest_time_slice_no_end() -> None: + # starts at 2020-02-01 + no_end_daily = DailyPartitionsDefinition(pendulum.datetime(2020, 2, 1)) + + @asset(partitions_def=no_end_daily) + def daily() -> None: ... + + partition_key_list = [ + "2020-02-01", + "2020-02-02", + "2020-02-03", + ] + + defs = Definitions([daily]) + instance = DagsterInstance.ephemeral() + + # effective date is 2020-2-4 + + asset_graph_view_on_2_4 = AssetGraphView.for_test( + defs, instance, effective_dt=pendulum.datetime(2020, 2, 4) + ) + + assert asset_graph_view_on_2_4.get_asset_slice(daily.key).compute_partition_keys() == set( + partition_key_list + ) + + assert asset_graph_view_on_2_4.create_latest_time_window_slice( + daily.key + ).compute_partition_keys() == {"2020-02-03"} + + assert _tw( + asset_graph_view_on_2_4.create_latest_time_window_slice(daily.key) + ).start == pendulum.datetime(2020, 2, 3) + + assert _tw( + asset_graph_view_on_2_4.create_latest_time_window_slice(daily.key) + ).end == pendulum.datetime(2020, 2, 4) + + # effective date is 2020-2-5. Ensure one more date + + asset_graph_view_on_2_5 = AssetGraphView.for_test( + defs, instance, effective_dt=pendulum.datetime(2020, 2, 5) + ) + + assert asset_graph_view_on_2_5.get_asset_slice(daily.key).compute_partition_keys() == set( + partition_key_list + ["2020-02-04"] + ) + + assert asset_graph_view_on_2_5.create_latest_time_window_slice( + daily.key + ).compute_partition_keys() == {"2020-02-04"} + + # in the past + + asset_graph_view_on_1_1 = AssetGraphView.for_test( + defs, instance, effective_dt=pendulum.datetime(2020, 1, 1) + ) + + assert asset_graph_view_on_1_1.get_asset_slice(daily.key).compute_partition_keys() == set() + + assert ( + asset_graph_view_on_1_1.create_latest_time_window_slice(daily.key).compute_partition_keys() + == set() + ) + + assert asset_graph_view_on_1_1.create_latest_time_window_slice(daily.key).is_empty + + # effective datetime is in the middle of 02-02, it means the latest + # complete time window is 02-01 -> 02-02, so the partition key should be 02-01 + + asset_graph_view_on_2_2_plus_1_min = AssetGraphView.for_test( + defs, instance, effective_dt=pendulum.datetime(2020, 2, 2, minute=1) + ) + assert asset_graph_view_on_2_2_plus_1_min.get_asset_slice( + daily.key + ).compute_partition_keys() == set(["2020-02-01"]) + + tw = _tw(asset_graph_view_on_2_2_plus_1_min.get_asset_slice(daily.key)) + + assert tw.start == datetime.min + assert tw.end == pendulum.datetime(2020, 2, 2) + + +def test_latest_time_slice_with_end() -> None: + # starts at 2020-02-01 + daily_partitions_def = DailyPartitionsDefinition( + start_date=pendulum.datetime(2020, 1, 1), end_date=pendulum.datetime(2020, 2, 1) + ) + + @asset(partitions_def=daily_partitions_def) + def daily() -> None: ... + + defs = Definitions([daily]) + instance = DagsterInstance.ephemeral() + + asset_graph_view_before_start = AssetGraphView.for_test( + defs, instance, effective_dt=pendulum.datetime(2019, 12, 31) + ) + assert ( + asset_graph_view_before_start.create_latest_time_window_slice( + daily.key + ).compute_partition_keys() + == set() + ) + + asset_graph_view_at_start = AssetGraphView.for_test( + defs, instance, effective_dt=pendulum.datetime(2020, 1, 1) + ) + assert ( + asset_graph_view_at_start.create_latest_time_window_slice( + daily.key + ).compute_partition_keys() + == set() + ) + + asset_graph_view_after_start_before_end = AssetGraphView.for_test( + defs, instance, effective_dt=pendulum.datetime(2020, 1, 3) + ) + assert asset_graph_view_after_start_before_end.create_latest_time_window_slice( + daily.key + ).compute_partition_keys() == set(["2020-01-02"]) + + asset_graph_view_after_end = AssetGraphView.for_test( + defs, instance, effective_dt=pendulum.datetime(2020, 2, 5) + ) + assert asset_graph_view_after_end.create_latest_time_window_slice( + daily.key + ).compute_partition_keys() == set(["2020-01-31"]) + + +def test_latest_time_slice_unpartitioned() -> None: + @asset + def unpartitioned() -> None: ... + + defs = Definitions([unpartitioned]) + instance = DagsterInstance.ephemeral() + + asset_graph_view = AssetGraphView.for_test(defs, instance) + assert not asset_graph_view.get_asset_slice(unpartitioned.key).is_empty + assert not asset_graph_view.create_latest_time_window_slice(unpartitioned.key).is_empty + + +def test_latest_time_slice_static_partitioned() -> None: + number_keys = {"1", "2", "3"} + number_static_partitions_def = StaticPartitionsDefinition(list(number_keys)) + + @asset(partitions_def=number_static_partitions_def) + def up_numbers() -> None: ... + + defs = Definitions([up_numbers]) + instance = DagsterInstance.ephemeral() + + asset_graph_view = AssetGraphView.for_test(defs, instance) + latest_up_slice = asset_graph_view.create_latest_time_window_slice(up_numbers.key) + assert latest_up_slice.compute_partition_keys() == number_keys