diff --git a/python_modules/dagster/dagster/_core/instance/__init__.py b/python_modules/dagster/dagster/_core/instance/__init__.py index faadb38006a6a..fb42ffe3b8c2a 100644 --- a/python_modules/dagster/dagster/_core/instance/__init__.py +++ b/python_modules/dagster/dagster/_core/instance/__init__.py @@ -71,11 +71,13 @@ from dagster._core.storage.tags import ( ASSET_PARTITION_RANGE_END_TAG, ASSET_PARTITION_RANGE_START_TAG, + BACKFILL_ID_TAG, + BACKFILL_TAGS, PARENT_RUN_ID_TAG, PARTITION_NAME_TAG, RESUME_RETRY_TAG, ROOT_RUN_ID_TAG, - TAGS_TO_OMIT_ON_RETRY, + TAGS_TO_MAYBE_OMIT_ON_RETRY, WILL_RETRY_TAG, ) from dagster._serdes import ConfigurableClass @@ -1633,6 +1635,7 @@ def create_reexecuted_run( run_config: Optional[Mapping[str, Any]] = None, use_parent_run_tags: bool = False, ) -> DagsterRun: + from dagster._core.execution.backfill import BulkActionStatus from dagster._core.execution.plan.resume_retry import ReexecutionStrategy from dagster._core.execution.plan.state import KnownExecutionState from dagster._core.remote_representation import CodeLocation, RemoteJob @@ -1650,15 +1653,27 @@ def create_reexecuted_run( parent_run_id = parent_run.run_id # these can differ from remote_job.tags if tags were added at launch time - parent_run_tags = ( - {key: val for key, val in parent_run.tags.items() if key not in TAGS_TO_OMIT_ON_RETRY} - if use_parent_run_tags - else {} - ) + parent_run_tags_to_include = {} + if use_parent_run_tags: + parent_run_tags_to_include = { + key: val + for key, val in parent_run.tags.items() + if key not in TAGS_TO_MAYBE_OMIT_ON_RETRY + } + # condition to determine whether to include BACKFILL_ID_TAG, PARENT_BACKFILL_ID_TAG, + # ROOT_BACKFILL_ID_TAG on retried run + if parent_run.tags.get(BACKFILL_ID_TAG) is not None: + # if the run was part of a backfill and the backfill is complete, we do not want the + # retry to be considered part of the backfill, so remove all backfill-related tags + backfill = self.get_backfill(parent_run.tags[BACKFILL_ID_TAG]) + if backfill and backfill.status == BulkActionStatus.REQUESTED: + for tag in BACKFILL_TAGS: + if parent_run.tags.get(tag) is not None: + parent_run_tags_to_include[tag] = parent_run.tags[tag] tags = merge_dicts( remote_job.tags, - parent_run_tags, + parent_run_tags_to_include, extra_tags or {}, { PARENT_RUN_ID_TAG: parent_run_id, diff --git a/python_modules/dagster/dagster/_core/storage/tags.py b/python_modules/dagster/dagster/_core/storage/tags.py index c510becec9772..7c22457b39ee5 100644 --- a/python_modules/dagster/dagster/_core/storage/tags.py +++ b/python_modules/dagster/dagster/_core/storage/tags.py @@ -108,12 +108,15 @@ RUN_METRICS_POLLING_INTERVAL_TAG = f"{HIDDEN_TAG_PREFIX}run_metrics_polling_interval" RUN_METRICS_PYTHON_RUNTIME_TAG = f"{HIDDEN_TAG_PREFIX}python_runtime_metrics" +BACKFILL_TAGS = {BACKFILL_ID_TAG, PARENT_BACKFILL_ID_TAG, ROOT_BACKFILL_ID_TAG} -TAGS_TO_OMIT_ON_RETRY = { + +TAGS_TO_MAYBE_OMIT_ON_RETRY = { *RUN_METRIC_TAGS, RUN_FAILURE_REASON_TAG, WILL_RETRY_TAG, AUTO_RETRY_RUN_ID_TAG, + *BACKFILL_TAGS, } diff --git a/python_modules/dagster/dagster_tests/daemon_tests/conftest.py b/python_modules/dagster/dagster_tests/daemon_tests/conftest.py index edb4acfd36dbe..712318128e4d0 100644 --- a/python_modules/dagster/dagster_tests/daemon_tests/conftest.py +++ b/python_modules/dagster/dagster_tests/daemon_tests/conftest.py @@ -64,16 +64,23 @@ def workspace_fixture(instance_module_scoped) -> Iterator[WorkspaceProcessContex yield workspace_context -@pytest.fixture(name="remote_repo", scope="module") -def remote_repo_fixture( +@pytest.fixture(name="code_location", scope="module") +def code_location_fixture( workspace_context: WorkspaceProcessContext, -) -> Iterator[RemoteRepository]: - yield cast( +) -> CodeLocation: + return cast( CodeLocation, next( iter(workspace_context.create_request_context().get_code_location_entries().values()) ).code_location, - ).get_repository("the_repo") + ) + + +@pytest.fixture(name="remote_repo", scope="module") +def remote_repo_fixture( + code_location: CodeLocation, +) -> Iterator[RemoteRepository]: + yield code_location.get_repository("the_repo") def loadable_target_origin(attribute: Optional[str] = None) -> LoadableTargetOrigin: diff --git a/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py b/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py index 50077a20ce36e..934a84fe825c8 100644 --- a/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py +++ b/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py @@ -41,6 +41,7 @@ from dagster._core.definitions.events import AssetKeyPartitionKey from dagster._core.definitions.partition import DynamicPartitionsDefinition, PartitionedConfig from dagster._core.definitions.selector import ( + JobSubsetSelector, PartitionRangeSelector, PartitionsByAssetSelector, PartitionsSelector, @@ -53,11 +54,13 @@ get_asset_backfill_run_chunk_size, ) from dagster._core.execution.backfill import BulkActionStatus, PartitionBackfill +from dagster._core.execution.plan.resume_retry import ReexecutionStrategy from dagster._core.remote_representation import ( InProcessCodeLocationOrigin, RemoteRepository, RemoteRepositoryOrigin, ) +from dagster._core.remote_representation.code_location import CodeLocation from dagster._core.storage.compute_log_manager import ComputeIOType from dagster._core.storage.dagster_run import ( IN_PROGRESS_RUN_STATUSES, @@ -69,6 +72,7 @@ ASSET_PARTITION_RANGE_END_TAG, ASSET_PARTITION_RANGE_START_TAG, BACKFILL_ID_TAG, + BACKFILL_TAGS, MAX_RETRIES_TAG, PARTITION_NAME_TAG, ) @@ -3164,3 +3168,85 @@ def test_asset_backfill_retries_make_downstreams_runnable( backfill.asset_backfill_data.failed_and_downstream_subset.num_partitions_and_non_partitioned_assets == 0 ) + + +def test_run_retry_not_part_of_completed_backfill( + instance: DagsterInstance, + workspace_context: WorkspaceProcessContext, + code_location: CodeLocation, + remote_repo: RemoteRepository, +): + backfill_id = "run_retries_backfill" + partition_keys = static_partitions.get_partition_keys() + asset_selection = [AssetKey("foo"), AssetKey("a1"), AssetKey("bar")] + instance.add_backfill( + PartitionBackfill.from_asset_partitions( + asset_graph=workspace_context.create_request_context().asset_graph, + backfill_id=backfill_id, + tags={"custom_tag_key": "custom_tag_value"}, + backfill_timestamp=get_current_timestamp(), + asset_selection=asset_selection, + partition_names=partition_keys, + dynamic_partitions_store=instance, + all_partitions=False, + title=None, + description=None, + ) + ) + assert instance.get_runs_count() == 0 + backfill = instance.get_backfill(backfill_id) + assert backfill + assert backfill.status == BulkActionStatus.REQUESTED + + list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon"))) + assert instance.get_runs_count() == 3 + wait_for_all_runs_to_start(instance, timeout=30) + assert instance.get_runs_count() == 3 + wait_for_all_runs_to_finish(instance, timeout=30) + + assert instance.get_runs_count() == 3 + runs = reversed(list(instance.get_runs())) + for run in runs: + assert run.tags[BACKFILL_ID_TAG] == backfill_id + assert run.tags["custom_tag_key"] == "custom_tag_value" + assert step_succeeded(instance, run, "foo") + assert step_succeeded(instance, run, "reusable") + assert step_succeeded(instance, run, "bar") + + list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon"))) + backfill = instance.get_backfill(backfill_id) + assert backfill + assert backfill.status == BulkActionStatus.COMPLETED_SUCCESS + + # simulate a retry of a run + run_to_retry = instance.get_runs()[0] + selector = JobSubsetSelector( + location_name=code_location.name, + repository_name=remote_repo.name, + job_name=run_to_retry.job_name, + asset_selection=run_to_retry.asset_selection, + op_selection=None, + ) + remote_job = code_location.get_job(selector) + retried_run = instance.create_reexecuted_run( + parent_run=run_to_retry, + code_location=code_location, + remote_job=remote_job, + strategy=ReexecutionStrategy.ALL_STEPS, + run_config=run_to_retry.run_config, + use_parent_run_tags=True, # ensures that the logic for not copying over backfill tags is tested + ) + + for tag in BACKFILL_TAGS: + assert tag not in retried_run.tags.keys() + + # Since the backfill is alerady complete, it should not be processed by the backfill daemon and + # should remain in a completed state + list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon"))) + backfill = instance.get_backfill(backfill_id) + assert backfill + assert backfill.status == BulkActionStatus.COMPLETED_SUCCESS + + assert retried_run.run_id not in [ + r.run_id for r in instance.get_runs(filters=RunsFilter.for_backfill(backfill_id)) + ]