
Commit

renames
> Insert changelog entry or delete this section.
gibsondan committed Nov 25, 2024
1 parent 1d8ed7e commit 52e0373
Showing 1 changed file with 62 additions and 66 deletions.
128 changes: 62 additions & 66 deletions python_modules/dagster/dagster/_core/execution/asset_backfill.py
@@ -1310,7 +1310,7 @@ def _get_subset_in_target_subset(

candidate_entity_subset = next(iter(candidate_entity_subsets))

-filtered_entity_subset = candidate_entity_subset.compute_intersection(
+passed_entity_subset = candidate_entity_subset.compute_intersection(
check.not_none(
asset_graph_view.get_subset_from_serializable_subset(
target_subset.get_asset_subset(
@@ -1320,7 +1320,7 @@ def _get_subset_in_target_subset(
)
)

-return AssetGraphSubset.from_entity_subsets([filtered_entity_subset])
+return AssetGraphSubset.from_entity_subsets([passed_entity_subset])


def _get_failed_and_downstream_asset_partitions(
@@ -1609,7 +1609,9 @@ def _should_backfill_atomic_asset_subset_unit(
asset_graph = asset_graph_view.asset_graph
asset_key = entity_subset_to_filter.key

-missing_in_target_partitions = entity_subset_to_filter.compute_difference(
+passed_entity_subset = entity_subset_to_filter
+
+missing_in_target_partitions = passed_entity_subset.compute_difference(
check.not_none(
asset_graph_view.get_asset_subset_from_asset_graph_subset(target_subset, asset_key)
)
@@ -1621,11 +1623,9 @@ def _should_backfill_atomic_asset_subset_unit(
f"{missing_in_target_partitions} not targeted by backfill",
)
)
-entity_subset_to_filter = entity_subset_to_filter.compute_difference(
-missing_in_target_partitions
-)
+passed_entity_subset = passed_entity_subset.compute_difference(missing_in_target_partitions)

-failed_and_downstream_partitions = entity_subset_to_filter.compute_intersection(
+failed_and_downstream_partitions = passed_entity_subset.compute_intersection(
check.not_none(
asset_graph_view.get_asset_subset_from_asset_graph_subset(
failed_and_downstream_subset, asset_key
@@ -1639,11 +1639,11 @@ def _should_backfill_atomic_asset_subset_unit(
f"{failed_and_downstream_partitions} has failed or is downstream of a failed asset",
)
)
-entity_subset_to_filter = entity_subset_to_filter.compute_difference(
+passed_entity_subset = passed_entity_subset.compute_difference(
failed_and_downstream_partitions
)

-materialized_partitions = entity_subset_to_filter.compute_intersection(
+materialized_partitions = passed_entity_subset.compute_intersection(
check.not_none(
asset_graph_view.get_asset_subset_from_asset_graph_subset(
materialized_subset, asset_key
@@ -1657,11 +1657,9 @@ def _should_backfill_atomic_asset_subset_unit(
f"{materialized_partitions} already materialized by backfill",
)
)
-entity_subset_to_filter = entity_subset_to_filter.compute_difference(
-materialized_partitions
-)
+passed_entity_subset = passed_entity_subset.compute_difference(materialized_partitions)

-requested_partitions = entity_subset_to_filter.compute_intersection(
+requested_partitions = passed_entity_subset.compute_intersection(
check.not_none(
asset_graph_view.get_asset_subset_from_asset_graph_subset(requested_subset, asset_key)
)
@@ -1674,27 +1672,27 @@ def _should_backfill_atomic_asset_subset_unit(
f"{requested_partitions} already requested by backfill",
)
)
-entity_subset_to_filter = entity_subset_to_filter.compute_difference(requested_partitions)
+passed_entity_subset = passed_entity_subset.compute_difference(requested_partitions)

for parent_key in asset_graph.get(asset_key).parent_keys:
-if entity_subset_to_filter.is_empty:
+if passed_entity_subset.is_empty:
break

parent_subset, required_but_nonexistent_subset = (
asset_graph_view.compute_parent_subset_and_required_but_nonexistent_subset(
parent_key,
-entity_subset_to_filter,
+passed_entity_subset,
)
)

if not required_but_nonexistent_subset.is_empty:
raise DagsterInvariantViolationError(
-f"Asset partition subset {entity_subset_to_filter}"
+f"Asset partition subset {passed_entity_subset}"
f" depends on invalid partitions {required_but_nonexistent_subset}"
)

# Children with parents that are targeted but not materialized are eligible
-# to be filtered out if the parent
+# to be filtered out if the parent has not run yet
targeted_but_not_materialized_parent_subset: EntitySubset[AssetKey] = (
parent_subset.compute_intersection(
check.not_none(
@@ -1711,21 +1709,21 @@ def _should_backfill_atomic_asset_subset_unit(
)
)

-can_be_removed_due_to_parent_subset = (
+possibly_waiting_for_parent_subset = (
asset_graph_view.compute_child_subset(
asset_key, targeted_but_not_materialized_parent_subset
)
-).compute_intersection(entity_subset_to_filter)
+).compute_intersection(passed_entity_subset)

-cannot_be_removed_due_to_parent_subset = entity_subset_to_filter.compute_difference(
-can_be_removed_due_to_parent_subset
+not_waiting_for_parent_subset = passed_entity_subset.compute_difference(
+possibly_waiting_for_parent_subset
)

-if not can_be_removed_due_to_parent_subset.is_empty:
-filtered_out_subset, parent_failure_subsets_with_reasons = (
+if not possibly_waiting_for_parent_subset.is_empty:
+can_run_with_parent_subset, parent_failure_subsets_with_reasons = (
get_can_run_with_parent_subsets(
targeted_but_not_materialized_parent_subset,
-can_be_removed_due_to_parent_subset,
+possibly_waiting_for_parent_subset,
asset_graph_view,
target_subset,
asset_graph_subset_matched_so_far,
@@ -1735,12 +1733,12 @@ def _should_backfill_atomic_asset_subset_unit(
if parent_failure_subsets_with_reasons:
failure_subsets_with_reasons.extend(parent_failure_subsets_with_reasons)

-entity_subset_to_filter = cannot_be_removed_due_to_parent_subset.compute_union(
-filtered_out_subset
+passed_entity_subset = not_waiting_for_parent_subset.compute_union(
+can_run_with_parent_subset
)

return (
-entity_subset_to_filter.convert_to_serializable_subset(),
+passed_entity_subset.convert_to_serializable_subset(),
failure_subsets_with_reasons,
)

@@ -1756,6 +1754,8 @@ def get_can_run_with_parent_subsets(
candidate_asset_key = entity_subset_to_filter.key
parent_asset_key = parent_subset.key

+passed_entity_subset = entity_subset_to_filter
+
assert isinstance(asset_graph_view.asset_graph, RemoteWorkspaceAssetGraph)
asset_graph: RemoteWorkspaceAssetGraph = asset_graph_view.asset_graph

@@ -1784,7 +1784,7 @@
asset_graph_view.get_empty_subset(key=candidate_asset_key),
[
(
-entity_subset_to_filter.get_internal_value(),
+passed_entity_subset.get_internal_value(),
f"parent {parent_node.key.to_user_string()} and {candidate_node.key.to_user_string()} have different backfill policies so they cannot be materialized in the same run. {candidate_node.key.to_user_string()} can be materialized once {parent_node.key} is materialized.",
)
],
@@ -1797,7 +1797,7 @@
asset_graph_view.get_empty_subset(key=candidate_asset_key),
[
(
-entity_subset_to_filter.get_internal_value(),
+passed_entity_subset.get_internal_value(),
f"parent {parent_node.key.to_user_string()} and {candidate_node.key.to_user_string()} are in different code locations so they cannot be materialized in the same run. {candidate_node.key.to_user_string()} can be materialized once {parent_node.key.to_user_string()} is materialized.",
)
],
@@ -1808,7 +1808,7 @@
asset_graph_view.get_empty_subset(key=candidate_asset_key),
[
(
-entity_subset_to_filter.get_internal_value(),
+passed_entity_subset.get_internal_value(),
f"parent {parent_node.key.to_user_string()} and {candidate_node.key.to_user_string()} have different partitions definitions so they cannot be materialized in the same run. {candidate_node.key.to_user_string()} can be materialized once {parent_node.key.to_user_string()} is materialized.",
)
],
@@ -1817,15 +1817,15 @@ def get_can_run_with_parent_subsets(
parent_target_subset = target_subset.get_asset_subset(parent_asset_key, asset_graph)
candidate_target_subset = target_subset.get_asset_subset(candidate_asset_key, asset_graph)

-parent_requested_subset = check.not_none(
+parent_already_requested_subset = check.not_none(
asset_graph_view.get_asset_subset_from_asset_graph_subset(
asset_graph_subset_matched_so_far, parent_asset_key
)
)

-num_requested_parents = parent_requested_subset.size
+num_requested_parents = parent_already_requested_subset.size

-if (
+if not (
# if there is a simple mapping between the parent and the child, then
# with the parent
has_identity_partition_mapping
@@ -1846,8 +1846,6 @@ def get_can_run_with_parent_subsets(
and num_requested_parents == parent_target_subset.size
)
):
-pass
-else:
failed_reason = (
f"partition mapping between {parent_node.key.to_user_string()} and {candidate_node.key.to_user_string()} is not simple and "
f"{parent_node.key.to_user_string()} does not meet requirements of: targeting the same partitions as "
@@ -1859,13 +1857,13 @@ def get_can_run_with_parent_subsets(
asset_graph_view.get_empty_subset(key=candidate_asset_key),
[
(
-entity_subset_to_filter.get_internal_value(),
+passed_entity_subset.get_internal_value(),
failed_reason,
)
],
)

-# Split out the partitions to remove any that were not requested
+# Split out the partitions to remove any that were not yet requested
failure_subsets_with_reasons = []

candidate_subset = check.not_none(
@@ -1875,29 +1873,29 @@ def get_can_run_with_parent_subsets(
)

# Filter out any children of parents that weren't requested in this iteration (by
-# being in either the requested subset, or more rarely in the candidate subset if
-# they are part of a non-subsettable multi-asset)
-unrequested_parent_subset = parent_subset.compute_difference(
-parent_requested_subset.compute_union(candidate_subset)
+# being in either the asset_graph_subset_matched_so_far subset, or more rarely in
+# the candidate subset if they are part of a non-subsettable multi-asset)
+not_yet_requested_parent_subset = parent_subset.compute_difference(
+parent_already_requested_subset.compute_union(candidate_subset)
)

-children_of_unrequested_parents = asset_graph_view.compute_child_subset(
-candidate_asset_key, unrequested_parent_subset
-).compute_intersection(entity_subset_to_filter)
+children_of_not_yet_requested_parents = asset_graph_view.compute_child_subset(
+candidate_asset_key, not_yet_requested_parent_subset
+).compute_intersection(passed_entity_subset)

-if not children_of_unrequested_parents.is_empty:
+if not children_of_not_yet_requested_parents.is_empty:
failure_subsets_with_reasons.append(
(
-children_of_unrequested_parents.convert_to_serializable_subset().value,
-f"Parent subset {unrequested_parent_subset} is not requested in this iteration",
+children_of_not_yet_requested_parents.convert_to_serializable_subset().value,
+f"Parent subset {not_yet_requested_parent_subset} is not requested in this iteration",
)
)
-entity_subset_to_filter = entity_subset_to_filter.compute_difference(
-children_of_unrequested_parents
+passed_entity_subset = passed_entity_subset.compute_difference(
+children_of_not_yet_requested_parents
)

return (
-entity_subset_to_filter,
+passed_entity_subset,
failure_subsets_with_reasons,
)

@@ -1919,31 +1917,29 @@ def _should_backfill_atomic_asset_graph_subset_unit(

# this value is the same for all passed in asset keys since they are always part of the same
# execution set
-filtered_subset_value = candidate_entity_subsets[0].get_internal_value()
+passed_subset_value = candidate_entity_subsets[0].get_internal_value()

candidate_asset_keys = [
candidate_entity_subset.key for candidate_entity_subset in candidate_entity_subsets
]

for candidate_asset_key in candidate_asset_keys:
# filter down the set of matching values for each asset key
-filtered_serializable_entity_subset = SerializableEntitySubset(
+passed_serializable_entity_subset = SerializableEntitySubset(
candidate_asset_key,
-filtered_subset_value,
+passed_subset_value,
)
-filtered_entity_subset = check.not_none(
-asset_graph_view.get_subset_from_serializable_subset(
-filtered_serializable_entity_subset
-)
+passed_entity_subset = check.not_none(
+asset_graph_view.get_subset_from_serializable_subset(passed_serializable_entity_subset)
)

-if filtered_entity_subset.is_empty:
+if passed_entity_subset.is_empty:
break

-filtered_entity_subset, failure_subset_values_with_reasons = (
+passed_entity_subset, failure_subset_values_with_reasons = (
_should_backfill_atomic_asset_subset_unit(
asset_graph_view,
-entity_subset_to_filter=filtered_entity_subset,
+entity_subset_to_filter=passed_entity_subset,
candidate_asset_graph_subset_unit=candidate_asset_graph_subset_unit,
asset_graph_subset_matched_so_far=asset_graph_subset_matched_so_far,
target_subset=target_subset,
@@ -1952,15 +1948,15 @@ def _should_backfill_atomic_asset_graph_subset_unit(
failed_and_downstream_subset=failed_and_downstream_subset,
)
)
-filtered_subset_value = filtered_entity_subset.value
+passed_subset_value = passed_entity_subset.value
final_failure_subset_values_with_reasons.extend(failure_subset_values_with_reasons)

-filtered_entity_subsets = []
+passed_entity_subsets = []
for candidate_entity_subset in candidate_entity_subsets:
-filtered_entity_subsets.append(
+passed_entity_subsets.append(
check.not_none(
asset_graph_view.get_subset_from_serializable_subset(
-SerializableEntitySubset(candidate_entity_subset.key, filtered_subset_value)
+SerializableEntitySubset(candidate_entity_subset.key, passed_subset_value)
)
)
)
@@ -1987,7 +1983,7 @@ def _should_backfill_atomic_asset_graph_subset_unit(
)

return AssetGraphViewBfsFilterConditionResult(
-passed_asset_graph_subset=AssetGraphSubset.from_entity_subsets(filtered_entity_subsets),
+passed_asset_graph_subset=AssetGraphSubset.from_entity_subsets(passed_entity_subsets),
excluded_asset_graph_subsets_and_reasons=failure_asset_graph_subsets_with_reasons,
)

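For orientation, the renamed passed_entity_subset follows a single filter-and-record pattern throughout this file: start from the candidate subset, repeatedly difference or intersect it against the target, failed, materialized, and requested subsets, and record a reason for every partition that gets dropped. The sketch below is a minimal, standalone illustration of that pattern; the SimpleSubset class and every name in it are hypothetical, not code from this commit or from Dagster's API.

# Hypothetical illustration of the "passed subset" filter-and-record pattern; not Dagster code.
from dataclasses import dataclass


@dataclass(frozen=True)
class SimpleSubset:
    """A toy stand-in for an entity subset: just a frozenset of partition keys."""

    keys: frozenset

    def compute_difference(self, other: "SimpleSubset") -> "SimpleSubset":
        return SimpleSubset(self.keys - other.keys)

    def compute_intersection(self, other: "SimpleSubset") -> "SimpleSubset":
        return SimpleSubset(self.keys & other.keys)

    @property
    def is_empty(self) -> bool:
        return not self.keys


def filter_candidate(candidate, target, failed, materialized, requested):
    """Return (passed_subset, failure_reasons) for a candidate subset."""
    failure_reasons = []
    passed = candidate

    # Partitions outside the target subset are dropped with a reason.
    not_targeted = passed.compute_difference(target)
    if not not_targeted.is_empty:
        failure_reasons.append((not_targeted, "not targeted by backfill"))
    passed = passed.compute_difference(not_targeted)

    # Each remaining filter intersects, records a reason, then removes the overlap.
    for subset, reason in [
        (failed, "has failed or is downstream of a failed asset"),
        (materialized, "already materialized by backfill"),
        (requested, "already requested by backfill"),
    ]:
        overlap = passed.compute_intersection(subset)
        if not overlap.is_empty:
            failure_reasons.append((overlap, reason))
        passed = passed.compute_difference(overlap)

    return passed, failure_reasons


if __name__ == "__main__":
    passed, reasons = filter_candidate(
        candidate=SimpleSubset(frozenset({"a", "b", "c", "d"})),
        target=SimpleSubset(frozenset({"a", "b", "c"})),
        failed=SimpleSubset(frozenset({"b"})),
        materialized=SimpleSubset(frozenset()),
        requested=SimpleSubset(frozenset({"c"})),
    )
    print(passed.keys)  # frozenset({'a'})
    print(reasons)      # d not targeted, b failed, c already requested

In this toy run the candidate {a, b, c, d} is filtered down to {a}, with reasons recorded for b, c, and d.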