diff --git a/python_modules/dagster/dagster/_core/execution/context/system.py b/python_modules/dagster/dagster/_core/execution/context/system.py index 8cfcfc2b6a780..e60320b43ff55 100644 --- a/python_modules/dagster/dagster/_core/execution/context/system.py +++ b/python_modules/dagster/dagster/_core/execution/context/system.py @@ -447,39 +447,6 @@ def asset_partition_key_range(self) -> PartitionKeyRange: ) return PartitionKeyRange(partition_key_range_start, tags[ASSET_PARTITION_RANGE_END_TAG]) - @property - def partition_time_window(self) -> TimeWindow: - partitions_def = self.partitions_def - - if partitions_def is None: - raise DagsterInvariantViolationError("Partitions definition is not defined") - - if not has_one_dimension_time_window_partitioning(partitions_def=partitions_def): - raise DagsterInvariantViolationError( - "Expected a TimeWindowPartitionsDefinition or MultiPartitionsDefinition with a" - f" single time dimension, but instead found {type(partitions_def)}" - ) - - if self.has_partition_key: - return cast( - Union[MultiPartitionsDefinition, TimeWindowPartitionsDefinition], partitions_def - ).time_window_for_partition_key(self.partition_key) - elif self.has_partition_key_range: - partition_key_range = self.asset_partition_key_range - partitions_def = cast( - Union[TimeWindowPartitionsDefinition, MultiPartitionsDefinition], partitions_def - ) - return TimeWindow( - partitions_def.time_window_for_partition_key(partition_key_range.start).start, - partitions_def.time_window_for_partition_key(partition_key_range.end).end, - ) - - else: - check.failed( - "Has a PartitionsDefinition, so should either have a partition key or a partition" - " key range" - ) - @property def has_partition_key(self) -> bool: return PARTITION_NAME_TAG in self._plan_data.dagster_run.tags @@ -1146,6 +1113,44 @@ def asset_partitions_time_window_for_output(self, output_name: str) -> TimeWindo partitions_def.time_window_for_partition_key(partition_key_range.end).end, ) + @property + def partition_time_window(self) -> TimeWindow: + asset_layer = self.job_def.asset_layer + partitions_def = self.job_def.partitions_def + if asset_layer: + assets_def = asset_layer.assets_def_for_node(self.node_handle) + if assets_def: + partitions_def = assets_def.partitions_def + + if partitions_def is None: + raise DagsterInvariantViolationError("Partitions definition is not defined") + + if not has_one_dimension_time_window_partitioning(partitions_def=partitions_def): + raise DagsterInvariantViolationError( + "Expected a TimeWindowPartitionsDefinition or MultiPartitionsDefinition with a" + f" single time dimension, but instead found {type(partitions_def)}" + ) + + if self.has_partition_key: + return cast( + Union[MultiPartitionsDefinition, TimeWindowPartitionsDefinition], partitions_def + ).time_window_for_partition_key(self.partition_key) + elif self.has_partition_key_range: + partition_key_range = self.asset_partition_key_range + partitions_def = cast( + Union[TimeWindowPartitionsDefinition, MultiPartitionsDefinition], partitions_def + ) + return TimeWindow( + partitions_def.time_window_for_partition_key(partition_key_range.start).start, + partitions_def.time_window_for_partition_key(partition_key_range.end).end, + ) + + else: + check.failed( + "Has a PartitionsDefinition, so should either have a partition key or a partition" + " key range" + ) + def asset_partitions_time_window_for_input(self, input_name: str) -> TimeWindow: """The time window for the partitions of the asset correponding to the given input.