Skip to content

Commit

Permalink
[backfill daemon run retries 2/n] backfill daemon incorporates retrie…
Browse files Browse the repository at this point in the history
…s runs when launching new runs (dagster-io#25853)

## Summary & Motivation
The backfill daemon doesn't account for run retries. See
https://github.com/dagster-io/internal/discussions/12460 for more
context. We've decided that we want the daemon to account for automatic
and manual retries of runs that occur while the backfill is still in
progress. This requires two changes: ensuring the backfill isn't marked
completed if there is an in progress run or a failed run that will be
automatically retried; and updating the daemon to take the results of
retried runs into account when deciding what partitions to materialize
in the next iteration.

This PR addresses the second point, updating the backfill daemon to take
the results of retried runs into account when deciding what partitions
to materialize in the next iteration.

Currently the backfill gets a list of the successfully materialized
assets for the backfill by looking at the materialization events for the
asset. It determines which assets failed by looking at the failed runs
launched by the backfill and pulling the asset partition information
from those runs. Any assets downstream of those failed assets will not
be launched by the backfill

Now that we want the backfill daemon to account for run retries we need
to slightly modify this logic. Since a run can be retried it is possible
that an asset can have a successful materialization AND be a failed
asset in a failed run. This means that when we determine which assets
are failed, we need to cross check with the assets that have been
successfully materialized and remove any that are in the materialized
list

## How I Tested These Changes
new unit tests 

manual test 

```python
@dg.asset(
    partitions_def=daily_partitions_def,
)
def has_retry_policy(context):
    if context.run.parent_run_id is None:
        raise Exception("I failed")
    else:
        return 1

@dg.asset(partitions_def=weekly_partitions_def)
def another_weekly(context, has_retry_policy):
    return 1
```

run a backfill of at least 1 week. Can watch the backfill progress and
see that once all of the daily assets retry and succeed, the weekly
asset will get kicked off. This contrasts with the behavior in the PR
down the stack where the daily assets would retry and succeed, but the
weekly asset would never get kicked off

## Changelog

Manual and automatic retries of runs launched by backfills that occur
while the backfill is still in progress are now incorporated into the
backfill's status
  • Loading branch information
jamiedemaria authored and pskinnerthyme committed Dec 16, 2024
1 parent 4e27c6e commit 912f0a0
Show file tree
Hide file tree
Showing 2 changed files with 243 additions and 9 deletions.
50 changes: 41 additions & 9 deletions python_modules/dagster/dagster/_core/execution/asset_backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -1281,16 +1281,29 @@ def get_canceling_asset_backfill_iteration_data(
)

failed_subset = AssetGraphSubset.from_asset_partition_set(
set(_get_failed_asset_partitions(instance_queryer, backfill_id, asset_graph)),
set(
_get_failed_asset_partitions(
instance_queryer,
backfill_id,
asset_graph,
materialized_subset=updated_materialized_subset,
)
),
asset_graph,
)
# we fetch the failed_subset to get any new assets that have failed and add that to the set of
# assets we already know failed and their downstreams. However we need to remove any assets in
# updated_materialized_subset to account for the case where a run retry successfully
# materialized a previously failed asset.
updated_failed_subset = (
asset_backfill_data.failed_and_downstream_subset | failed_subset
) - updated_materialized_subset
updated_backfill_data = AssetBackfillData(
target_subset=asset_backfill_data.target_subset,
latest_storage_id=asset_backfill_data.latest_storage_id,
requested_runs_for_target_roots=asset_backfill_data.requested_runs_for_target_roots,
materialized_subset=updated_materialized_subset,
failed_and_downstream_subset=asset_backfill_data.failed_and_downstream_subset
| failed_subset,
failed_and_downstream_subset=updated_failed_subset,
requested_subset=asset_backfill_data.requested_subset,
backfill_start_time=TimestampWithTimezone(backfill_start_timestamp, "UTC"),
)
Expand Down Expand Up @@ -1353,6 +1366,7 @@ def _get_failed_and_downstream_asset_partitions(
asset_graph: RemoteWorkspaceAssetGraph,
instance_queryer: CachingInstanceQueryer,
backfill_start_timestamp: float,
materialized_subset: AssetGraphSubset,
) -> AssetGraphSubset:
failed_and_downstream_subset = AssetGraphSubset.from_asset_partition_set(
asset_graph.bfs_filter_asset_partitions(
Expand All @@ -1364,7 +1378,9 @@ def _get_failed_and_downstream_asset_partitions(
),
"",
),
_get_failed_asset_partitions(instance_queryer, backfill_id, asset_graph),
_get_failed_asset_partitions(
instance_queryer, backfill_id, asset_graph, materialized_subset
),
evaluation_time=datetime_from_timestamp(backfill_start_timestamp),
)[0],
asset_graph,
Expand Down Expand Up @@ -1500,6 +1516,7 @@ def execute_asset_backfill_iteration_inner(
asset_graph,
instance_queryer,
backfill_start_timestamp,
updated_materialized_subset,
)

yield None
Expand Down Expand Up @@ -1776,9 +1793,16 @@ def _get_failed_asset_partitions(
instance_queryer: CachingInstanceQueryer,
backfill_id: str,
asset_graph: RemoteAssetGraph,
materialized_subset: AssetGraphSubset,
) -> Sequence[AssetKeyPartitionKey]:
"""Returns asset partitions that materializations were requested for as part of the backfill, but
will not be materialized.
"""Returns asset partitions that materializations were requested for as part of the backfill, but were
not successfully materialized.
This function gets a list of all runs for the backfill that have failed and extracts the asset partitions
that were not materialized from those runs. However, we need to account for retried runs. If a run was
successfully retried, the original failed run will still be processed in this function. So we check the
failed asset partitions against the list of successfully materialized asset partitions. If an asset partition
is in the materialized_subset, it means the failed run was retried and the asset partition was materialized.
Includes canceled asset partitions. Implementation assumes that successful runs won't have any
failed partitions.
Expand Down Expand Up @@ -1811,17 +1835,25 @@ def _get_failed_asset_partitions(
start=run.tags[ASSET_PARTITION_RANGE_START_TAG],
end=run.tags[ASSET_PARTITION_RANGE_END_TAG],
)
asset_partition_candidates = []
for asset_key in failed_asset_keys:
result.extend(
asset_partition_candidates.extend(
asset_graph.get_partitions_in_range(
asset_key, partition_range, instance_queryer
)
)
else:
# a regular backfill run that run on a single partition
partition_key = run.tags.get(PARTITION_NAME_TAG)
result.extend(
asset_partition_candidates = [
AssetKeyPartitionKey(asset_key, partition_key) for asset_key in failed_asset_keys
)
]

asset_partitions_still_failed = [
asset_partition
for asset_partition in asset_partition_candidates
if asset_partition not in materialized_subset
]
result.extend(asset_partitions_still_failed)

return result
202 changes: 202 additions & 0 deletions python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,32 @@ def asset_g(asset_f):
pass


@asset(partitions_def=partitions_a)
def fails_once_asset_a(context):
if context.run.parent_run_id is None:
raise Exception("I failed!")


@asset(
partitions_def=partitions_b,
ins={
"fails_once_asset_a": AssetIn(partition_mapping=StaticPartitionMapping({"foo_a": "foo_b"}))
},
)
def downstream_of_fails_once_asset_b(fails_once_asset_a):
pass


@asset(
partitions_def=partitions_c,
ins={
"fails_once_asset_a": AssetIn(partition_mapping=StaticPartitionMapping({"foo_a": "foo_c"}))
},
)
def downstream_of_fails_once_asset_c(fails_once_asset_a):
pass


daily_partitions_def = DailyPartitionsDefinition("2023-01-01")


Expand Down Expand Up @@ -455,6 +481,9 @@ def the_repo():
asset_e,
asset_f,
asset_g,
fails_once_asset_a,
downstream_of_fails_once_asset_b,
downstream_of_fails_once_asset_c,
asset_with_single_run_backfill_policy,
asset_with_multi_run_backfill_policy,
bp_single_run,
Expand Down Expand Up @@ -2958,7 +2987,180 @@ def test_asset_backfill_not_complete_if_automatic_retry_could_happen(
wait_for_all_runs_to_finish(instance, timeout=30)
assert instance.get_runs_count() == 6

list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon")))
backfill = instance.get_backfill(backfill_id)
assert backfill
assert backfill.status == BulkActionStatus.COMPLETED_SUCCESS


def test_asset_backfill_fails_if_retries_fail(
instance: DagsterInstance,
workspace_context: WorkspaceProcessContext,
remote_repo: RemoteRepository,
):
del remote_repo
backfill_id = "run_retries_backfill"
partition_keys = static_partitions.get_partition_keys()
asset_selection = [AssetKey("foo"), AssetKey("pass_on_retry"), AssetKey("always_fails")]
instance.add_backfill(
PartitionBackfill.from_asset_partitions(
asset_graph=workspace_context.create_request_context().asset_graph,
backfill_id=backfill_id,
tags={"custom_tag_key": "custom_tag_value", MAX_RETRIES_TAG: "2"},
backfill_timestamp=get_current_timestamp(),
asset_selection=asset_selection,
partition_names=partition_keys,
dynamic_partitions_store=instance,
all_partitions=False,
title=None,
description=None,
)
)
assert instance.get_runs_count() == 0
backfill = instance.get_backfill(backfill_id)
assert backfill
assert backfill.status == BulkActionStatus.REQUESTED

list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon")))
assert instance.get_runs_count() == 3
wait_for_all_runs_to_start(instance, timeout=30)
assert instance.get_runs_count() == 3
wait_for_all_runs_to_finish(instance, timeout=30)

assert instance.get_runs_count() == 3
runs = reversed(list(instance.get_runs()))
for run in runs:
assert run.tags[BACKFILL_ID_TAG] == backfill_id
assert run.tags["custom_tag_key"] == "custom_tag_value"
assert step_succeeded(instance, run, "foo")
assert step_failed(instance, run, "pass_on_retry")

# since the failed runs should have automatic retries launched for them, the backfill should not
# be considered complete, even though the targeted asset partitions have a completed state
list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon")))
backfill = instance.get_backfill(backfill_id)
assert backfill
assert backfill.asset_backfill_data
assert backfill.asset_backfill_data.all_targeted_partitions_have_materialization_status()
assert backfill.status == BulkActionStatus.REQUESTED

runs = instance.get_run_records()
list(
consume_new_runs_for_automatic_reexecution(
workspace_process_context=workspace_context, run_records=runs
)
)
wait_for_all_runs_to_finish(instance, timeout=30)
assert instance.get_runs_count() == 6

list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon")))
backfill = instance.get_backfill(backfill_id)
assert backfill
# retry limit hasn't been hit, so backfill still in progress
assert backfill.status == BulkActionStatus.REQUESTED

runs = instance.get_run_records()
list(
consume_new_runs_for_automatic_reexecution(
workspace_process_context=workspace_context, run_records=runs
)
)
wait_for_all_runs_to_finish(instance, timeout=30)
assert instance.get_runs_count() == 9

list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon")))
backfill = instance.get_backfill(backfill_id)
assert backfill
assert backfill.status == BulkActionStatus.COMPLETED_FAILED


def test_asset_backfill_retries_make_downstreams_runnable(
instance: DagsterInstance,
workspace_context: WorkspaceProcessContext,
remote_repo: RemoteRepository,
):
del remote_repo
backfill_id = "run_retries_backfill_with_downstream"
partition_keys = partitions_a.get_partition_keys()
asset_selection = [
AssetKey("fails_once_asset_a"),
AssetKey("downstream_of_fails_once_asset_b"),
AssetKey("downstream_of_fails_once_asset_c"),
]
instance.add_backfill(
PartitionBackfill.from_asset_partitions(
asset_graph=workspace_context.create_request_context().asset_graph,
backfill_id=backfill_id,
tags={"custom_tag_key": "custom_tag_value", MAX_RETRIES_TAG: "2"},
backfill_timestamp=get_current_timestamp(),
asset_selection=asset_selection,
partition_names=partition_keys,
dynamic_partitions_store=instance,
all_partitions=False,
title=None,
description=None,
)
)
assert instance.get_runs_count() == 0
backfill = instance.get_backfill(backfill_id)
assert backfill
assert backfill.status == BulkActionStatus.REQUESTED

list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon")))
assert instance.get_runs_count() == 1
wait_for_all_runs_to_start(instance, timeout=30)
assert instance.get_runs_count() == 1
wait_for_all_runs_to_finish(instance, timeout=30)

assert instance.get_runs_count() == 1
runs = reversed(list(instance.get_runs()))
for run in runs:
assert run.tags[BACKFILL_ID_TAG] == backfill_id
assert step_failed(instance, run, "fails_once_asset_a")

# if the backfill daemon runs again, we will see that the downstreams are in the failed and downstream subset
list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon")))
assert instance.get_runs_count() == 1
backfill = instance.get_backfill(backfill_id)
assert backfill
assert backfill.asset_backfill_data
assert (
backfill.asset_backfill_data.materialized_subset.num_partitions_and_non_partitioned_assets
== 0
)
assert (
backfill.asset_backfill_data.failed_and_downstream_subset.num_partitions_and_non_partitioned_assets
== 3
)

# launch a retry of the failed run
runs = instance.get_run_records()
list(
consume_new_runs_for_automatic_reexecution(
workspace_process_context=workspace_context, run_records=runs
)
)
wait_for_all_runs_to_finish(instance, timeout=30)
assert instance.get_runs_count() == 2

# now that the failed run has been retried, the backfill daemon can launch runs of the downstream assets
list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon")))
backfill = instance.get_backfill(backfill_id)
assert backfill
assert backfill.status == BulkActionStatus.REQUESTED

assert instance.get_runs_count() == 4
wait_for_all_runs_to_start(instance, timeout=30)
assert instance.get_runs_count() == 4
wait_for_all_runs_to_finish(instance, timeout=30)
assert instance.get_runs_count() == 4

list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon")))
backfill = instance.get_backfill(backfill_id)
assert backfill
assert backfill.status == BulkActionStatus.COMPLETED_SUCCESS
assert backfill.asset_backfill_data
assert (
backfill.asset_backfill_data.failed_and_downstream_subset.num_partitions_and_non_partitioned_assets
== 0
)

0 comments on commit 912f0a0

Please sign in to comment.