Skip to content

Commit

Permalink
[single-implicit-asset-job] in context.partition_time_window, use `…
Browse files Browse the repository at this point in the history
…partitions_def` from asset (#23289)

## Summary & Motivation

In the case where a job doesn't have a `partitions_def`, but the asset
inside it being executed does have a `partitions_def`, we want to use
the `partitions_def` from the asset.

This requires moving `partition_time_window` from the plan context,
which doesn't have access to the asset being executed, to the step
context, which does.

## How I Tested These Changes
  • Loading branch information
sryza authored Aug 1, 2024
1 parent d44c3c2 commit bcd0b86
Showing 1 changed file with 38 additions and 33 deletions.
71 changes: 38 additions & 33 deletions python_modules/dagster/dagster/_core/execution/context/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -447,39 +447,6 @@ def asset_partition_key_range(self) -> PartitionKeyRange:
)
return PartitionKeyRange(partition_key_range_start, tags[ASSET_PARTITION_RANGE_END_TAG])

@property
def partition_time_window(self) -> TimeWindow:
partitions_def = self.partitions_def

if partitions_def is None:
raise DagsterInvariantViolationError("Partitions definition is not defined")

if not has_one_dimension_time_window_partitioning(partitions_def=partitions_def):
raise DagsterInvariantViolationError(
"Expected a TimeWindowPartitionsDefinition or MultiPartitionsDefinition with a"
f" single time dimension, but instead found {type(partitions_def)}"
)

if self.has_partition_key:
return cast(
Union[MultiPartitionsDefinition, TimeWindowPartitionsDefinition], partitions_def
).time_window_for_partition_key(self.partition_key)
elif self.has_partition_key_range:
partition_key_range = self.asset_partition_key_range
partitions_def = cast(
Union[TimeWindowPartitionsDefinition, MultiPartitionsDefinition], partitions_def
)
return TimeWindow(
partitions_def.time_window_for_partition_key(partition_key_range.start).start,
partitions_def.time_window_for_partition_key(partition_key_range.end).end,
)

else:
check.failed(
"Has a PartitionsDefinition, so should either have a partition key or a partition"
" key range"
)

@property
def has_partition_key(self) -> bool:
return PARTITION_NAME_TAG in self._plan_data.dagster_run.tags
Expand Down Expand Up @@ -1146,6 +1113,44 @@ def asset_partitions_time_window_for_output(self, output_name: str) -> TimeWindo
partitions_def.time_window_for_partition_key(partition_key_range.end).end,
)

@property
def partition_time_window(self) -> TimeWindow:
asset_layer = self.job_def.asset_layer
partitions_def = self.job_def.partitions_def
if asset_layer:
assets_def = asset_layer.assets_def_for_node(self.node_handle)
if assets_def:
partitions_def = assets_def.partitions_def

if partitions_def is None:
raise DagsterInvariantViolationError("Partitions definition is not defined")

if not has_one_dimension_time_window_partitioning(partitions_def=partitions_def):
raise DagsterInvariantViolationError(
"Expected a TimeWindowPartitionsDefinition or MultiPartitionsDefinition with a"
f" single time dimension, but instead found {type(partitions_def)}"
)

if self.has_partition_key:
return cast(
Union[MultiPartitionsDefinition, TimeWindowPartitionsDefinition], partitions_def
).time_window_for_partition_key(self.partition_key)
elif self.has_partition_key_range:
partition_key_range = self.asset_partition_key_range
partitions_def = cast(
Union[TimeWindowPartitionsDefinition, MultiPartitionsDefinition], partitions_def
)
return TimeWindow(
partitions_def.time_window_for_partition_key(partition_key_range.start).start,
partitions_def.time_window_for_partition_key(partition_key_range.end).end,
)

else:
check.failed(
"Has a PartitionsDefinition, so should either have a partition key or a partition"
" key range"
)

def asset_partitions_time_window_for_input(self, input_name: str) -> TimeWindow:
"""The time window for the partitions of the asset correponding to the given input.
Expand Down

0 comments on commit bcd0b86

Please sign in to comment.