Skip to content

Commit

Permalink
final names?
Browse files Browse the repository at this point in the history
> Insert changelog entry or delete this section.
  • Loading branch information
gibsondan committed Nov 25, 2024
1 parent 52e0373 commit d598f82
Showing 1 changed file with 12 additions and 8 deletions.
20 changes: 12 additions & 8 deletions python_modules/dagster/dagster/_core/execution/asset_backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -1764,6 +1764,10 @@ def get_can_run_with_parent_subsets(
partition_mapping = asset_graph.get_partition_mapping(
candidate_asset_key, parent_asset_key=parent_asset_key
)

# First filter out cases where even if the parent was requested this iteration, it wouldn't
# matter, because the parent and child can't execute in the same run

# checks if there is a simple partition mapping between the parent and the child
has_identity_partition_mapping = (
# both unpartitioned
Expand Down Expand Up @@ -1863,7 +1867,10 @@ def get_can_run_with_parent_subsets(
],
)

# Split out the partitions to remove any that were not yet requested
# We now know that the parent and child are eligible to happen in the same run, so pass
# any children of parents that actually were requested in this iteration (by
# being in either the asset_graph_subset_matched_so_far subset, or more rarely in
# candidate_asset_graph_subset_unit if they are part of a non-subsettable multi-asset)
failure_subsets_with_reasons = []

candidate_subset = check.not_none(
Expand All @@ -1872,9 +1879,6 @@ def get_can_run_with_parent_subsets(
)
)

# Filter out any children of parents that weren't requested in this iteration (by
# being in either the asset_graph_subset_matched_so_far subset, or more rarely in
# the candidate subset if they are part of a non-subsettable multi-asset)
not_yet_requested_parent_subset = parent_subset.compute_difference(
parent_already_requested_subset.compute_union(candidate_subset)
)
Expand Down Expand Up @@ -1909,7 +1913,7 @@ def _should_backfill_atomic_asset_graph_subset_unit(
materialized_subset: AssetGraphSubset,
failed_and_downstream_subset: AssetGraphSubset,
) -> AssetGraphViewBfsFilterConditionResult:
final_failure_subset_values_with_reasons: List[Tuple[EntitySubsetValue, str]] = []
failure_subset_values_with_reasons: List[Tuple[EntitySubsetValue, str]] = []

candidate_entity_subsets = list(
asset_graph_view.iterate_asset_subsets(candidate_asset_graph_subset_unit)
Expand All @@ -1936,7 +1940,7 @@ def _should_backfill_atomic_asset_graph_subset_unit(
if passed_entity_subset.is_empty:
break

passed_entity_subset, failure_subset_values_with_reasons = (
passed_entity_subset, new_failure_subset_values_with_reasons = (
_should_backfill_atomic_asset_subset_unit(
asset_graph_view,
entity_subset_to_filter=passed_entity_subset,
Expand All @@ -1949,7 +1953,7 @@ def _should_backfill_atomic_asset_graph_subset_unit(
)
)
passed_subset_value = passed_entity_subset.value
final_failure_subset_values_with_reasons.extend(failure_subset_values_with_reasons)
failure_subset_values_with_reasons.extend(new_failure_subset_values_with_reasons)

passed_entity_subsets = []
for candidate_entity_subset in candidate_entity_subsets:
Expand All @@ -1964,7 +1968,7 @@ def _should_backfill_atomic_asset_graph_subset_unit(
failure_asset_graph_subsets_with_reasons = []
# Any failure partition values apply to all candidate asset keys, so construct a subset
# graph with that partition subset value for each key
for failure_subset_value, reason in final_failure_subset_values_with_reasons:
for failure_subset_value, reason in failure_subset_values_with_reasons:
failure_entity_subsets = [
check.not_none(
asset_graph_view.get_subset_from_serializable_subset(
Expand Down

0 comments on commit d598f82

Please sign in to comment.