From 48db04c00b9f9e76595675b3526374877cef7374 Mon Sep 17 00:00:00 2001
From: Claire Lin
Date: Wed, 20 Sep 2023 14:13:59 -0700
Subject: [PATCH] Fix asset backfill duplicate run submission (#16666)

Customers reported duplicate runs being kicked off for asset backfills:
- https://dagsterlabs.slack.com/archives/C05HE0RG8SU/p1695232916102629

Situation that causes this issue:
1. Partition X has been requested by the backfill, but is still in progress (it has neither materialized nor failed).
2. Someone launches a manual run for a parent of partition X.
3. After the manual run completes, we [add asset partitions whose parents have been materialized to the candidate set](https://github.com/dagster-io/dagster/blob/claire/fix-dupe-runs/python_modules/dagster/dagster/_core/execution/asset_backfill.py#L1022-L1030), causing partition X to be re-added to the candidate set.
4. We don't check whether the partition has already been requested before re-requesting it. [We only check that the partition has not been materialized or failed within the backfill](https://github.com/dagster-io/dagster/blob/25f11ba4005a5d0a5f670dc2f0da566d7afc2dd3/python_modules/dagster/dagster/_core/execution/asset_backfill.py#L1135C1-L1139C25).

This PR fixes the issue by checking whether a partition has already been
requested before re-requesting it. (A standalone sketch of the new check
follows the patch.)
---
 .../dagster/_core/execution/asset_backfill.py |  3 +
 .../execution_tests/test_asset_backfill.py    | 83 +++++++++++++++++++
 2 files changed, 86 insertions(+)

diff --git a/python_modules/dagster/dagster/_core/execution/asset_backfill.py b/python_modules/dagster/dagster/_core/execution/asset_backfill.py
index 329e65819869b..ba9d1b1549cb7 100644
--- a/python_modules/dagster/dagster/_core/execution/asset_backfill.py
+++ b/python_modules/dagster/dagster/_core/execution/asset_backfill.py
@@ -1056,6 +1056,7 @@ def execute_asset_backfill_iteration_inner(
             asset_partitions_to_request=visited,
             asset_graph=asset_graph,
             materialized_subset=updated_materialized_subset,
+            requested_subset=asset_backfill_data.requested_subset,
             target_subset=asset_backfill_data.target_subset,
             failed_and_downstream_subset=failed_and_downstream_subset,
             dynamic_partitions_store=instance_queryer,
@@ -1121,6 +1122,7 @@ def should_backfill_atomic_asset_partitions_unit(
     candidates_unit: Iterable[AssetKeyPartitionKey],
     asset_partitions_to_request: AbstractSet[AssetKeyPartitionKey],
     target_subset: AssetGraphSubset,
+    requested_subset: AssetGraphSubset,
     materialized_subset: AssetGraphSubset,
     failed_and_downstream_subset: AssetGraphSubset,
     dynamic_partitions_store: DynamicPartitionsStore,
@@ -1135,6 +1137,7 @@ def should_backfill_atomic_asset_partitions_unit(
             candidate not in target_subset
             or candidate in failed_and_downstream_subset
             or candidate in materialized_subset
+            or candidate in requested_subset
         ):
             return False
 
diff --git a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py
index 21e7b2fd90515..5446ad957f7ff 100644
--- a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py
+++ b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py
@@ -13,12 +13,14 @@
 )
 from unittest.mock import MagicMock, patch
 
+import mock
 import pendulum
 import pytest
 from dagster import (
     AssetKey,
     AssetsDefinition,
     DagsterInstance,
+    DagsterRunStatus,
     DailyPartitionsDefinition,
     Definitions,
     HourlyPartitionsDefinition,
@@ -42,9 +44,12 @@
     get_canceling_asset_backfill_iteration_data,
 )
 from dagster._core.host_representation.external_data import external_asset_graph_from_defs
+from dagster._core.storage.dagster_run import RunsFilter
 from dagster._core.storage.tags import (
     ASSET_PARTITION_RANGE_END_TAG,
     ASSET_PARTITION_RANGE_START_TAG,
+    BACKFILL_ID_TAG,
+    PARTITION_NAME_TAG,
 )
 from dagster._core.test_utils import instance_for_test
 from dagster._seven.compat.pendulum import create_pendulum_time
@@ -231,6 +236,19 @@ def _single_backfill_iteration(
     return backfill_data
 
 
+def _single_backfill_iteration_create_but_do_not_submit_runs(
+    backfill_id, backfill_data, asset_graph, instance, assets_by_repo_name
+) -> AssetBackfillData:
+    # Patch the run execution to not actually execute the run, but instead just create it
+    with mock.patch(
+        "dagster._core.execution.execute_in_process.ExecuteRunWithPlanIterable",
+        return_value=mock.MagicMock(),
+    ):
+        return _single_backfill_iteration(
+            backfill_id, backfill_data, asset_graph, instance, assets_by_repo_name
+        )
+
+
 @pytest.mark.parametrize("some_or_all", ["all", "some"])
 @pytest.mark.parametrize("failures", ["no_failures", "root_failures", "random_half_failures"])
 @pytest.mark.parametrize("scenario", list(scenarios.values()), ids=list(scenarios.keys()))
@@ -310,6 +328,71 @@ def test_materializations_outside_of_backfill():
     )
 
 
+def test_do_not_rerequest_while_existing_run_in_progress():
+    @asset(
+        partitions_def=DailyPartitionsDefinition("2023-01-01"),
+    )
+    def upstream():
+        pass
+
+    @asset(
+        partitions_def=DailyPartitionsDefinition("2023-01-01"),
+    )
+    def downstream(upstream):
+        pass
+
+    assets_by_repo_name = {"repo": [upstream, downstream]}
+    asset_graph = get_asset_graph(assets_by_repo_name)
+
+    instance = DagsterInstance.ephemeral()
+
+    backfill_id = "dummy_backfill_id"
+    asset_backfill_data = AssetBackfillData.from_asset_partitions(
+        asset_graph=asset_graph,
+        partition_names=["2023-01-01"],
+        asset_selection=[downstream.key],
+        dynamic_partitions_store=MagicMock(),
+        all_partitions=False,
+        backfill_start_time=pendulum.datetime(2023, 1, 9, 0, 0, 0),
+    )
+
+    do_run(
+        all_assets=[upstream],
+        asset_keys=[upstream.key],
+        partition_key="2023-01-01",
+        instance=instance,
+    )
+
+    asset_backfill_data = _single_backfill_iteration_create_but_do_not_submit_runs(
+        backfill_id, asset_backfill_data, asset_graph, instance, assets_by_repo_name
+    )
+
+    assert (
+        AssetKeyPartitionKey(downstream.key, partition_key="2023-01-01")
+        in asset_backfill_data.requested_subset
+    )
+
+    # Run for 2023-01-01 exists and is in progress, but has not materialized
+    backfill_runs = instance.get_runs(RunsFilter(tags={BACKFILL_ID_TAG: backfill_id}))
+    assert len(backfill_runs) == 1
+    assert backfill_runs[0].tags.get(PARTITION_NAME_TAG) == "2023-01-01"
+    assert backfill_runs[0].status == DagsterRunStatus.NOT_STARTED
+
+    do_run(
+        all_assets=[upstream],
+        asset_keys=[upstream.key],
+        partition_key="2023-01-01",
+        instance=instance,
+    )
+
+    _single_backfill_iteration_create_but_do_not_submit_runs(
+        backfill_id, asset_backfill_data, asset_graph, instance, assets_by_repo_name
+    )
+
+    # Confirm that no additional runs for 2023-01-01 are kicked off
+    assert len(instance.get_runs(RunsFilter(tags={BACKFILL_ID_TAG: backfill_id}))) == 1
+
+
 def make_backfill_data(
     some_or_all: str,
     asset_graph: ExternalAssetGraph,
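
The new guard in `should_backfill_atomic_asset_partitions_unit` amounts to skipping any candidate that is outside the target subset or is already failed, materialized, or requested. Below is a minimal, self-contained sketch of that filtering logic using plain sets; `BackfillState` and `partitions_to_request` are hypothetical names used for illustration only and are not part of the Dagster API.

```python
from dataclasses import dataclass, field


# Hypothetical stand-in for the AssetGraphSubset bookkeeping that a backfill
# keeps per iteration; not part of the Dagster API.
@dataclass
class BackfillState:
    target: set[str]
    materialized: set[str] = field(default_factory=set)
    failed: set[str] = field(default_factory=set)
    requested: set[str] = field(default_factory=set)


def partitions_to_request(candidates: set[str], state: BackfillState) -> set[str]:
    """Return only the candidates that still need a run.

    Mirrors the condition in the patch: a candidate already in the requested
    set is skipped even though it has not yet materialized or failed.
    """
    return {
        candidate
        for candidate in candidates
        if candidate in state.target
        and candidate not in state.failed
        and candidate not in state.materialized
        and candidate not in state.requested  # the check added by this patch
    }


if __name__ == "__main__":
    state = BackfillState(target={"2023-01-01"}, requested={"2023-01-01"})
    # "2023-01-01" was requested in an earlier iteration and its run is still
    # in progress, so the next iteration must not kick off a duplicate run.
    assert partitions_to_request({"2023-01-01"}, state) == set()
```

The design point mirrored here is that "already requested" is tracked independently of "materialized" and "failed", so a candidate re-added to the candidate set by a parent materialization is filtered out even though its run has not yet completed.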