From 912f0a0e7bedfdc396cdd27c48ebe030b58f0651 Mon Sep 17 00:00:00 2001 From: jamiedemaria Date: Thu, 5 Dec 2024 13:36:41 -0500 Subject: [PATCH] [backfill daemon run retries 2/n] backfill daemon incorporates retries runs when launching new runs (#25853) ## Summary & Motivation The backfill daemon doesn't account for run retries. See https://github.com/dagster-io/internal/discussions/12460 for more context. We've decided that we want the daemon to account for automatic and manual retries of runs that occur while the backfill is still in progress. This requires two changes: ensuring the backfill isn't marked completed if there is an in progress run or a failed run that will be automatically retried; and updating the daemon to take the results of retried runs into account when deciding what partitions to materialize in the next iteration. This PR addresses the second point, updating the backfill daemon to take the results of retried runs into account when deciding what partitions to materialize in the next iteration. Currently the backfill gets a list of the successfully materialized assets for the backfill by looking at the materialization events for the asset. It determines which assets failed by looking at the failed runs launched by the backfill and pulling the asset partition information from those runs. Any assets downstream of those failed assets will not be launched by the backfill Now that we want the backfill daemon to account for run retries we need to slightly modify this logic. Since a run can be retried it is possible that an asset can have a successful materialization AND be a failed asset in a failed run. This means that when we determine which assets are failed, we need to cross check with the assets that have been successfully materialized and remove any that are in the materialized list ## How I Tested These Changes new unit tests manual test ```python @dg.asset( partitions_def=daily_partitions_def, ) def has_retry_policy(context): if context.run.parent_run_id is None: raise Exception("I failed") else: return 1 @dg.asset(partitions_def=weekly_partitions_def) def another_weekly(context, has_retry_policy): return 1 ``` run a backfill of at least 1 week. Can watch the backfill progress and see that once all of the daily assets retry and succeed, the weekly asset will get kicked off. This contrasts with the behavior in the PR down the stack where the daily assets would retry and succeed, but the weekly asset would never get kicked off ## Changelog Manual and automatic retries of runs launched by backfills that occur while the backfill is still in progress are now incorporated into the backfill's status --- .../dagster/_core/execution/asset_backfill.py | 50 ++++- .../daemon_tests/test_backfill.py | 202 ++++++++++++++++++ 2 files changed, 243 insertions(+), 9 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/asset_backfill.py b/python_modules/dagster/dagster/_core/execution/asset_backfill.py index c8aa4b22336aa..b9e4cdc18094f 100644 --- a/python_modules/dagster/dagster/_core/execution/asset_backfill.py +++ b/python_modules/dagster/dagster/_core/execution/asset_backfill.py @@ -1281,16 +1281,29 @@ def get_canceling_asset_backfill_iteration_data( ) failed_subset = AssetGraphSubset.from_asset_partition_set( - set(_get_failed_asset_partitions(instance_queryer, backfill_id, asset_graph)), + set( + _get_failed_asset_partitions( + instance_queryer, + backfill_id, + asset_graph, + materialized_subset=updated_materialized_subset, + ) + ), asset_graph, ) + # we fetch the failed_subset to get any new assets that have failed and add that to the set of + # assets we already know failed and their downstreams. However we need to remove any assets in + # updated_materialized_subset to account for the case where a run retry successfully + # materialized a previously failed asset. + updated_failed_subset = ( + asset_backfill_data.failed_and_downstream_subset | failed_subset + ) - updated_materialized_subset updated_backfill_data = AssetBackfillData( target_subset=asset_backfill_data.target_subset, latest_storage_id=asset_backfill_data.latest_storage_id, requested_runs_for_target_roots=asset_backfill_data.requested_runs_for_target_roots, materialized_subset=updated_materialized_subset, - failed_and_downstream_subset=asset_backfill_data.failed_and_downstream_subset - | failed_subset, + failed_and_downstream_subset=updated_failed_subset, requested_subset=asset_backfill_data.requested_subset, backfill_start_time=TimestampWithTimezone(backfill_start_timestamp, "UTC"), ) @@ -1353,6 +1366,7 @@ def _get_failed_and_downstream_asset_partitions( asset_graph: RemoteWorkspaceAssetGraph, instance_queryer: CachingInstanceQueryer, backfill_start_timestamp: float, + materialized_subset: AssetGraphSubset, ) -> AssetGraphSubset: failed_and_downstream_subset = AssetGraphSubset.from_asset_partition_set( asset_graph.bfs_filter_asset_partitions( @@ -1364,7 +1378,9 @@ def _get_failed_and_downstream_asset_partitions( ), "", ), - _get_failed_asset_partitions(instance_queryer, backfill_id, asset_graph), + _get_failed_asset_partitions( + instance_queryer, backfill_id, asset_graph, materialized_subset + ), evaluation_time=datetime_from_timestamp(backfill_start_timestamp), )[0], asset_graph, @@ -1500,6 +1516,7 @@ def execute_asset_backfill_iteration_inner( asset_graph, instance_queryer, backfill_start_timestamp, + updated_materialized_subset, ) yield None @@ -1776,9 +1793,16 @@ def _get_failed_asset_partitions( instance_queryer: CachingInstanceQueryer, backfill_id: str, asset_graph: RemoteAssetGraph, + materialized_subset: AssetGraphSubset, ) -> Sequence[AssetKeyPartitionKey]: - """Returns asset partitions that materializations were requested for as part of the backfill, but - will not be materialized. + """Returns asset partitions that materializations were requested for as part of the backfill, but were + not successfully materialized. + + This function gets a list of all runs for the backfill that have failed and extracts the asset partitions + that were not materialized from those runs. However, we need to account for retried runs. If a run was + successfully retried, the original failed run will still be processed in this function. So we check the + failed asset partitions against the list of successfully materialized asset partitions. If an asset partition + is in the materialized_subset, it means the failed run was retried and the asset partition was materialized. Includes canceled asset partitions. Implementation assumes that successful runs won't have any failed partitions. @@ -1811,8 +1835,9 @@ def _get_failed_asset_partitions( start=run.tags[ASSET_PARTITION_RANGE_START_TAG], end=run.tags[ASSET_PARTITION_RANGE_END_TAG], ) + asset_partition_candidates = [] for asset_key in failed_asset_keys: - result.extend( + asset_partition_candidates.extend( asset_graph.get_partitions_in_range( asset_key, partition_range, instance_queryer ) @@ -1820,8 +1845,15 @@ def _get_failed_asset_partitions( else: # a regular backfill run that run on a single partition partition_key = run.tags.get(PARTITION_NAME_TAG) - result.extend( + asset_partition_candidates = [ AssetKeyPartitionKey(asset_key, partition_key) for asset_key in failed_asset_keys - ) + ] + + asset_partitions_still_failed = [ + asset_partition + for asset_partition in asset_partition_candidates + if asset_partition not in materialized_subset + ] + result.extend(asset_partitions_still_failed) return result diff --git a/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py b/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py index 7b659ef0c6442..50077a20ce36e 100644 --- a/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py +++ b/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py @@ -355,6 +355,32 @@ def asset_g(asset_f): pass +@asset(partitions_def=partitions_a) +def fails_once_asset_a(context): + if context.run.parent_run_id is None: + raise Exception("I failed!") + + +@asset( + partitions_def=partitions_b, + ins={ + "fails_once_asset_a": AssetIn(partition_mapping=StaticPartitionMapping({"foo_a": "foo_b"})) + }, +) +def downstream_of_fails_once_asset_b(fails_once_asset_a): + pass + + +@asset( + partitions_def=partitions_c, + ins={ + "fails_once_asset_a": AssetIn(partition_mapping=StaticPartitionMapping({"foo_a": "foo_c"})) + }, +) +def downstream_of_fails_once_asset_c(fails_once_asset_a): + pass + + daily_partitions_def = DailyPartitionsDefinition("2023-01-01") @@ -455,6 +481,9 @@ def the_repo(): asset_e, asset_f, asset_g, + fails_once_asset_a, + downstream_of_fails_once_asset_b, + downstream_of_fails_once_asset_c, asset_with_single_run_backfill_policy, asset_with_multi_run_backfill_policy, bp_single_run, @@ -2958,7 +2987,180 @@ def test_asset_backfill_not_complete_if_automatic_retry_could_happen( wait_for_all_runs_to_finish(instance, timeout=30) assert instance.get_runs_count() == 6 + list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon"))) + backfill = instance.get_backfill(backfill_id) + assert backfill + assert backfill.status == BulkActionStatus.COMPLETED_SUCCESS + + +def test_asset_backfill_fails_if_retries_fail( + instance: DagsterInstance, + workspace_context: WorkspaceProcessContext, + remote_repo: RemoteRepository, +): + del remote_repo + backfill_id = "run_retries_backfill" + partition_keys = static_partitions.get_partition_keys() + asset_selection = [AssetKey("foo"), AssetKey("pass_on_retry"), AssetKey("always_fails")] + instance.add_backfill( + PartitionBackfill.from_asset_partitions( + asset_graph=workspace_context.create_request_context().asset_graph, + backfill_id=backfill_id, + tags={"custom_tag_key": "custom_tag_value", MAX_RETRIES_TAG: "2"}, + backfill_timestamp=get_current_timestamp(), + asset_selection=asset_selection, + partition_names=partition_keys, + dynamic_partitions_store=instance, + all_partitions=False, + title=None, + description=None, + ) + ) + assert instance.get_runs_count() == 0 + backfill = instance.get_backfill(backfill_id) + assert backfill + assert backfill.status == BulkActionStatus.REQUESTED + + list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon"))) + assert instance.get_runs_count() == 3 + wait_for_all_runs_to_start(instance, timeout=30) + assert instance.get_runs_count() == 3 + wait_for_all_runs_to_finish(instance, timeout=30) + + assert instance.get_runs_count() == 3 + runs = reversed(list(instance.get_runs())) + for run in runs: + assert run.tags[BACKFILL_ID_TAG] == backfill_id + assert run.tags["custom_tag_key"] == "custom_tag_value" + assert step_succeeded(instance, run, "foo") + assert step_failed(instance, run, "pass_on_retry") + + # since the failed runs should have automatic retries launched for them, the backfill should not + # be considered complete, even though the targeted asset partitions have a completed state + list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon"))) + backfill = instance.get_backfill(backfill_id) + assert backfill + assert backfill.asset_backfill_data + assert backfill.asset_backfill_data.all_targeted_partitions_have_materialization_status() + assert backfill.status == BulkActionStatus.REQUESTED + + runs = instance.get_run_records() + list( + consume_new_runs_for_automatic_reexecution( + workspace_process_context=workspace_context, run_records=runs + ) + ) + wait_for_all_runs_to_finish(instance, timeout=30) + assert instance.get_runs_count() == 6 + + list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon"))) + backfill = instance.get_backfill(backfill_id) + assert backfill + # retry limit hasn't been hit, so backfill still in progress + assert backfill.status == BulkActionStatus.REQUESTED + + runs = instance.get_run_records() + list( + consume_new_runs_for_automatic_reexecution( + workspace_process_context=workspace_context, run_records=runs + ) + ) + wait_for_all_runs_to_finish(instance, timeout=30) + assert instance.get_runs_count() == 9 + list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon"))) backfill = instance.get_backfill(backfill_id) assert backfill assert backfill.status == BulkActionStatus.COMPLETED_FAILED + + +def test_asset_backfill_retries_make_downstreams_runnable( + instance: DagsterInstance, + workspace_context: WorkspaceProcessContext, + remote_repo: RemoteRepository, +): + del remote_repo + backfill_id = "run_retries_backfill_with_downstream" + partition_keys = partitions_a.get_partition_keys() + asset_selection = [ + AssetKey("fails_once_asset_a"), + AssetKey("downstream_of_fails_once_asset_b"), + AssetKey("downstream_of_fails_once_asset_c"), + ] + instance.add_backfill( + PartitionBackfill.from_asset_partitions( + asset_graph=workspace_context.create_request_context().asset_graph, + backfill_id=backfill_id, + tags={"custom_tag_key": "custom_tag_value", MAX_RETRIES_TAG: "2"}, + backfill_timestamp=get_current_timestamp(), + asset_selection=asset_selection, + partition_names=partition_keys, + dynamic_partitions_store=instance, + all_partitions=False, + title=None, + description=None, + ) + ) + assert instance.get_runs_count() == 0 + backfill = instance.get_backfill(backfill_id) + assert backfill + assert backfill.status == BulkActionStatus.REQUESTED + + list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon"))) + assert instance.get_runs_count() == 1 + wait_for_all_runs_to_start(instance, timeout=30) + assert instance.get_runs_count() == 1 + wait_for_all_runs_to_finish(instance, timeout=30) + + assert instance.get_runs_count() == 1 + runs = reversed(list(instance.get_runs())) + for run in runs: + assert run.tags[BACKFILL_ID_TAG] == backfill_id + assert step_failed(instance, run, "fails_once_asset_a") + + # if the backfill daemon runs again, we will see that the downstreams are in the failed and downstream subset + list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon"))) + assert instance.get_runs_count() == 1 + backfill = instance.get_backfill(backfill_id) + assert backfill + assert backfill.asset_backfill_data + assert ( + backfill.asset_backfill_data.materialized_subset.num_partitions_and_non_partitioned_assets + == 0 + ) + assert ( + backfill.asset_backfill_data.failed_and_downstream_subset.num_partitions_and_non_partitioned_assets + == 3 + ) + + # launch a retry of the failed run + runs = instance.get_run_records() + list( + consume_new_runs_for_automatic_reexecution( + workspace_process_context=workspace_context, run_records=runs + ) + ) + wait_for_all_runs_to_finish(instance, timeout=30) + assert instance.get_runs_count() == 2 + + # now that the failed run has been retried, the backfill daemon can launch runs of the downstream assets + list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon"))) + backfill = instance.get_backfill(backfill_id) + assert backfill + assert backfill.status == BulkActionStatus.REQUESTED + + assert instance.get_runs_count() == 4 + wait_for_all_runs_to_start(instance, timeout=30) + assert instance.get_runs_count() == 4 + wait_for_all_runs_to_finish(instance, timeout=30) + assert instance.get_runs_count() == 4 + + list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon"))) + backfill = instance.get_backfill(backfill_id) + assert backfill + assert backfill.status == BulkActionStatus.COMPLETED_SUCCESS + assert backfill.asset_backfill_data + assert ( + backfill.asset_backfill_data.failed_and_downstream_subset.num_partitions_and_non_partitioned_assets + == 0 + )