Skip to content

Commit

Permalink
[asset backfills] Better handling when backfill forcibly canceled (#1…
Browse files Browse the repository at this point in the history
…9418)

Currently when backfills are forcibly marked as canceled some surprising
issues arise:
1. The backfill is marked as failed with an `Unexpected backfill status`
error because we don't anticipate evaluating backfills in a `CANCELED`
state.
2. If the backfill is currently being evaluated by the canceling
iteration, a race condition occurs that causes the `CANCELED` request to
be ignored.

This PR fixes these issues.
  • Loading branch information
clairelin135 authored Jan 26, 2024
1 parent 333be87 commit 8ebc19d
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -989,6 +989,8 @@ def execute_asset_backfill_iteration(
"Expected get_canceling_asset_backfill_iteration_data to return a PartitionBackfill"
)

# Refetch, in case the backfill was forcibly marked as canceled in the meantime
backfill = cast(PartitionBackfill, instance.get_backfill(backfill.backfill_id))
updated_backfill = backfill.with_asset_backfill_data(
updated_asset_backfill_data,
dynamic_partitions_store=instance,
Expand Down Expand Up @@ -1024,6 +1026,9 @@ def execute_asset_backfill_iteration(
logger.debug(
f"Updated asset backfill data after cancellation iteration: {updated_asset_backfill_data}"
)
elif backfill.status == BulkActionStatus.CANCELING:
# The backfill was forcibly canceled, skip iteration
pass
else:
check.failed(f"Unexpected backfill status: {backfill.status}")

Expand Down
66 changes: 66 additions & 0 deletions python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -1150,6 +1150,72 @@ def _override_backfill_cancellation(backfill: PartitionBackfill):
assert instance.get_runs_count(RunsFilter(statuses=IN_PROGRESS_RUN_STATUSES)) == 0


def test_asset_backfill_forcible_mark_as_canceled_during_canceling_iteration(
    instance: DagsterInstance, workspace_context: WorkspaceProcessContext
):
    """Check that a forcible CANCELED mark survives an in-flight canceling iteration.

    A backfill is stored in the CANCELING state, and ``get_backfill`` is patched so
    that its second call returns the same backfill with status CANCELED, simulating
    a forcible cancel that lands while the iteration is running. The iteration must
    yield no errors and the stored backfill must end up CANCELED.
    """
    asset_graph = ExternalAssetGraph.from_workspace(workspace_context.create_request_context())
    backfill_id = "backfill_id"

    canceling_backfill = PartitionBackfill.from_asset_partitions(
        asset_graph=asset_graph,
        backfill_id=backfill_id,
        tags={},
        backfill_timestamp=pendulum.now().timestamp(),
        asset_selection=[AssetKey("daily_1"), AssetKey("daily_2")],
        partition_names=["2023-01-01"],
        dynamic_partitions_store=instance,
        all_partitions=False,
    ).with_status(BulkActionStatus.CANCELING)

    # Add some partitions in a "requested" state to mock that certain partitions are hanging
    hanging_backfill_data = canceling_backfill.asset_backfill_data._replace(
        requested_subset=AssetGraphSubset(non_partitioned_asset_keys={AssetKey("daily_1")})
    )
    instance.add_backfill(
        canceling_backfill.with_asset_backfill_data(
            hanging_backfill_data,
            dynamic_partitions_store=instance,
            asset_graph=asset_graph,
        )
    )

    stored_backfill = instance.get_backfill(backfill_id)
    assert stored_backfill
    assert stored_backfill.status == BulkActionStatus.CANCELING

    get_backfill_call_count = 0

    def _override_get_backfill(_):
        nonlocal get_backfill_call_count
        call_index = get_backfill_call_count
        get_backfill_call_count += 1
        if call_index == 1:
            # Mark backfill as canceled during the middle of the cancellation iteration
            return stored_backfill.with_status(BulkActionStatus.CANCELED)
        return stored_backfill

    # The second get_backfill call made during the iteration reports the backfill as CANCELED
    patched_get_backfill = mock.patch(
        "dagster._core.instance.DagsterInstance.get_backfill",
        side_effect=_override_get_backfill,
    )
    # Mock that a run is still in progress. If we don't add this, then the backfill will be
    # marked as failed
    # NOTE(review): side_effect=["fake"] yields "fake" for the first call only; confirm a
    # single get_run_ids call is what the iteration is expected to make.
    patched_get_run_ids = mock.patch(
        "dagster._core.instance.DagsterInstance.get_run_ids", side_effect=["fake"]
    )

    with patched_get_backfill, patched_get_run_ids:
        iteration_errors = list(
            execute_backfill_iteration(
                workspace_context, get_default_daemon_logger("BackfillDaemon")
            )
        )
        assert not any(iteration_errors)

    updated_backfill = instance.get_backfill(backfill_id)
    assert updated_backfill
    # Assert that the backfill was indeed marked as canceled
    assert updated_backfill.status == BulkActionStatus.CANCELED


def test_fail_backfill_when_runs_completed_but_partitions_marked_as_in_progress(
instance: DagsterInstance, workspace_context: WorkspaceProcessContext
):
Expand Down

0 comments on commit 8ebc19d

Please sign in to comment.