Fix asset backfill duplicate run submission (#16666)
Customers reported duplicate runs being kicked off for asset backfills:
- https://dagsterlabs.slack.com/archives/C05HE0RG8SU/p1695232916102629

The sequence that causes the issue:
1. Partition X has been requested by the backfill but is still in
progress (it has not yet materialized or failed).
2. Someone launches a manual run for a parent of partition X.
3. After the manual run completes, we [add asset partitions whose
parents have been materialized to the candidate
set](https://github.com/dagster-io/dagster/blob/claire/fix-dupe-runs/python_modules/dagster/dagster/_core/execution/asset_backfill.py#L1022-L1030),
causing partition X to be re-added to the candidate set.
4. We don't check whether the partition has already been requested before
re-requesting it. [We only check that the partition has not been
materialized or failed within the
backfill](https://github.com/dagster-io/dagster/blob/25f11ba4005a5d0a5f670dc2f0da566d7afc2dd3/python_modules/dagster/dagster/_core/execution/asset_backfill.py#L1135C1-L1139C25).


This PR fixes the issue by checking whether a partition has already been
requested before re-requesting it.
clairelin135 authored Sep 20, 2023
1 parent 25f11ba commit 48db04c
Showing 2 changed files with 86 additions and 0 deletions.
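
The core of the fix is the extra `requested_subset` membership check shown in the diff below. As a rough illustration only, here is a minimal, self-contained sketch of that guard, using plain Python sets in place of Dagster's `AssetGraphSubset` and a hypothetical `should_request` helper standing in for the real `should_backfill_atomic_asset_partitions_unit` signature:

```python
from typing import AbstractSet, NamedTuple, Optional


class AssetKeyPartitionKey(NamedTuple):
    # Simplified stand-in for dagster's AssetKeyPartitionKey
    asset_key: str
    partition_key: Optional[str] = None


def should_request(
    candidate: AssetKeyPartitionKey,
    target_subset: AbstractSet[AssetKeyPartitionKey],
    materialized_subset: AbstractSet[AssetKeyPartitionKey],
    failed_and_downstream_subset: AbstractSet[AssetKeyPartitionKey],
    requested_subset: AbstractSet[AssetKeyPartitionKey],
) -> bool:
    # Before this fix, the `candidate in requested_subset` clause was missing, so a
    # candidate re-added to the candidate set after a manual parent run could be
    # submitted a second time while its first run was still in progress.
    if (
        candidate not in target_subset
        or candidate in failed_and_downstream_subset
        or candidate in materialized_subset
        or candidate in requested_subset
    ):
        return False
    return True


# A partition already requested by the backfill is not requested again.
candidate = AssetKeyPartitionKey("downstream", "2023-01-01")
assert not should_request(
    candidate,
    target_subset={candidate},
    materialized_subset=set(),
    failed_and_downstream_subset=set(),
    requested_subset={candidate},
)
```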
@@ -1056,6 +1056,7 @@ def execute_asset_backfill_iteration_inner(
asset_partitions_to_request=visited,
asset_graph=asset_graph,
materialized_subset=updated_materialized_subset,
requested_subset=asset_backfill_data.requested_subset,
target_subset=asset_backfill_data.target_subset,
failed_and_downstream_subset=failed_and_downstream_subset,
dynamic_partitions_store=instance_queryer,
@@ -1121,6 +1122,7 @@ def should_backfill_atomic_asset_partitions_unit(
candidates_unit: Iterable[AssetKeyPartitionKey],
asset_partitions_to_request: AbstractSet[AssetKeyPartitionKey],
target_subset: AssetGraphSubset,
requested_subset: AssetGraphSubset,
materialized_subset: AssetGraphSubset,
failed_and_downstream_subset: AssetGraphSubset,
dynamic_partitions_store: DynamicPartitionsStore,
@@ -1135,6 +1137,7 @@ def should_backfill_atomic_asset_partitions_unit(
candidate not in target_subset
or candidate in failed_and_downstream_subset
or candidate in materialized_subset
or candidate in requested_subset
):
return False

@@ -13,12 +13,14 @@
)
from unittest.mock import MagicMock, patch

import mock
import pendulum
import pytest
from dagster import (
AssetKey,
AssetsDefinition,
DagsterInstance,
DagsterRunStatus,
DailyPartitionsDefinition,
Definitions,
HourlyPartitionsDefinition,
@@ -42,9 +44,12 @@
get_canceling_asset_backfill_iteration_data,
)
from dagster._core.host_representation.external_data import external_asset_graph_from_defs
from dagster._core.storage.dagster_run import RunsFilter
from dagster._core.storage.tags import (
ASSET_PARTITION_RANGE_END_TAG,
ASSET_PARTITION_RANGE_START_TAG,
BACKFILL_ID_TAG,
PARTITION_NAME_TAG,
)
from dagster._core.test_utils import instance_for_test
from dagster._seven.compat.pendulum import create_pendulum_time
@@ -231,6 +236,19 @@ def _single_backfill_iteration(
return backfill_data


def _single_backfill_iteration_create_but_do_not_submit_runs(
backfill_id, backfill_data, asset_graph, instance, assets_by_repo_name
) -> AssetBackfillData:
# Patch the run execution to not actually execute the run, but instead just create it
with mock.patch(
"dagster._core.execution.execute_in_process.ExecuteRunWithPlanIterable",
return_value=mock.MagicMock(),
):
return _single_backfill_iteration(
backfill_id, backfill_data, asset_graph, instance, assets_by_repo_name
)


@pytest.mark.parametrize("some_or_all", ["all", "some"])
@pytest.mark.parametrize("failures", ["no_failures", "root_failures", "random_half_failures"])
@pytest.mark.parametrize("scenario", list(scenarios.values()), ids=list(scenarios.keys()))
@@ -310,6 +328,71 @@ def test_materializations_outside_of_backfill():
)


def test_do_not_rerequest_while_existing_run_in_progress():
@asset(
partitions_def=DailyPartitionsDefinition("2023-01-01"),
)
def upstream():
pass

@asset(
partitions_def=DailyPartitionsDefinition("2023-01-01"),
)
def downstream(upstream):
pass

assets_by_repo_name = {"repo": [upstream, downstream]}
asset_graph = get_asset_graph(assets_by_repo_name)

instance = DagsterInstance.ephemeral()

backfill_id = "dummy_backfill_id"
asset_backfill_data = AssetBackfillData.from_asset_partitions(
asset_graph=asset_graph,
partition_names=["2023-01-01"],
asset_selection=[downstream.key],
dynamic_partitions_store=MagicMock(),
all_partitions=False,
backfill_start_time=pendulum.datetime(2023, 1, 9, 0, 0, 0),
)

do_run(
all_assets=[upstream],
asset_keys=[upstream.key],
partition_key="2023-01-01",
instance=instance,
)

asset_backfill_data = _single_backfill_iteration_create_but_do_not_submit_runs(
backfill_id, asset_backfill_data, asset_graph, instance, assets_by_repo_name
)

assert (
AssetKeyPartitionKey(downstream.key, partition_key="2023-01-01")
in asset_backfill_data.requested_subset
)

# Run for 2023-01-01 exists and is in progress, but has not materialized
backfill_runs = instance.get_runs(RunsFilter(tags={BACKFILL_ID_TAG: backfill_id}))
assert len(backfill_runs) == 1
assert backfill_runs[0].tags.get(PARTITION_NAME_TAG) == "2023-01-01"
assert backfill_runs[0].status == DagsterRunStatus.NOT_STARTED

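# Manually materialize the parent again while the backfill's run for 2023-01-01 is
# still in progress; on the next iteration this re-adds the downstream partition to
# the candidate set, which previously caused a duplicate run to be submitted.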
do_run(
all_assets=[upstream],
asset_keys=[upstream.key],
partition_key="2023-01-01",
instance=instance,
)

_single_backfill_iteration_create_but_do_not_submit_runs(
backfill_id, asset_backfill_data, asset_graph, instance, assets_by_repo_name
)

# Confirm that no additional runs for 2023-01-01 are kicked off
assert len(instance.get_runs(RunsFilter(tags={BACKFILL_ID_TAG: backfill_id}))) == 1


def make_backfill_data(
some_or_all: str,
asset_graph: ExternalAssetGraph,
