Skip to content

Commit

Permalink
[asset backfill] Misc cancellation iteration improvements (#18951)
Browse files Browse the repository at this point in the history
This PR updates the cancellation iteration so it no longer performs a BFS
over the asset graph, guarding against the slow behavior that traversal
could cause.

During cancellation we no longer need to mark downstream partitions as
"failed", since they were never requested in the first place.
  • Loading branch information
clairelin135 authored Jan 27, 2024
1 parent 8ebc19d commit 4a9a085
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 7 deletions.
11 changes: 4 additions & 7 deletions python_modules/dagster/dagster/_core/execution/asset_backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -1055,19 +1055,16 @@ def get_canceling_asset_backfill_iteration_data(
" AssetGraphSubset object"
)

failed_and_downstream_subset = _get_failed_and_downstream_asset_partitions(
backfill_id,
asset_backfill_data,
asset_graph,
instance_queryer,
backfill_start_time,
failed_subset = AssetGraphSubset.from_asset_partition_set(
set(_get_failed_asset_partitions(instance_queryer, backfill_id, asset_graph)), asset_graph
)
updated_backfill_data = AssetBackfillData(
target_subset=asset_backfill_data.target_subset,
latest_storage_id=asset_backfill_data.latest_storage_id,
requested_runs_for_target_roots=asset_backfill_data.requested_runs_for_target_roots,
materialized_subset=updated_materialized_subset,
failed_and_downstream_subset=failed_and_downstream_subset,
failed_and_downstream_subset=asset_backfill_data.failed_and_downstream_subset
| failed_subset,
requested_subset=asset_backfill_data.requested_subset,
backfill_start_time=backfill_start_time,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1110,6 +1110,83 @@ def downstream_daily_partitioned_asset(
)


def test_asset_backfill_cancels_without_fetching_downstreams_of_failed_partitions():
    """Canceling a backfill records failed partitions (and their already-known
    downstreams) without re-traversing the asset graph for new downstreams."""
    instance = DagsterInstance.ephemeral()

    @asset(partitions_def=HourlyPartitionsDefinition("2023-01-01-00:00"))
    def upstream_hourly_partitioned_asset():
        raise Exception("noo")

    @asset(partitions_def=DailyPartitionsDefinition("2023-01-01"))
    def downstream_daily_partitioned_asset(
        upstream_hourly_partitioned_asset,
    ):
        return upstream_hourly_partitioned_asset + 1

    repo_assets = {
        "repo": [
            upstream_hourly_partitioned_asset,
            downstream_daily_partitioned_asset,
        ]
    }
    asset_graph = get_asset_graph(repo_assets)

    backfill_id = "dummy_backfill_id"
    start_time = pendulum.datetime(2023, 1, 10, 0, 0, 0)

    backfill_data = AssetBackfillData.from_asset_partitions(
        asset_graph=asset_graph,
        partition_names=["2023-01-09-00:00"],
        asset_selection=[
            upstream_hourly_partitioned_asset.key,
            downstream_daily_partitioned_asset.key,
        ],
        dynamic_partitions_store=MagicMock(),
        all_partitions=False,
        backfill_start_time=start_time,
    )

    # Iteration 1 submits a run targeting the partition; iteration 2 folds
    # the resulting failure back into the backfill data.
    for _iteration in range(2):
        backfill_data = _single_backfill_iteration(
            backfill_id, backfill_data, asset_graph, instance, repo_assets
        )

    failed_upstream = AssetKeyPartitionKey(
        upstream_hourly_partitioned_asset.key, "2023-01-09-00:00"
    )
    failed_downstream = AssetKeyPartitionKey(
        downstream_daily_partitioned_asset.key, "2023-01-09"
    )
    assert failed_upstream in backfill_data.failed_and_downstream_subset
    assert failed_downstream in backfill_data.failed_and_downstream_subset

    instance_queryer = CachingInstanceQueryer(instance, asset_graph, start_time)

    # Drain the cancellation generator, keeping only its final yield.
    canceling_backfill_data = None
    for canceling_backfill_data in get_canceling_asset_backfill_iteration_data(
        backfill_id,
        backfill_data,
        instance_queryer,
        asset_graph,
        start_time,
    ):
        pass

    assert isinstance(canceling_backfill_data, AssetBackfillData)
    assert failed_upstream in canceling_backfill_data.failed_and_downstream_subset
    assert failed_downstream in canceling_backfill_data.failed_and_downstream_subset


def test_asset_backfill_target_asset_and_same_partitioning_grandchild():
instance = DagsterInstance.ephemeral()

Expand Down

0 comments on commit 4a9a085

Please sign in to comment.