From 8ebc19d9c3cb92b106630c581731bedfa86beabc Mon Sep 17 00:00:00 2001 From: Claire Lin Date: Fri, 26 Jan 2024 15:13:10 -0800 Subject: [PATCH] [asset backfills] Better handling when backfill forcibly canceled (#19418) Currently when backfills are forcibly marked as canceled some surprising issues arise: 1. The backfill is marked as failed with a `Unexpected backfill status` error because we don't anticipate evaluating backfills in a `CANCELED` state. 2. If the backfill is currently being evaluated by the canceling iteration, a race condition occurs that causes the `CANCELED` request to be ignored. This PR fixes these issues. --- .../dagster/_core/execution/asset_backfill.py | 5 ++ .../daemon_tests/test_backfill.py | 66 +++++++++++++++++++ 2 files changed, 71 insertions(+) diff --git a/python_modules/dagster/dagster/_core/execution/asset_backfill.py b/python_modules/dagster/dagster/_core/execution/asset_backfill.py index 89f490c7378cf..17ebaffe8e9ed 100644 --- a/python_modules/dagster/dagster/_core/execution/asset_backfill.py +++ b/python_modules/dagster/dagster/_core/execution/asset_backfill.py @@ -989,6 +989,8 @@ def execute_asset_backfill_iteration( "Expected get_canceling_asset_backfill_iteration_data to return a PartitionBackfill" ) + # Refetch, in case the backfill was forcibly marked as canceled in the meantime + backfill = cast(PartitionBackfill, instance.get_backfill(backfill.backfill_id)) updated_backfill = backfill.with_asset_backfill_data( updated_asset_backfill_data, dynamic_partitions_store=instance, @@ -1024,6 +1026,9 @@ def execute_asset_backfill_iteration( logger.debug( f"Updated asset backfill data after cancellation iteration: {updated_asset_backfill_data}" ) + elif backfill.status == BulkActionStatus.CANCELING: + # The backfill was forcibly canceled, skip iteration + pass else: check.failed(f"Unexpected backfill status: {backfill.status}") diff --git a/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py b/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py index c05c88f7d80a1..f984f29cc96ba 100644 --- a/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py +++ b/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py @@ -1150,6 +1150,72 @@ def _override_backfill_cancellation(backfill: PartitionBackfill): assert instance.get_runs_count(RunsFilter(statuses=IN_PROGRESS_RUN_STATUSES)) == 0 +def test_asset_backfill_forcible_mark_as_canceled_during_canceling_iteration( + instance: DagsterInstance, workspace_context: WorkspaceProcessContext +): + asset_selection = [AssetKey("daily_1"), AssetKey("daily_2")] + asset_graph = ExternalAssetGraph.from_workspace(workspace_context.create_request_context()) + + backfill_id = "backfill_id" + backfill = PartitionBackfill.from_asset_partitions( + asset_graph=asset_graph, + backfill_id=backfill_id, + tags={}, + backfill_timestamp=pendulum.now().timestamp(), + asset_selection=asset_selection, + partition_names=["2023-01-01"], + dynamic_partitions_store=instance, + all_partitions=False, + ).with_status(BulkActionStatus.CANCELING) + instance.add_backfill( + # Add some partitions in a "requested" state to mock that certain partitions are hanging + backfill.with_asset_backfill_data( + backfill.asset_backfill_data._replace( + requested_subset=AssetGraphSubset(non_partitioned_asset_keys={AssetKey("daily_1")}) + ), + dynamic_partitions_store=instance, + asset_graph=asset_graph, + ) + ) + backfill = instance.get_backfill(backfill_id) + assert backfill + assert backfill.status == BulkActionStatus.CANCELING + + override_get_backfill_num_calls = 0 + + def _override_get_backfill(_): + nonlocal override_get_backfill_num_calls + if override_get_backfill_num_calls == 1: + # Mark backfill as canceled during the middle of the cancellation iteration + override_get_backfill_num_calls += 1 + return backfill.with_status(BulkActionStatus.CANCELED) + else: + override_get_backfill_num_calls += 1 + return backfill + + # After submitting the first chunk, update the backfill to be CANCELING + with mock.patch( + "dagster._core.instance.DagsterInstance.get_backfill", + side_effect=_override_get_backfill, + ): + # Mock that a run is still in progress. If we don't add this, then the backfill will be + # marked as failed + with mock.patch("dagster._core.instance.DagsterInstance.get_run_ids", side_effect=["fake"]): + assert all( + not error + for error in list( + execute_backfill_iteration( + workspace_context, get_default_daemon_logger("BackfillDaemon") + ) + ) + ) + + updated_backfill = instance.get_backfill(backfill_id) + assert updated_backfill + # Assert that the backfill was indeed marked as canceled + assert updated_backfill.status == BulkActionStatus.CANCELED + + def test_fail_backfill_when_runs_completed_but_partitions_marked_as_in_progress( instance: DagsterInstance, workspace_context: WorkspaceProcessContext ):