diff --git a/python_modules/dagster/dagster/_core/execution/context/system.py b/python_modules/dagster/dagster/_core/execution/context/system.py index b7261458f0e18..ff67a48ca701e 100644 --- a/python_modules/dagster/dagster/_core/execution/context/system.py +++ b/python_modules/dagster/dagster/_core/execution/context/system.py @@ -1075,47 +1075,54 @@ def asset_partitions_subset_for_input( assets_def = asset_layer.assets_def_for_node(self.node_handle) upstream_asset_key = asset_layer.asset_key_for_input(self.node_handle, input_name) - if upstream_asset_key is not None: - upstream_asset_partitions_def = asset_layer.partitions_def_for_asset(upstream_asset_key) + if upstream_asset_key is None: + check.failed("The input has no asset partitions") - if upstream_asset_partitions_def is not None: - partitions_def = assets_def.partitions_def if assets_def else None - partitions_subset = ( - partitions_def.empty_subset().with_partition_key_range( - self.asset_partition_key_range, dynamic_partitions_store=self.instance - ) - if partitions_def - else None - ) - partition_mapping = infer_partition_mapping( - asset_layer.partition_mapping_for_node_input( - self.node_handle, upstream_asset_key - ), - partitions_def, - upstream_asset_partitions_def, - ) - mapped_partitions_result = ( - partition_mapping.get_upstream_mapped_partitions_result_for_partitions( - partitions_subset, - upstream_asset_partitions_def, - dynamic_partitions_store=self.instance, - ) - ) + assert upstream_asset_key, "The input has no asset partitions" - if ( - require_valid_partitions - and mapped_partitions_result.required_but_nonexistent_partition_keys - ): - raise DagsterInvariantViolationError( - f"Partition key range {self.asset_partition_key_range} in" - f" {self.node_handle.name} depends on invalid partition keys" - f" {mapped_partitions_result.required_but_nonexistent_partition_keys} in" - f" upstream asset {upstream_asset_key}" - ) + upstream_asset_partitions_def = asset_layer.partitions_def_for_asset(upstream_asset_key) + + if upstream_asset_partitions_def is None: + check.failed("The input has no asset partitions") + + assert upstream_asset_partitions_def, "The input has no asset partitions" + + partitions_def = assets_def.partitions_def if assets_def else None + partitions_subset = ( + partitions_def.empty_subset().with_partition_key_range( + self.asset_partition_key_range, dynamic_partitions_store=self.instance + ) + if partitions_def + else None + ) + partition_mapping = infer_partition_mapping( + asset_layer.partition_mapping_for_node_input( + self.node_handle, upstream_asset_key + ), + partitions_def, + upstream_asset_partitions_def, + ) + mapped_partitions_result = ( + partition_mapping.get_upstream_mapped_partitions_result_for_partitions( + partitions_subset, + upstream_asset_partitions_def, + dynamic_partitions_store=self.instance, + ) + ) + + if ( + require_valid_partitions + and mapped_partitions_result.required_but_nonexistent_partition_keys + ): + raise DagsterInvariantViolationError( + f"Partition key range {self.asset_partition_key_range} in" + f" {self.node_handle.name} depends on invalid partition keys" + f" {mapped_partitions_result.required_but_nonexistent_partition_keys} in" + f" upstream asset {upstream_asset_key}" + ) - return mapped_partitions_result.partitions_subset + return mapped_partitions_result.partitions_subset - check.failed("The input has no asset partitions") def asset_partition_key_for_input(self, input_name: str) -> str: start, end = self.asset_partition_key_range_for_input(input_name)