diff --git a/python_modules/dagster/dagster/_core/execution/asset_backfill.py b/python_modules/dagster/dagster/_core/execution/asset_backfill.py index e12143ddf893c..5984aac5f6f76 100644 --- a/python_modules/dagster/dagster/_core/execution/asset_backfill.py +++ b/python_modules/dagster/dagster/_core/execution/asset_backfill.py @@ -1764,6 +1764,10 @@ def get_can_run_with_parent_subsets( partition_mapping = asset_graph.get_partition_mapping( candidate_asset_key, parent_asset_key=parent_asset_key ) + + # First filter out cases where even if the parent was requested this iteration, it wouldn't + # matter, because the parent and child can't execute in the same run + # checks if there is a simple partition mapping between the parent and the child has_identity_partition_mapping = ( # both unpartitioned @@ -1863,7 +1867,10 @@ def get_can_run_with_parent_subsets( ], ) - # Split out the partitions to remove any that were not yet requested + # We now know that the parent and child are eligible to happen in the same run, so pass + # any children of parents that actually were requested in this iteration (by + # being in either the asset_graph_subset_matched_so_far subset, or more rarely in + # candidate_asset_graph_subset_unit if they are part of a non-subsettable multi-asset) failure_subsets_with_reasons = [] candidate_subset = check.not_none( @@ -1872,9 +1879,6 @@ def get_can_run_with_parent_subsets( ) ) - # Filter out any children of parents that weren't requested in this iteration (by - # being in either the asset_graph_subset_matched_so_far subset, or more rarely in - # the candidate subset if they are part of a non-subsettable multi-asset) not_yet_requested_parent_subset = parent_subset.compute_difference( parent_already_requested_subset.compute_union(candidate_subset) ) @@ -1909,7 +1913,7 @@ def _should_backfill_atomic_asset_graph_subset_unit( materialized_subset: AssetGraphSubset, failed_and_downstream_subset: AssetGraphSubset, ) -> AssetGraphViewBfsFilterConditionResult: - final_failure_subset_values_with_reasons: List[Tuple[EntitySubsetValue, str]] = [] + failure_subset_values_with_reasons: List[Tuple[EntitySubsetValue, str]] = [] candidate_entity_subsets = list( asset_graph_view.iterate_asset_subsets(candidate_asset_graph_subset_unit) @@ -1936,7 +1940,7 @@ def _should_backfill_atomic_asset_graph_subset_unit( if passed_entity_subset.is_empty: break - passed_entity_subset, failure_subset_values_with_reasons = ( + passed_entity_subset, new_failure_subset_values_with_reasons = ( _should_backfill_atomic_asset_subset_unit( asset_graph_view, entity_subset_to_filter=passed_entity_subset, @@ -1949,7 +1953,7 @@ def _should_backfill_atomic_asset_graph_subset_unit( ) ) passed_subset_value = passed_entity_subset.value - final_failure_subset_values_with_reasons.extend(failure_subset_values_with_reasons) + failure_subset_values_with_reasons.extend(new_failure_subset_values_with_reasons) passed_entity_subsets = [] for candidate_entity_subset in candidate_entity_subsets: @@ -1964,7 +1968,7 @@ def _should_backfill_atomic_asset_graph_subset_unit( failure_asset_graph_subsets_with_reasons = [] # Any failure partition values apply to all candidate asset keys, so construct a subset # graph with that partition subset value for each key - for failure_subset_value, reason in final_failure_subset_values_with_reasons: + for failure_subset_value, reason in failure_subset_values_with_reasons: failure_entity_subsets = [ check.not_none( asset_graph_view.get_subset_from_serializable_subset(