Skip to content

Commit

Permalink
remove indent
Browse files Browse the repository at this point in the history
  • Loading branch information
schrockn committed Sep 10, 2023
1 parent 6087098 commit 683edcc
Showing 1 changed file with 44 additions and 37 deletions.
81 changes: 44 additions & 37 deletions python_modules/dagster/dagster/_core/execution/context/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -1075,47 +1075,54 @@ def asset_partitions_subset_for_input(
assets_def = asset_layer.assets_def_for_node(self.node_handle)
upstream_asset_key = asset_layer.asset_key_for_input(self.node_handle, input_name)

if upstream_asset_key is not None:
upstream_asset_partitions_def = asset_layer.partitions_def_for_asset(upstream_asset_key)
if upstream_asset_key is None:
check.failed("The input has no asset partitions")

if upstream_asset_partitions_def is not None:
partitions_def = assets_def.partitions_def if assets_def else None
partitions_subset = (
partitions_def.empty_subset().with_partition_key_range(
self.asset_partition_key_range, dynamic_partitions_store=self.instance
)
if partitions_def
else None
)
partition_mapping = infer_partition_mapping(
asset_layer.partition_mapping_for_node_input(
self.node_handle, upstream_asset_key
),
partitions_def,
upstream_asset_partitions_def,
)
mapped_partitions_result = (
partition_mapping.get_upstream_mapped_partitions_result_for_partitions(
partitions_subset,
upstream_asset_partitions_def,
dynamic_partitions_store=self.instance,
)
)
assert upstream_asset_key, "The input has no asset partitions"

if (
require_valid_partitions
and mapped_partitions_result.required_but_nonexistent_partition_keys
):
raise DagsterInvariantViolationError(
f"Partition key range {self.asset_partition_key_range} in"
f" {self.node_handle.name} depends on invalid partition keys"
f" {mapped_partitions_result.required_but_nonexistent_partition_keys} in"
f" upstream asset {upstream_asset_key}"
)
upstream_asset_partitions_def = asset_layer.partitions_def_for_asset(upstream_asset_key)

if upstream_asset_partitions_def is None:
check.failed("The input has no asset partitions")

assert upstream_asset_partitions_def, "The input has no asset partitions"

partitions_def = assets_def.partitions_def if assets_def else None
partitions_subset = (
partitions_def.empty_subset().with_partition_key_range(
self.asset_partition_key_range, dynamic_partitions_store=self.instance
)
if partitions_def
else None
)
partition_mapping = infer_partition_mapping(
asset_layer.partition_mapping_for_node_input(
self.node_handle, upstream_asset_key
),
partitions_def,
upstream_asset_partitions_def,
)
mapped_partitions_result = (
partition_mapping.get_upstream_mapped_partitions_result_for_partitions(
partitions_subset,
upstream_asset_partitions_def,
dynamic_partitions_store=self.instance,
)
)

if (
require_valid_partitions
and mapped_partitions_result.required_but_nonexistent_partition_keys
):
raise DagsterInvariantViolationError(
f"Partition key range {self.asset_partition_key_range} in"
f" {self.node_handle.name} depends on invalid partition keys"
f" {mapped_partitions_result.required_but_nonexistent_partition_keys} in"
f" upstream asset {upstream_asset_key}"
)

return mapped_partitions_result.partitions_subset
return mapped_partitions_result.partitions_subset

check.failed("The input has no asset partitions")

def asset_partition_key_for_input(self, input_name: str) -> str:
start, end = self.asset_partition_key_range_for_input(input_name)
Expand Down

0 comments on commit 683edcc

Please sign in to comment.