diff --git a/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql b/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql index 1eea13085353f..72918fc3d9d11 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql +++ b/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql @@ -3729,6 +3729,7 @@ type Mutation { ): ReportRunlessAssetEventsResult! launchPartitionBackfill(backfillParams: LaunchBackfillParams!): LaunchBackfillResult! resumePartitionBackfill(backfillId: String!): ResumeBackfillResult! + retryPartitionBackfill(backfillId: String!): LaunchBackfillResult! cancelPartitionBackfill(backfillId: String!): CancelBackfillResult! logTelemetry( action: String! diff --git a/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts b/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts index 234f635c8fddc..ce925d8e87e3f 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts +++ b/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts @@ -2627,6 +2627,7 @@ export type Mutation = { resetSchedule: ScheduleMutationResult; resetSensor: SensorOrError; resumePartitionBackfill: ResumeBackfillResult; + retryPartitionBackfill: LaunchBackfillResult; scheduleDryRun: ScheduleDryRunResult; sensorDryRun: SensorDryRunResult; setAutoMaterializePaused: Scalars['Boolean']['output']; @@ -2730,6 +2731,10 @@ export type MutationResumePartitionBackfillArgs = { backfillId: Scalars['String']['input']; }; +export type MutationRetryPartitionBackfillArgs = { + backfillId: Scalars['String']['input']; +}; + export type MutationScheduleDryRunArgs = { selectorData: ScheduleSelector; timestamp?: InputMaybe<Scalars['Float']['input']>; @@ -10249,6 +10254,12 @@ export const buildMutation = ( : relationshipsToOmit.has('PythonError') ? ({} as PythonError) : buildPythonError({}, relationshipsToOmit), + retryPartitionBackfill: + overrides && overrides.hasOwnProperty('retryPartitionBackfill') + ? overrides.retryPartitionBackfill! + : relationshipsToOmit.has('ConflictingExecutionParamsError') + ? ({} as ConflictingExecutionParamsError) + : buildConflictingExecutionParamsError({}, relationshipsToOmit), scheduleDryRun: overrides && overrides.hasOwnProperty('scheduleDryRun') ? overrides.scheduleDryRun!
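Note: for end-to-end context, a client exercises the new retryPartitionBackfill field by POSTing a mutation document to the Dagster GraphQL endpoint. Below is a minimal sketch; the endpoint URL and the requests-based helper are illustrative assumptions, not part of this change, and the selection set mirrors the RETRY_BACKFILL_MUTATION document used in the tests further down.

import requests  # assumed HTTP client; any client that can POST JSON works

# GraphQL document matching the RETRY_BACKFILL_MUTATION used in the tests below.
RETRY_BACKFILL_MUTATION = """
mutation($backfillId: String!) {
  retryPartitionBackfill(backfillId: $backfillId) {
    __typename
    ... on RetryBackfillSuccess {
      backfillId
    }
    ... on PythonError {
      message
    }
  }
}
"""

# Hypothetical endpoint; substitute the URL of your dagster-webserver instance.
GRAPHQL_URL = "http://localhost:3000/graphql"


def retry_backfill(backfill_id: str) -> str:
    """Submit a retry for a terminal backfill and return the new backfill's id."""
    response = requests.post(
        GRAPHQL_URL,
        json={"query": RETRY_BACKFILL_MUTATION, "variables": {"backfillId": backfill_id}},
        timeout=30,
    )
    response.raise_for_status()
    result = response.json()["data"]["retryPartitionBackfill"]
    if result["__typename"] != "RetryBackfillSuccess":
        raise RuntimeError(f"Retry failed: {result}")
    return result["backfillId"]

On success, the server returns the id of a newly created backfill, which carries the lineage tags introduced later in this diff.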
diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/execution/__init__.py b/python_modules/dagster-graphql/dagster_graphql/implementation/execution/__init__.py index 7a92b52e902f1..81aa13d4e2363 100644 --- a/python_modules/dagster-graphql/dagster_graphql/implementation/execution/__init__.py +++ b/python_modules/dagster-graphql/dagster_graphql/implementation/execution/__init__.py @@ -42,7 +42,9 @@ from dagster_graphql.implementation.execution.backfill import ( cancel_partition_backfill as cancel_partition_backfill, create_and_launch_partition_backfill as create_and_launch_partition_backfill, + reexecute_partition_backfill as reexecute_partition_backfill, resume_partition_backfill as resume_partition_backfill, + retry_partition_backfill as retry_partition_backfill, ) from dagster_graphql.implementation.utils import assert_permission, assert_permission_for_location diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/execution/backfill.py b/python_modules/dagster-graphql/dagster_graphql/implementation/execution/backfill.py index 95fbb112da65b..ee157f55a93ab 100644 --- a/python_modules/dagster-graphql/dagster_graphql/implementation/execution/backfill.py +++ b/python_modules/dagster-graphql/dagster_graphql/implementation/execution/backfill.py @@ -10,9 +10,14 @@ ) from dagster._core.events import AssetKey from dagster._core.execution.asset_backfill import create_asset_backfill_data_from_asset_partitions -from dagster._core.execution.backfill import BulkActionStatus, PartitionBackfill +from dagster._core.execution.backfill import ( + BULK_ACTION_TERMINAL_STATUSES, + BulkActionStatus, + PartitionBackfill, +) from dagster._core.execution.job_backfill import submit_backfill_runs from dagster._core.remote_representation.external_data import PartitionExecutionErrorSnap +from dagster._core.storage.tags import PARENT_BACKFILL_ID_TAG, ROOT_BACKFILL_ID_TAG from dagster._core.utils import make_new_backfill_id from dagster._core.workspace.permissions import Permissions from dagster._time import datetime_from_timestamp, get_current_timestamp @@ -34,6 +39,7 @@ GrapheneCancelBackfillSuccess, GrapheneLaunchBackfillSuccess, GrapheneResumeBackfillSuccess, + GrapheneRetryBackfillSuccess, ) from dagster_graphql.schema.errors import GraphenePartitionSetNotFoundError from dagster_graphql.schema.util import ResolveInfo @@ -322,3 +328,142 @@ def resume_partition_backfill( graphene_info.context.instance.update_backfill(backfill.with_status(BulkActionStatus.REQUESTED)) return GrapheneResumeBackfillSuccess(backfill_id=backfill_id) + + +def retry_partition_backfill( + graphene_info: "ResolveInfo", backfill_id: str +) -> "GrapheneRetryBackfillSuccess": + from dagster_graphql.schema.backfill import GrapheneRetryBackfillSuccess + + backfill = graphene_info.context.instance.get_backfill(backfill_id) + if not backfill: + check.failed(f"No backfill found for id: {backfill_id}") + + if backfill.status not in BULK_ACTION_TERMINAL_STATUSES: + raise DagsterInvariantViolationError( + f"Cannot retry backfill {backfill_id} because it is still in progress." + ) + + if backfill.is_asset_backfill: + asset_backfill_data = backfill.get_asset_backfill_data(graphene_info.context.asset_graph) + # determine the subset that should be retried by removing the successfully materialized subset from + # the target subset. This ensures that if the backfill was canceled or marked failed, all + # non-materialized asset partitions will be retried.
asset_backfill_data.failed_and_downstream_subset + # only contains asset partitions whose materialization runs failed and their downstream assets, not + # asset partitions that never got materialization runs. + not_materialized_assets = ( + asset_backfill_data.target_subset - asset_backfill_data.materialized_subset + ) + if not_materialized_assets.num_partitions_and_non_partitioned_assets == 0: + raise DagsterInvariantViolationError( + "Cannot retry an asset backfill that has no missing materializations." + ) + asset_graph = graphene_info.context.asset_graph + assert_permission_for_asset_graph( + graphene_info, + asset_graph, + list(not_materialized_assets.asset_keys), + Permissions.LAUNCH_PARTITION_BACKFILL, + ) + + new_backfill = PartitionBackfill.from_asset_graph_subset( + backfill_id=make_new_backfill_id(), + asset_graph_subset=not_materialized_assets, + dynamic_partitions_store=graphene_info.context.instance, + tags={ + **backfill.tags, + PARENT_BACKFILL_ID_TAG: backfill.backfill_id, + ROOT_BACKFILL_ID_TAG: backfill.tags.get(ROOT_BACKFILL_ID_TAG, backfill.backfill_id), + }, + backfill_timestamp=get_current_timestamp(), + title=f"Retry of {backfill.title}" if backfill.title else None, + description=backfill.description, + ) + else: # job backfill + partition_set_origin = check.not_none(backfill.partition_set_origin) + location_name = partition_set_origin.selector.location_name + assert_permission_for_location( + graphene_info, Permissions.LAUNCH_PARTITION_BACKFILL, location_name + ) + + new_backfill = PartitionBackfill( + backfill_id=make_new_backfill_id(), + partition_set_origin=backfill.partition_set_origin, + status=BulkActionStatus.REQUESTED, + partition_names=backfill.partition_names, + from_failure=True, + reexecution_steps=backfill.reexecution_steps, + tags={ + **backfill.tags, + PARENT_BACKFILL_ID_TAG: backfill.backfill_id, + ROOT_BACKFILL_ID_TAG: backfill.tags.get(ROOT_BACKFILL_ID_TAG, backfill.backfill_id), + }, + backfill_timestamp=get_current_timestamp(), + asset_selection=backfill.asset_selection, + title=f"Retry of {backfill.title}" if backfill.title else None, + description=backfill.description, + ) + + graphene_info.context.instance.add_backfill(new_backfill) + return GrapheneRetryBackfillSuccess(backfill_id=new_backfill.backfill_id) + + +def reexecute_partition_backfill( + graphene_info: "ResolveInfo", backfill_id: str +) -> "GrapheneRetryBackfillSuccess": + from dagster_graphql.schema.backfill import GrapheneRetryBackfillSuccess + + backfill = graphene_info.context.instance.get_backfill(backfill_id) + if not backfill: + check.failed(f"No backfill found for id: {backfill_id}") + + if backfill.is_asset_backfill: + asset_backfill_data = backfill.get_asset_backfill_data(graphene_info.context.asset_graph) + asset_graph = graphene_info.context.asset_graph + assert_permission_for_asset_graph( + graphene_info, + asset_graph, + list(asset_backfill_data.target_subset.asset_keys), + Permissions.LAUNCH_PARTITION_BACKFILL, + ) + + new_backfill = PartitionBackfill.from_asset_graph_subset( + backfill_id=make_new_backfill_id(), + asset_graph_subset=asset_backfill_data.target_subset, + dynamic_partitions_store=graphene_info.context.instance, + tags={ + **backfill.tags, + PARENT_BACKFILL_ID_TAG: backfill.backfill_id, + ROOT_BACKFILL_ID_TAG: backfill.tags.get(ROOT_BACKFILL_ID_TAG, backfill.backfill_id), + }, + backfill_timestamp=get_current_timestamp(), + title=f"Re-execution of {backfill.title}" if backfill.title else None, + description=backfill.description, + ) + else: # job backfill +
partition_set_origin = check.not_none(backfill.partition_set_origin) + location_name = partition_set_origin.selector.location_name + assert_permission_for_location( + graphene_info, Permissions.LAUNCH_PARTITION_BACKFILL, location_name + ) + + new_backfill = PartitionBackfill( + backfill_id=make_new_backfill_id(), + partition_set_origin=backfill.partition_set_origin, + status=BulkActionStatus.REQUESTED, + partition_names=backfill.partition_names, + from_failure=False, + reexecution_steps=backfill.reexecution_steps, + tags={ + **backfill.tags, + PARENT_BACKFILL_ID_TAG: backfill.backfill_id, + ROOT_BACKFILL_ID_TAG: backfill.tags.get(ROOT_BACKFILL_ID_TAG, backfill.backfill_id), + }, + backfill_timestamp=get_current_timestamp(), + asset_selection=backfill.asset_selection, + title=f"Re-execution of {backfill.title}" if backfill.title else None, + description=backfill.description, + ) + + graphene_info.context.instance.add_backfill(new_backfill) + return GrapheneRetryBackfillSuccess(backfill_id=new_backfill.backfill_id) diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/roots/mutation.py b/python_modules/dagster-graphql/dagster_graphql/schema/roots/mutation.py index 22ce110942a16..f04803a0c2bb8 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/roots/mutation.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/roots/mutation.py @@ -19,7 +19,9 @@ from dagster_graphql.implementation.execution.backfill import ( cancel_partition_backfill, create_and_launch_partition_backfill, + reexecute_partition_backfill, resume_partition_backfill, + retry_partition_backfill, ) from dagster_graphql.implementation.execution.dynamic_partitions import ( add_dynamic_partition, @@ -350,7 +352,7 @@ def mutate(self, graphene_info: ResolveInfo, backfillId: str): class GrapheneResumeBackfillMutation(graphene.Mutation): - """Retries a set of partition backfill runs.""" + """Resumes a set of partition backfill runs. Resuming a backfill will not retry any failed runs.""" Output = graphene.NonNull(GrapheneResumeBackfillResult) @@ -366,6 +368,40 @@ def mutate(self, graphene_info: ResolveInfo, backfillId: str): return resume_partition_backfill(graphene_info, backfillId) +class GrapheneRetryBackfillMutation(graphene.Mutation): + """Retries a set of partition backfill runs. Retrying a backfill will create a new backfill to retry any failed partitions.""" + + Output = graphene.NonNull(GrapheneLaunchBackfillResult) + + class Arguments: + backfillId = graphene.NonNull(graphene.String) + + class Meta: + name = "RetryBackfillMutation" + + @capture_error + @require_permission_check(Permissions.LAUNCH_PARTITION_BACKFILL) + def mutate(self, graphene_info: ResolveInfo, backfillId: str): + return retry_partition_backfill(graphene_info, backfillId) + + +class GrapheneReexecuteBackfillMutation(graphene.Mutation): + """Re-executes a backfill. 
Re-executing a backfill will create a new backfill that targets the same partitions as the existing backfill.""" + + Output = graphene.NonNull(GrapheneLaunchBackfillResult) + + class Arguments: + backfillId = graphene.NonNull(graphene.String) + + class Meta: + name = "ReexecuteBackfillMutation" + + @capture_error + @require_permission_check(Permissions.LAUNCH_PARTITION_BACKFILL) + def mutate(self, graphene_info: ResolveInfo, backfillId: str): + return reexecute_partition_backfill(graphene_info, backfillId) + + class GrapheneAddDynamicPartitionMutation(graphene.Mutation): """Adds a partition to a dynamic partition set.""" @@ -981,6 +1017,7 @@ class Meta: reportRunlessAssetEvents = GrapheneReportRunlessAssetEventsMutation.Field() launchPartitionBackfill = GrapheneLaunchBackfillMutation.Field() resumePartitionBackfill = GrapheneResumeBackfillMutation.Field() + retryPartitionBackfill = GrapheneRetryBackfillMutation.Field() cancelPartitionBackfill = GrapheneCancelBackfillMutation.Field() logTelemetry = GrapheneLogTelemetryMutation.Field() setNuxSeen = GrapheneSetNuxSeenMutation.Field() diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py index 932ea2e3cd698..18f441b1f488c 100644 --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py @@ -25,7 +25,9 @@ ASSET_PARTITION_RANGE_END_TAG, ASSET_PARTITION_RANGE_START_TAG, BACKFILL_ID_TAG, + PARENT_BACKFILL_ID_TAG, PARTITION_NAME_TAG, + ROOT_BACKFILL_ID_TAG, ) from dagster._core.test_utils import create_run_for_test, environ from dagster._core.utils import make_new_backfill_id @@ -160,6 +162,22 @@ } """ +RETRY_BACKFILL_MUTATION = """ + mutation($backfillId: String!) { + retryPartitionBackfill(backfillId: $backfillId) { + __typename + ... on RetryBackfillSuccess { + backfillId + } + ... on PythonError { + message + stack + } + } + } +""" + + GET_PARTITION_BACKFILLS_QUERY = """ query PartitionBackfillsQuery($repositorySelector: RepositorySelector!, $partitionSetName: String!)
{ partitionSetOrError(repositorySelector: $repositorySelector, partitionSetName: $partitionSetName) { @@ -558,6 +576,65 @@ def test_cancel_backfill(self, graphql_context): assert result.data["partitionBackfillOrError"]["__typename"] == "PartitionBackfill" assert result.data["partitionBackfillOrError"]["status"] == "CANCELED" + def test_cancel_then_retry_backfill(self, graphql_context): + repository_selector = infer_repository_selector(graphql_context) + result = execute_dagster_graphql( + graphql_context, + LAUNCH_PARTITION_BACKFILL_MUTATION, + variables={ + "backfillParams": { + "selector": { + "repositorySelector": repository_selector, + "partitionSetName": "integers_partition_set", + }, + "partitionNames": ["2", "3"], + } + }, + ) + + assert not result.errors + assert result.data + assert result.data["launchPartitionBackfill"]["__typename"] == "LaunchBackfillSuccess" + backfill_id = result.data["launchPartitionBackfill"]["backfillId"] + + result = execute_dagster_graphql( + graphql_context, + CANCEL_BACKFILL_MUTATION, + variables={"backfillId": backfill_id}, + ) + assert result.data + assert result.data["cancelPartitionBackfill"]["__typename"] == "CancelBackfillSuccess" + + result = execute_dagster_graphql( + graphql_context, + RETRY_BACKFILL_MUTATION, + variables={ + "backfillId": backfill_id, + }, + ) + + assert not result.errors + assert result.data + assert result.data["retryPartitionBackfill"]["__typename"] == "RetryBackfillSuccess" + retried_backfill_id = result.data["retryPartitionBackfill"]["backfillId"] + retried_backfill = graphql_context.instance.get_backfill(retried_backfill_id) + + assert retried_backfill.tags.get(PARENT_BACKFILL_ID_TAG) == backfill_id + assert retried_backfill.tags.get(ROOT_BACKFILL_ID_TAG) == backfill_id + + result = execute_dagster_graphql( + graphql_context, + PARTITION_PROGRESS_QUERY, + variables={"backfillId": retried_backfill_id}, + ) + assert not result.errors + assert result.data + assert result.data["partitionBackfillOrError"]["__typename"] == "PartitionBackfill" + assert result.data["partitionBackfillOrError"]["status"] == "REQUESTED" + assert result.data["partitionBackfillOrError"]["numCancelable"] == 2 + assert len(result.data["partitionBackfillOrError"]["partitionNames"]) == 2 + assert result.data["partitionBackfillOrError"]["fromFailure"] + def test_cancel_asset_backfill(self, graphql_context): asset_key = AssetKey("hanging_partition_asset") partitions = ["a"] @@ -634,6 +711,107 @@ def test_cancel_asset_backfill(self, graphql_context): assert len(runs) == 1 assert runs[0].status == DagsterRunStatus.CANCELED + def test_cancel_then_retry_asset_backfill(self, graphql_context): + asset_key = AssetKey("hanging_partition_asset") + partitions = ["a"] + result = execute_dagster_graphql( + graphql_context, + LAUNCH_PARTITION_BACKFILL_MUTATION, + variables={ + "backfillParams": { + "partitionNames": partitions, + "assetSelection": [asset_key.to_graphql_input()], + } + }, + ) + + assert not result.errors + assert result.data + assert result.data["launchPartitionBackfill"]["__typename"] == "LaunchBackfillSuccess" + backfill_id = result.data["launchPartitionBackfill"]["backfillId"] + + # Update the asset backfill data to contain the requested partition, but do not execute side effects, + # since launching the run would cause the test process to hang forever.
+ code_location = graphql_context.get_code_location("test") + repository = code_location.get_repository("test_repo") + asset_graph = repository.asset_graph + _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph) + + # Launch the run that runs forever + selector = infer_job_selector(graphql_context, "hanging_partition_asset_job") + with safe_tempfile_path() as path: + result = execute_dagster_graphql( + graphql_context, + LAUNCH_PIPELINE_EXECUTION_MUTATION, + variables={ + "executionParams": { + "selector": selector, + "mode": "default", + "runConfigData": { + "resources": {"hanging_asset_resource": {"config": {"file": path}}} + }, + "executionMetadata": { + "tags": [ + {"key": "dagster/partition", "value": "a"}, + {"key": BACKFILL_ID_TAG, "value": backfill_id}, + ] + }, + } + }, + ) + + assert not result.errors + assert result.data + + # ensure the execution has happened + while not os.path.exists(path): + time.sleep(0.1) + + result = execute_dagster_graphql( + graphql_context, + CANCEL_BACKFILL_MUTATION, + variables={"backfillId": backfill_id}, + ) + assert result.data + assert result.data["cancelPartitionBackfill"]["__typename"] == "CancelBackfillSuccess" + + while ( + graphql_context.instance.get_backfill(backfill_id).status + != BulkActionStatus.CANCELED + ): + _execute_backfill_iteration_with_side_effects(graphql_context, backfill_id) + + runs = graphql_context.instance.get_runs( + RunsFilter(tags={BACKFILL_ID_TAG: backfill_id}) + ) + assert len(runs) == 1 + assert runs[0].status == DagsterRunStatus.CANCELED + + result = execute_dagster_graphql( + graphql_context, + RETRY_BACKFILL_MUTATION, + variables={ + "backfillId": backfill_id, + }, + ) + + assert not result.errors + assert result.data + assert result.data["retryPartitionBackfill"]["__typename"] == "RetryBackfillSuccess" + retry_backfill_id = result.data["retryPartitionBackfill"]["backfillId"] + + first_backfill = graphql_context.instance.get_backfill(backfill_id) + retried_backfill = graphql_context.instance.get_backfill(retry_backfill_id) + + # no runs were successful for the first backfill, so the retry target should be the same + # as the first backfill + assert ( + first_backfill.asset_backfill_data.target_subset + == retried_backfill.asset_backfill_data.target_subset + ) + assert retried_backfill.tags.get(PARENT_BACKFILL_ID_TAG) == backfill_id + assert retried_backfill.tags.get(ROOT_BACKFILL_ID_TAG) == backfill_id + def test_resume_backfill(self, graphql_context): repository_selector = infer_repository_selector(graphql_context) result = execute_dagster_graphql( @@ -1352,3 +1530,591 @@ def test_launch_backfill_with_all_partitions_flag(self, graphql_context): assert result.data["partitionBackfillOrError"]["status"] == "REQUESTED" assert result.data["partitionBackfillOrError"]["numCancelable"] == 10 assert len(result.data["partitionBackfillOrError"]["partitionNames"]) == 10 + + def test_retry_asset_backfill(self, graphql_context): + code_location = graphql_context.get_code_location("test") + repository = code_location.get_repository("test_repo") + asset_graph = repository.asset_graph + + asset_keys = [ + AssetKey("unpartitioned_upstream_of_partitioned"), + AssetKey("upstream_daily_partitioned_asset"), + AssetKey("downstream_weekly_partitioned_asset"), + ] + partitions = ["2023-01-09"] + result = execute_dagster_graphql( + graphql_context, + LAUNCH_PARTITION_BACKFILL_MUTATION, + variables={ + "backfillParams": { + "partitionNames": partitions, + "assetSelection": 
[asset_key.to_graphql_input() for asset_key in asset_keys], + } + }, + ) + + assert not result.errors + assert result.data + assert result.data["launchPartitionBackfill"]["__typename"] == "LaunchBackfillSuccess" + backfill_id = result.data["launchPartitionBackfill"]["backfillId"] + + _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph) + _mock_asset_backfill_runs( + graphql_context, + AssetKey("unpartitioned_upstream_of_partitioned"), + asset_graph, + backfill_id, + DagsterRunStatus.SUCCESS, + None, + ) + _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph) + _mock_asset_backfill_runs( + graphql_context, + AssetKey("upstream_daily_partitioned_asset"), + asset_graph, + backfill_id, + DagsterRunStatus.FAILURE, + "2023-01-09", + ) + _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph) + + # mark backfill as completed so we can retry it + backfill = graphql_context.instance.get_backfill(backfill_id) + graphql_context.instance.update_backfill(backfill.with_status(BulkActionStatus.COMPLETED)) + + result = execute_dagster_graphql( + graphql_context, + RETRY_BACKFILL_MUTATION, + variables={ + "backfillId": backfill_id, + }, + ) + + assert not result.errors + assert result.data + assert result.data["retryPartitionBackfill"]["__typename"] == "RetryBackfillSuccess" + retry_backfill_id = result.data["retryPartitionBackfill"]["backfillId"] + + first_backfill = graphql_context.instance.get_backfill(backfill_id) + retried_backfill = graphql_context.instance.get_backfill(retry_backfill_id) + + assert ( + first_backfill.asset_backfill_data.failed_and_downstream_subset + == retried_backfill.asset_backfill_data.target_subset + ) + assert retried_backfill.tags.get(PARENT_BACKFILL_ID_TAG) == backfill_id + assert retried_backfill.tags.get(ROOT_BACKFILL_ID_TAG) == backfill_id + + def test_retry_successful_asset_backfill(self, graphql_context): + code_location = graphql_context.get_code_location("test") + repository = code_location.get_repository("test_repo") + asset_graph = repository.asset_graph + + asset_keys = [ + AssetKey("unpartitioned_upstream_of_partitioned"), + AssetKey("upstream_daily_partitioned_asset"), + ] + partitions = ["2023-01-09"] + result = execute_dagster_graphql( + graphql_context, + LAUNCH_PARTITION_BACKFILL_MUTATION, + variables={ + "backfillParams": { + "partitionNames": partitions, + "assetSelection": [asset_key.to_graphql_input() for asset_key in asset_keys], + } + }, + ) + + assert not result.errors + assert result.data + assert result.data["launchPartitionBackfill"]["__typename"] == "LaunchBackfillSuccess" + backfill_id = result.data["launchPartitionBackfill"]["backfillId"] + + _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph) + _mock_asset_backfill_runs( + graphql_context, + AssetKey("unpartitioned_upstream_of_partitioned"), + asset_graph, + backfill_id, + DagsterRunStatus.SUCCESS, + None, + ) + _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph) + _mock_asset_backfill_runs( + graphql_context, + AssetKey("upstream_daily_partitioned_asset"), + asset_graph, + backfill_id, + DagsterRunStatus.SUCCESS, + "2023-01-09", + ) + _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph) + + # mark backfill as complete so we can retry it + backfill = graphql_context.instance.get_backfill(backfill_id) + 
graphql_context.instance.update_backfill(backfill.with_status(BulkActionStatus.COMPLETED)) + + result = execute_dagster_graphql( + graphql_context, + RETRY_BACKFILL_MUTATION, + variables={ + "backfillId": backfill_id, + }, + ) + assert not result.errors + assert result.data + assert result.data["retryPartitionBackfill"]["__typename"] == "PythonError" + assert ( + "Cannot retry an asset backfill that has no missing materializations" + in result.data["retryPartitionBackfill"]["message"] + ) + + def test_retry_asset_backfill_still_in_progress(self, graphql_context): + code_location = graphql_context.get_code_location("test") + repository = code_location.get_repository("test_repo") + asset_graph = repository.asset_graph + + asset_keys = [ + AssetKey("unpartitioned_upstream_of_partitioned"), + AssetKey("upstream_daily_partitioned_asset"), + AssetKey("downstream_weekly_partitioned_asset"), + ] + partitions = ["2023-01-09"] + result = execute_dagster_graphql( + graphql_context, + LAUNCH_PARTITION_BACKFILL_MUTATION, + variables={ + "backfillParams": { + "partitionNames": partitions, + "assetSelection": [asset_key.to_graphql_input() for asset_key in asset_keys], + } + }, + ) + + assert not result.errors + assert result.data + assert result.data["launchPartitionBackfill"]["__typename"] == "LaunchBackfillSuccess" + backfill_id = result.data["launchPartitionBackfill"]["backfillId"] + + _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph) + _mock_asset_backfill_runs( + graphql_context, + AssetKey("unpartitioned_upstream_of_partitioned"), + asset_graph, + backfill_id, + DagsterRunStatus.SUCCESS, + None, + ) + _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph) + _mock_asset_backfill_runs( + graphql_context, + AssetKey("upstream_daily_partitioned_asset"), + asset_graph, + backfill_id, + DagsterRunStatus.FAILURE, + "2023-01-09", + ) + _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph) + + # try to retry the backfill while it is still in progress + result = execute_dagster_graphql( + graphql_context, + RETRY_BACKFILL_MUTATION, + variables={ + "backfillId": backfill_id, + }, + ) + + assert not result.errors + assert result.data + assert result.data["retryPartitionBackfill"]["__typename"] == "PythonError" + assert "still in progress" in result.data["retryPartitionBackfill"]["message"] + + # once the first backfill is in a terminal state, we can retry it + backfill = graphql_context.instance.get_backfill(backfill_id) + graphql_context.instance.update_backfill(backfill.with_status(BulkActionStatus.FAILED)) + + result = execute_dagster_graphql( + graphql_context, + RETRY_BACKFILL_MUTATION, + variables={ + "backfillId": backfill_id, + }, + ) + + assert not result.errors + assert result.data + assert result.data["retryPartitionBackfill"]["__typename"] == "RetryBackfillSuccess" + retry_backfill_id = result.data["retryPartitionBackfill"]["backfillId"] + + first_backfill = graphql_context.instance.get_backfill(backfill_id) + retried_backfill = graphql_context.instance.get_backfill(retry_backfill_id) + + assert ( + first_backfill.asset_backfill_data.failed_and_downstream_subset + == retried_backfill.asset_backfill_data.target_subset + ) + assert retried_backfill.tags.get(PARENT_BACKFILL_ID_TAG) == backfill_id + assert retried_backfill.tags.get(ROOT_BACKFILL_ID_TAG) == backfill_id + + def test_retry_asset_backfill_twice(self, graphql_context): + code_location =
graphql_context.get_code_location("test") + repository = code_location.get_repository("test_repo") + asset_graph = repository.asset_graph + + asset_keys = [ + AssetKey("unpartitioned_upstream_of_partitioned"), + AssetKey("upstream_daily_partitioned_asset"), + AssetKey("downstream_weekly_partitioned_asset"), + ] + partitions = ["2023-01-09"] + result = execute_dagster_graphql( + graphql_context, + LAUNCH_PARTITION_BACKFILL_MUTATION, + variables={ + "backfillParams": { + "partitionNames": partitions, + "assetSelection": [asset_key.to_graphql_input() for asset_key in asset_keys], + } + }, + ) + + assert not result.errors + assert result.data + assert result.data["launchPartitionBackfill"]["__typename"] == "LaunchBackfillSuccess" + backfill_id = result.data["launchPartitionBackfill"]["backfillId"] + + _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph) + _mock_asset_backfill_runs( + graphql_context, + AssetKey("unpartitioned_upstream_of_partitioned"), + asset_graph, + backfill_id, + DagsterRunStatus.SUCCESS, + None, + ) + _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph) + _mock_asset_backfill_runs( + graphql_context, + AssetKey("upstream_daily_partitioned_asset"), + asset_graph, + backfill_id, + DagsterRunStatus.FAILURE, + "2023-01-09", + ) + _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph) + + # mark backfill as completed so we can retry it + backfill = graphql_context.instance.get_backfill(backfill_id) + graphql_context.instance.update_backfill(backfill.with_status(BulkActionStatus.COMPLETED)) + + result = execute_dagster_graphql( + graphql_context, + RETRY_BACKFILL_MUTATION, + variables={ + "backfillId": backfill_id, + }, + ) + + assert not result.errors + assert result.data + assert result.data["retryPartitionBackfill"]["__typename"] == "RetryBackfillSuccess" + retry_backfill_id = result.data["retryPartitionBackfill"]["backfillId"] + + first_backfill = graphql_context.instance.get_backfill(backfill_id) + retried_backfill = graphql_context.instance.get_backfill(retry_backfill_id) + + assert ( + first_backfill.asset_backfill_data.failed_and_downstream_subset + == retried_backfill.asset_backfill_data.target_subset + ) + assert retried_backfill.tags.get(PARENT_BACKFILL_ID_TAG) == backfill_id + assert retried_backfill.tags.get(ROOT_BACKFILL_ID_TAG) == backfill_id + + _execute_asset_backfill_iteration_no_side_effects( + graphql_context, retried_backfill.backfill_id, asset_graph + ) + # mark some partitions failed so we can retry again + _mock_asset_backfill_runs( + graphql_context, + AssetKey("upstream_daily_partitioned_asset"), + asset_graph, + retried_backfill.backfill_id, + DagsterRunStatus.FAILURE, + "2023-01-09", + ) + _execute_asset_backfill_iteration_no_side_effects( + graphql_context, retried_backfill.backfill_id, asset_graph + ) + + # refetch the backfill to get the updated statuses of all assets + retried_backfill = graphql_context.instance.get_backfill(retry_backfill_id) + graphql_context.instance.update_backfill( + retried_backfill.with_status(BulkActionStatus.COMPLETED) + ) + + result = execute_dagster_graphql( + graphql_context, + RETRY_BACKFILL_MUTATION, + variables={ + "backfillId": retried_backfill.backfill_id, + }, + ) + + assert not result.errors + assert result.data + assert result.data["retryPartitionBackfill"]["__typename"] == "RetryBackfillSuccess" + second_retry_backfill_id = result.data["retryPartitionBackfill"]["backfillId"] + 
second_retried_backfill = graphql_context.instance.get_backfill(second_retry_backfill_id) + retried_backfill = graphql_context.instance.get_backfill(retry_backfill_id) + + assert ( + retried_backfill.asset_backfill_data.failed_and_downstream_subset + == second_retried_backfill.asset_backfill_data.target_subset + ) + assert second_retried_backfill.tags.get(PARENT_BACKFILL_ID_TAG) == retry_backfill_id + assert second_retried_backfill.tags.get(ROOT_BACKFILL_ID_TAG) == backfill_id + + def test_retry_job_backfill(self, graphql_context): + repository_selector = infer_repository_selector(graphql_context) + partition_set_selector = { + "repositorySelector": repository_selector, + "partitionSetName": "chained_failure_job_partition_set", + } + + # trigger failure in the conditionally_fail solid + + output_file = os.path.join( + get_system_temp_directory(), "chained_failure_pipeline_conditionally_fail" + ) + try: + with open(output_file, "w", encoding="utf8"): + result = execute_dagster_graphql_and_finish_runs( + graphql_context, + LAUNCH_PARTITION_BACKFILL_MUTATION, + variables={ + "backfillParams": { + "selector": partition_set_selector, + "partitionNames": ["2", "3"], + } + }, + ) + finally: + os.remove(output_file) + + assert not result.errors + assert result.data + assert result.data["launchPartitionBackfill"]["__typename"] == "LaunchBackfillSuccess" + backfill_id = result.data["launchPartitionBackfill"]["backfillId"] + + backfill = graphql_context.instance.get_backfill(backfill_id) + graphql_context.instance.update_backfill(backfill.with_status(BulkActionStatus.COMPLETED)) + + # re-execute from failure (without the failure file) + result = execute_dagster_graphql( + graphql_context, + RETRY_BACKFILL_MUTATION, + variables={"backfillId": backfill_id}, + ) + + assert not result.errors + assert result.data + assert result.data["retryPartitionBackfill"]["__typename"] == "RetryBackfillSuccess" + retried_backfill_id = result.data["retryPartitionBackfill"]["backfillId"] + retried_backfill = graphql_context.instance.get_backfill(retried_backfill_id) + + assert retried_backfill.tags.get(PARENT_BACKFILL_ID_TAG) == backfill_id + assert retried_backfill.tags.get(ROOT_BACKFILL_ID_TAG) == backfill_id + + result = execute_dagster_graphql( + graphql_context, + PARTITION_PROGRESS_QUERY, + variables={"backfillId": retried_backfill_id}, + ) + assert not result.errors + assert result.data + assert result.data["partitionBackfillOrError"]["__typename"] == "PartitionBackfill" + assert result.data["partitionBackfillOrError"]["status"] == "REQUESTED" + assert result.data["partitionBackfillOrError"]["numCancelable"] == 2 + assert len(result.data["partitionBackfillOrError"]["partitionNames"]) == 2 + assert result.data["partitionBackfillOrError"]["fromFailure"] + + def test_retry_in_progress_job_backfill(self, graphql_context): + repository_selector = infer_repository_selector(graphql_context) + partition_set_selector = { + "repositorySelector": repository_selector, + "partitionSetName": "chained_failure_job_partition_set", + } + + # trigger failure in the conditionally_fail solid + + output_file = os.path.join( + get_system_temp_directory(), "chained_failure_pipeline_conditionally_fail" + ) + try: + with open(output_file, "w", encoding="utf8"): + result = execute_dagster_graphql_and_finish_runs( + graphql_context, + LAUNCH_PARTITION_BACKFILL_MUTATION, + variables={ + "backfillParams": { + "selector": partition_set_selector, + "partitionNames": ["2", "3"], + } + }, + ) + finally: + os.remove(output_file) + + assert not
result.errors + assert result.data + assert result.data["launchPartitionBackfill"]["__typename"] == "LaunchBackfillSuccess" + backfill_id = result.data["launchPartitionBackfill"]["backfillId"] + + result = execute_dagster_graphql( + graphql_context, + RETRY_BACKFILL_MUTATION, + variables={"backfillId": backfill_id}, + ) + + assert not result.errors + assert result.data + assert result.data["retryPartitionBackfill"]["__typename"] == "PythonError" + assert "still in progress" in result.data["retryPartitionBackfill"]["message"] + + # mark backfill as complete and confirm that we can retry it + backfill = graphql_context.instance.get_backfill(backfill_id) + graphql_context.instance.update_backfill(backfill.with_status(BulkActionStatus.COMPLETED)) + + # re-execute from failure (without the failure file) + result = execute_dagster_graphql( + graphql_context, + RETRY_BACKFILL_MUTATION, + variables={"backfillId": backfill_id}, + ) + + assert not result.errors + assert result.data + assert result.data["retryPartitionBackfill"]["__typename"] == "RetryBackfillSuccess" + retried_backfill_id = result.data["retryPartitionBackfill"]["backfillId"] + retried_backfill = graphql_context.instance.get_backfill(retried_backfill_id) + + assert retried_backfill.tags.get(PARENT_BACKFILL_ID_TAG) == backfill_id + assert retried_backfill.tags.get(ROOT_BACKFILL_ID_TAG) == backfill_id + + def test_retry_job_backfill_twice(self, graphql_context): + repository_selector = infer_repository_selector(graphql_context) + partition_set_selector = { + "repositorySelector": repository_selector, + "partitionSetName": "chained_failure_job_partition_set", + } + + # trigger failure in the conditionally_fail solid + + output_file = os.path.join( + get_system_temp_directory(), "chained_failure_pipeline_conditionally_fail" + ) + with open(output_file, "w", encoding="utf8"): + result = execute_dagster_graphql_and_finish_runs( + graphql_context, + LAUNCH_PARTITION_BACKFILL_MUTATION, + variables={ + "backfillParams": { + "selector": partition_set_selector, + "partitionNames": ["2", "3"], + } + }, + ) + + assert not result.errors + assert result.data + assert result.data["launchPartitionBackfill"]["__typename"] == "LaunchBackfillSuccess" + backfill_id = result.data["launchPartitionBackfill"]["backfillId"] + + # mark backfill as complete and confirm that we can retry it + backfill = graphql_context.instance.get_backfill(backfill_id) + graphql_context.instance.update_backfill(backfill.with_status(BulkActionStatus.COMPLETED)) + try: + with open(output_file, "w", encoding="utf8"): + # re-execute from failure (still with the failure file) + result = execute_dagster_graphql( + graphql_context, + RETRY_BACKFILL_MUTATION, + variables={"backfillId": backfill_id}, + ) + finally: + os.remove(output_file) + + assert not result.errors + assert result.data + assert result.data["retryPartitionBackfill"]["__typename"] == "RetryBackfillSuccess" + retried_backfill_id = result.data["retryPartitionBackfill"]["backfillId"] + retried_backfill = graphql_context.instance.get_backfill(retried_backfill_id) + + assert retried_backfill.tags.get(PARENT_BACKFILL_ID_TAG) == backfill_id + assert retried_backfill.tags.get(ROOT_BACKFILL_ID_TAG) == backfill_id + + graphql_context.instance.update_backfill( + retried_backfill.with_status(BulkActionStatus.COMPLETED) + ) + + # re-execute from failure (without the failure file) + result = execute_dagster_graphql( + graphql_context, + RETRY_BACKFILL_MUTATION, + variables={"backfillId": retried_backfill.backfill_id}, + ) + + assert 
not result.errors + assert result.data + assert result.data["retryPartitionBackfill"]["__typename"] == "RetryBackfillSuccess" + second_retried_backfill_id = result.data["retryPartitionBackfill"]["backfillId"] + second_retried_backfill = graphql_context.instance.get_backfill(second_retried_backfill_id) + + assert second_retried_backfill.tags.get(PARENT_BACKFILL_ID_TAG) == retried_backfill_id + assert second_retried_backfill.tags.get(ROOT_BACKFILL_ID_TAG) == backfill_id + + def test_retry_successful_job_backfill(self, graphql_context): + repository_selector = infer_repository_selector(graphql_context) + partition_set_selector = { + "repositorySelector": repository_selector, + "partitionSetName": "chained_failure_job_partition_set", + } + + result = execute_dagster_graphql_and_finish_runs( + graphql_context, + LAUNCH_PARTITION_BACKFILL_MUTATION, + variables={ + "backfillParams": { + "selector": partition_set_selector, + "partitionNames": ["2", "3"], + } + }, + ) + + assert not result.errors + assert result.data + assert result.data["launchPartitionBackfill"]["__typename"] == "LaunchBackfillSuccess" + backfill_id = result.data["launchPartitionBackfill"]["backfillId"] + + # mark backfill as complete + backfill = graphql_context.instance.get_backfill(backfill_id) + graphql_context.instance.update_backfill(backfill.with_status(BulkActionStatus.COMPLETED)) + + result = execute_dagster_graphql( + graphql_context, + RETRY_BACKFILL_MUTATION, + variables={"backfillId": backfill_id}, + ) + # Unlike asset backfills, we don't currently have a way to see if a job backfill has runs that + # failed without querying the DB. So we always allow retries. + assert not result.errors + assert result.data + assert result.data["retryPartitionBackfill"]["__typename"] == "RetryBackfillSuccess" + retried_backfill_id = result.data["retryPartitionBackfill"]["backfillId"] + retried_backfill = graphql_context.instance.get_backfill(retried_backfill_id) + + assert retried_backfill.tags.get(PARENT_BACKFILL_ID_TAG) == backfill_id + assert retried_backfill.tags.get(ROOT_BACKFILL_ID_TAG) == backfill_id diff --git a/python_modules/dagster/dagster/_core/execution/backfill.py b/python_modules/dagster/dagster/_core/execution/backfill.py index 019fb211be142..2eac210ece544 100644 --- a/python_modules/dagster/dagster/_core/execution/backfill.py +++ b/python_modules/dagster/dagster/_core/execution/backfill.py @@ -42,6 +42,15 @@ def from_graphql_input(graphql_str): return BulkActionStatus(graphql_str) +BULK_ACTION_TERMINAL_STATUSES = [ + BulkActionStatus.COMPLETED, + BulkActionStatus.FAILED, + BulkActionStatus.CANCELED, + BulkActionStatus.COMPLETED_SUCCESS, + BulkActionStatus.COMPLETED_FAILED, +] + + @record class BulkActionsFilter: """Filters to use when querying for bulk actions (i.e. backfills) from the BulkActionsTable. diff --git a/python_modules/dagster/dagster/_core/storage/tags.py b/python_modules/dagster/dagster/_core/storage/tags.py index 2f767ad0d2987..02027e0e910be 100644 --- a/python_modules/dagster/dagster/_core/storage/tags.py +++ b/python_modules/dagster/dagster/_core/storage/tags.py @@ -33,8 +33,12 @@ PARENT_RUN_ID_TAG = f"{SYSTEM_TAG_PREFIX}parent_run_id" +PARENT_BACKFILL_ID_TAG = f"{SYSTEM_TAG_PREFIX}parent_backfill_id" + ROOT_RUN_ID_TAG = f"{SYSTEM_TAG_PREFIX}root_run_id" +ROOT_BACKFILL_ID_TAG = f"{SYSTEM_TAG_PREFIX}root_backfill_id" + RESUME_RETRY_TAG = f"{SYSTEM_TAG_PREFIX}is_resume_retry" STEP_SELECTION_TAG = f"{SYSTEM_TAG_PREFIX}step_selection"
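Note: the two tags added above give every retried or re-executed backfill a lineage: PARENT_BACKFILL_ID_TAG always points at the immediate predecessor, while ROOT_BACKFILL_ID_TAG is inherited unchanged from the first backfill in the chain. The following toy sketch illustrates the tag arithmetic implemented by both mutations; the lineage_tags helper is hypothetical, written only for illustration.

# Values mirror the additions to dagster/_core/storage/tags.py above.
SYSTEM_TAG_PREFIX = "dagster/"
PARENT_BACKFILL_ID_TAG = f"{SYSTEM_TAG_PREFIX}parent_backfill_id"
ROOT_BACKFILL_ID_TAG = f"{SYSTEM_TAG_PREFIX}root_backfill_id"


def lineage_tags(parent_backfill_id: str, parent_tags: dict) -> dict:
    # Mirrors the tag construction in retry_partition_backfill /
    # reexecute_partition_backfill: the parent tag points at the immediate
    # predecessor, while the root tag falls back to the parent's id only
    # when the parent was the first backfill in the chain.
    return {
        **parent_tags,
        PARENT_BACKFILL_ID_TAG: parent_backfill_id,
        ROOT_BACKFILL_ID_TAG: parent_tags.get(ROOT_BACKFILL_ID_TAG, parent_backfill_id),
    }


original_tags: dict = {}  # the original backfill carries neither lineage tag
first_retry = lineage_tags("bf_1", original_tags)  # parent=bf_1, root=bf_1
second_retry = lineage_tags("bf_2", first_retry)   # parent=bf_2, root stays bf_1
assert second_retry[PARENT_BACKFILL_ID_TAG] == "bf_2"
assert second_retry[ROOT_BACKFILL_ID_TAG] == "bf_1"

This is the same parent/root pattern Dagster already uses for run-level retries (PARENT_RUN_ID_TAG / ROOT_RUN_ID_TAG), applied at the backfill level, and it is what the tests above assert when they chain two retries.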