From 6d3ee7deda38aa06c244ebd889474224da6d4d94 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Thu, 15 Aug 2024 11:22:27 -0400 Subject: [PATCH 01/11] implementation --- .../implementation/execution/__init__.py | 1 + .../implementation/execution/backfill.py | 54 +++++++++++++++++++ .../dagster_graphql/schema/backfill.py | 13 +++++ .../dagster_graphql/schema/roots/mutation.py | 22 +++++++- 4 files changed, 89 insertions(+), 1 deletion(-) diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/execution/__init__.py b/python_modules/dagster-graphql/dagster_graphql/implementation/execution/__init__.py index 7a92b52e902f1..cd81a7308413c 100644 --- a/python_modules/dagster-graphql/dagster_graphql/implementation/execution/__init__.py +++ b/python_modules/dagster-graphql/dagster_graphql/implementation/execution/__init__.py @@ -43,6 +43,7 @@ cancel_partition_backfill as cancel_partition_backfill, create_and_launch_partition_backfill as create_and_launch_partition_backfill, resume_partition_backfill as resume_partition_backfill, + retry_partition_backfill as retry_partition_backfill, ) from dagster_graphql.implementation.utils import assert_permission, assert_permission_for_location diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/execution/backfill.py b/python_modules/dagster-graphql/dagster_graphql/implementation/execution/backfill.py index 95fbb112da65b..bbf547a3a6786 100644 --- a/python_modules/dagster-graphql/dagster_graphql/implementation/execution/backfill.py +++ b/python_modules/dagster-graphql/dagster_graphql/implementation/execution/backfill.py @@ -13,6 +13,7 @@ from dagster._core.execution.backfill import BulkActionStatus, PartitionBackfill from dagster._core.execution.job_backfill import submit_backfill_runs from dagster._core.remote_representation.external_data import PartitionExecutionErrorSnap +from dagster._core.storage.tags import PARENT_RUN_ID_TAG, ROOT_RUN_ID_TAG from dagster._core.utils import make_new_backfill_id from dagster._core.workspace.permissions import Permissions from dagster._time import datetime_from_timestamp, get_current_timestamp @@ -34,6 +35,7 @@ GrapheneCancelBackfillSuccess, GrapheneLaunchBackfillSuccess, GrapheneResumeBackfillSuccess, + GrapheneRetryBackfillSuccess, ) from dagster_graphql.schema.errors import GraphenePartitionSetNotFoundError from dagster_graphql.schema.util import ResolveInfo @@ -322,3 +324,55 @@ def resume_partition_backfill( graphene_info.context.instance.update_backfill(backfill.with_status(BulkActionStatus.REQUESTED)) return GrapheneResumeBackfillSuccess(backfill_id=backfill_id) + + +def retry_partition_backfill( + graphene_info: "ResolveInfo", backfill_id: str +) -> "GrapheneRetryBackfillSuccess": + from ...schema.backfill import GrapheneRetryBackfillSuccess + + backfill = graphene_info.context.instance.get_backfill(backfill_id) + if not backfill: + check.failed(f"No backfill found for id: {backfill_id}") + + partition_set_origin = check.not_none(backfill.partition_set_origin) + location_name = partition_set_origin.selector.location_name + assert_permission_for_location( + graphene_info, Permissions.LAUNCH_PARTITION_BACKFILL, location_name + ) + + if backfill.is_asset_backfill: + new_backfill = PartitionBackfill.from_asset_graph_subset( + backfill_id=make_new_backfill_id(), + asset_graph_subset=backfill.asset_backfill_data.failed_and_downstream_subset, + dynamic_partitions_store=graphene_info.context.instance, + tags={ + **backfill.tags, + PARENT_RUN_ID_TAG: backfill.backfill_id, + ROOT_RUN_ID_TAG: backfill.tags.get(PARENT_RUN_ID_TAG, backfill.backfill_id), + }, + backfill_timestamp=get_current_timestamp(), + title=f"Retry of {backfill.title}" if backfill.title else None, + description=backfill.description, + ) + else: + new_backfill = PartitionBackfill( + backfill_id=backfill.backfill_id, + partition_set_origin=backfill.partition_set_origin, + status=BulkActionStatus.REQUESTED, + partition_names=backfill.partition_names, + from_failure=True, + reexecution_steps=backfill.reexecution_steps, + tags={ + **backfill.tags, + PARENT_RUN_ID_TAG: backfill.backfill_id, + ROOT_RUN_ID_TAG: backfill.tags.get(PARENT_RUN_ID_TAG, backfill.backfill_id), + }, + backfill_timestamp=get_current_timestamp(), + asset_selection=backfill.asset_selection, + title=f"Retry of {backfill.title}" if backfill.title else None, + description=backfill.description, + ) + + graphene_info.context.instance.add_backfill(new_backfill) + return GrapheneRetryBackfillSuccess(backfill_id=new_backfill.backfill_id) diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/backfill.py b/python_modules/dagster-graphql/dagster_graphql/schema/backfill.py index bfc2101e583f2..e3202962e1e45 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/backfill.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/backfill.py @@ -107,12 +107,25 @@ class Meta: name = "ResumeBackfillSuccess" +class GrapheneRetryBackfillSuccess(graphene.ObjectType): + backfill_id = graphene.NonNull(graphene.String) + + class Meta: + name = "RetryBackfillSuccess" + + class GrapheneResumeBackfillResult(graphene.Union): class Meta: types = (GrapheneResumeBackfillSuccess, GrapheneUnauthorizedError, GraphenePythonError) name = "ResumeBackfillResult" +class GrapheneRetryBackfillResult(graphene.Union): + class Meta: + types = (GrapheneRetryBackfillSuccess, GrapheneUnauthorizedError, GraphenePythonError) + name = "RetryBackfillResult" + + class GrapheneBulkActionStatus(graphene.Enum): REQUESTED = "REQUESTED" COMPLETED = "COMPLETED" diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/roots/mutation.py b/python_modules/dagster-graphql/dagster_graphql/schema/roots/mutation.py index 22ce110942a16..b0088980aecf1 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/roots/mutation.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/roots/mutation.py @@ -20,6 +20,7 @@ cancel_partition_backfill, create_and_launch_partition_backfill, resume_partition_backfill, + retry_partition_backfill, ) from dagster_graphql.implementation.execution.dynamic_partitions import ( add_dynamic_partition, @@ -49,6 +50,7 @@ GrapheneCancelBackfillResult, GrapheneLaunchBackfillResult, GrapheneResumeBackfillResult, + GrapheneRetryBackfillResult, ) from dagster_graphql.schema.errors import ( GrapheneAssetNotFoundError, @@ -350,7 +352,7 @@ def mutate(self, graphene_info: ResolveInfo, backfillId: str): class GrapheneResumeBackfillMutation(graphene.Mutation): - """Retries a set of partition backfill runs.""" + """Resumes a set of partition backfill runs. Resuming a backfill will not retry any failed runs.""" Output = graphene.NonNull(GrapheneResumeBackfillResult) @@ -366,6 +368,23 @@ def mutate(self, graphene_info: ResolveInfo, backfillId: str): return resume_partition_backfill(graphene_info, backfillId) +class GrapheneRetryBackfillMutation(graphene.Mutation): + """Retries a set of partition backfill runs. Retrying a backfill will create a new backfill to retry any failed partitions.""" + + Output = graphene.NonNull(GrapheneRetryBackfillResult) + + class Arguments: + backfillId = graphene.NonNull(graphene.String) + + class Meta: + name = "RetryBackfillMutation" + + @capture_error + @require_permission_check(Permissions.LAUNCH_PARTITION_BACKFILL) + def mutate(self, graphene_info: ResolveInfo, backfillId: str): + return retry_partition_backfill(graphene_info, backfillId) + + class GrapheneAddDynamicPartitionMutation(graphene.Mutation): """Adds a partition to a dynamic partition set.""" @@ -981,6 +1000,7 @@ class Meta: reportRunlessAssetEvents = GrapheneReportRunlessAssetEventsMutation.Field() launchPartitionBackfill = GrapheneLaunchBackfillMutation.Field() resumePartitionBackfill = GrapheneResumeBackfillMutation.Field() + retryPartitionBackfill = GrapheneRetryBackfillMutation.Field() cancelPartitionBackfill = GrapheneCancelBackfillMutation.Field() logTelemetry = GrapheneLogTelemetryMutation.Field() setNuxSeen = GrapheneSetNuxSeenMutation.Field() From 43ba35fd46adc57fcc77a1e160365e545d538338 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Thu, 15 Aug 2024 12:55:27 -0400 Subject: [PATCH 02/11] testing --- .../ui-core/src/graphql/schema.graphql | 7 + .../packages/ui-core/src/graphql/types.ts | 30 ++ .../implementation/execution/backfill.py | 48 +- .../graphql/test_partition_backfill.py | 424 ++++++++++++++++++ .../dagster/_core/execution/backfill.py | 5 + .../dagster/dagster/_core/storage/tags.py | 4 + 6 files changed, 506 insertions(+), 12 deletions(-) diff --git a/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql b/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql index 1eea13085353f..681e149b7f89c 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql +++ b/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql @@ -3729,6 +3729,7 @@ type Mutation { ): ReportRunlessAssetEventsResult! launchPartitionBackfill(backfillParams: LaunchBackfillParams!): LaunchBackfillResult! resumePartitionBackfill(backfillId: String!): ResumeBackfillResult! + retryPartitionBackfill(backfillId: String!): RetryBackfillResult! cancelPartitionBackfill(backfillId: String!): CancelBackfillResult! logTelemetry( action: String! @@ -3810,6 +3811,12 @@ type ResumeBackfillSuccess { backfillId: String! } +union RetryBackfillResult = RetryBackfillSuccess | UnauthorizedError | PythonError + +type RetryBackfillSuccess { + backfillId: String! +} + union CancelBackfillResult = CancelBackfillSuccess | UnauthorizedError | PythonError type CancelBackfillSuccess { diff --git a/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts b/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts index 234f635c8fddc..26045012695f7 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts +++ b/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts @@ -2627,6 +2627,7 @@ export type Mutation = { resetSchedule: ScheduleMutationResult; resetSensor: SensorOrError; resumePartitionBackfill: ResumeBackfillResult; + retryPartitionBackfill: RetryBackfillResult; scheduleDryRun: ScheduleDryRunResult; sensorDryRun: SensorDryRunResult; setAutoMaterializePaused: Scalars['Boolean']['output']; @@ -2730,6 +2731,10 @@ export type MutationResumePartitionBackfillArgs = { backfillId: Scalars['String']['input']; }; +export type MutationRetryPartitionBackfillArgs = { + backfillId: Scalars['String']['input']; +}; + export type MutationScheduleDryRunArgs = { selectorData: ScheduleSelector; timestamp?: InputMaybe; @@ -4418,6 +4423,13 @@ export type ResumeBackfillSuccess = { backfillId: Scalars['String']['output']; }; +export type RetryBackfillResult = PythonError | RetryBackfillSuccess | UnauthorizedError; + +export type RetryBackfillSuccess = { + __typename: 'RetryBackfillSuccess'; + backfillId: Scalars['String']['output']; +}; + export type Run = PipelineRun & RunsFeedEntry & { __typename: 'Run'; @@ -10249,6 +10261,12 @@ export const buildMutation = ( : relationshipsToOmit.has('PythonError') ? ({} as PythonError) : buildPythonError({}, relationshipsToOmit), + retryPartitionBackfill: + overrides && overrides.hasOwnProperty('retryPartitionBackfill') + ? overrides.retryPartitionBackfill! + : relationshipsToOmit.has('PythonError') + ? ({} as PythonError) + : buildPythonError({}, relationshipsToOmit), scheduleDryRun: overrides && overrides.hasOwnProperty('scheduleDryRun') ? overrides.scheduleDryRun! @@ -12927,6 +12945,18 @@ export const buildResumeBackfillSuccess = ( }; }; +export const buildRetryBackfillSuccess = ( + overrides?: Partial, + _relationshipsToOmit: Set = new Set(), +): {__typename: 'RetryBackfillSuccess'} & RetryBackfillSuccess => { + const relationshipsToOmit: Set = new Set(_relationshipsToOmit); + relationshipsToOmit.add('RetryBackfillSuccess'); + return { + __typename: 'RetryBackfillSuccess', + backfillId: overrides && overrides.hasOwnProperty('backfillId') ? overrides.backfillId! : 'at', + }; +}; + export const buildRun = ( overrides?: Partial, _relationshipsToOmit: Set = new Set(), diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/execution/backfill.py b/python_modules/dagster-graphql/dagster_graphql/implementation/execution/backfill.py index bbf547a3a6786..1b5d99ef7d4b7 100644 --- a/python_modules/dagster-graphql/dagster_graphql/implementation/execution/backfill.py +++ b/python_modules/dagster-graphql/dagster_graphql/implementation/execution/backfill.py @@ -10,10 +10,14 @@ ) from dagster._core.events import AssetKey from dagster._core.execution.asset_backfill import create_asset_backfill_data_from_asset_partitions -from dagster._core.execution.backfill import BulkActionStatus, PartitionBackfill +from dagster._core.execution.backfill import ( + BULK_ACTION_COMPLETED_STATUSES, + BulkActionStatus, + PartitionBackfill, +) from dagster._core.execution.job_backfill import submit_backfill_runs from dagster._core.remote_representation.external_data import PartitionExecutionErrorSnap -from dagster._core.storage.tags import PARENT_RUN_ID_TAG, ROOT_RUN_ID_TAG +from dagster._core.storage.tags import PARENT_BACKFILL_ID_TAG, ROOT_BACKFILL_ID_TAG from dagster._core.utils import make_new_backfill_id from dagster._core.workspace.permissions import Permissions from dagster._time import datetime_from_timestamp, get_current_timestamp @@ -335,29 +339,49 @@ def retry_partition_backfill( if not backfill: check.failed(f"No backfill found for id: {backfill_id}") - partition_set_origin = check.not_none(backfill.partition_set_origin) - location_name = partition_set_origin.selector.location_name - assert_permission_for_location( - graphene_info, Permissions.LAUNCH_PARTITION_BACKFILL, location_name - ) + if backfill.status not in BULK_ACTION_COMPLETED_STATUSES: + raise DagsterInvariantViolationError( + f"Cannot retry backfill {backfill_id} because it is still in progress." + ) if backfill.is_asset_backfill: + if ( + backfill.asset_backfill_data.failed_and_downstream_subset.num_partitions_and_non_partitioned_assets + == 0 + ): + raise DagsterInvariantViolationError( + "Cannot retry an asset backfill that has no failed assets." + ) + asset_graph = graphene_info.context.asset_graph + assert_permission_for_asset_graph( + graphene_info, + asset_graph, + backfill.asset_backfill_data.failed_and_downstream_subset.asset_keys, + Permissions.LAUNCH_PARTITION_BACKFILL, + ) + new_backfill = PartitionBackfill.from_asset_graph_subset( backfill_id=make_new_backfill_id(), asset_graph_subset=backfill.asset_backfill_data.failed_and_downstream_subset, dynamic_partitions_store=graphene_info.context.instance, tags={ **backfill.tags, - PARENT_RUN_ID_TAG: backfill.backfill_id, - ROOT_RUN_ID_TAG: backfill.tags.get(PARENT_RUN_ID_TAG, backfill.backfill_id), + PARENT_BACKFILL_ID_TAG: backfill.backfill_id, + ROOT_BACKFILL_ID_TAG: backfill.tags.get(PARENT_BACKFILL_ID_TAG, backfill.backfill_id), }, backfill_timestamp=get_current_timestamp(), title=f"Retry of {backfill.title}" if backfill.title else None, description=backfill.description, ) else: + partition_set_origin = check.not_none(backfill.partition_set_origin) + location_name = partition_set_origin.selector.location_name + assert_permission_for_location( + graphene_info, Permissions.LAUNCH_PARTITION_BACKFILL, location_name + ) + new_backfill = PartitionBackfill( - backfill_id=backfill.backfill_id, + backfill_id=make_new_backfill_id(), partition_set_origin=backfill.partition_set_origin, status=BulkActionStatus.REQUESTED, partition_names=backfill.partition_names, @@ -365,8 +389,8 @@ def retry_partition_backfill( reexecution_steps=backfill.reexecution_steps, tags={ **backfill.tags, - PARENT_RUN_ID_TAG: backfill.backfill_id, - ROOT_RUN_ID_TAG: backfill.tags.get(PARENT_RUN_ID_TAG, backfill.backfill_id), + PARENT_BACKFILL_ID_TAG: backfill.backfill_id, + ROOT_BACKFILL_ID_TAG: backfill.tags.get(PARENT_BACKFILL_ID_TAG, backfill.backfill_id), }, backfill_timestamp=get_current_timestamp(), asset_selection=backfill.asset_selection, diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py index 932ea2e3cd698..4291aaf5804ae 100644 --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py @@ -25,7 +25,9 @@ ASSET_PARTITION_RANGE_END_TAG, ASSET_PARTITION_RANGE_START_TAG, BACKFILL_ID_TAG, + PARENT_RUN_ID_TAG, PARTITION_NAME_TAG, + ROOT_RUN_ID_TAG, ) from dagster._core.test_utils import create_run_for_test, environ from dagster._core.utils import make_new_backfill_id @@ -160,6 +162,22 @@ } """ +RETRY_BACKFILL_MUTATION = """ + mutation($backfillId: String!) { + retryPartitionBackfill(backfillId: $backfillId) { + __typename + ... on RetryBackfillSuccess { + backfillId + } + ... on PythonError { + message + stack + } + } + } +""" + + GET_PARTITION_BACKFILLS_QUERY = """ query PartitionBackfillsQuery($repositorySelector: RepositorySelector!, $partitionSetName: String!) { partitionSetOrError(repositorySelector: $repositorySelector, partitionSetName: $partitionSetName) { @@ -1352,3 +1370,409 @@ def test_launch_backfill_with_all_partitions_flag(self, graphql_context): assert result.data["partitionBackfillOrError"]["status"] == "REQUESTED" assert result.data["partitionBackfillOrError"]["numCancelable"] == 10 assert len(result.data["partitionBackfillOrError"]["partitionNames"]) == 10 + + def test_retry_asset_backfill(self, graphql_context): + # TestLaunchDaemonBackfillFromFailure::test_retry_asset_backfill[sqlite_with_default_run_launcher_managed_grpc_env] + code_location = graphql_context.get_code_location("test") + repository = code_location.get_repository("test_repo") + asset_graph = repository.asset_graph + + asset_keys = [ + AssetKey("unpartitioned_upstream_of_partitioned"), + AssetKey("upstream_daily_partitioned_asset"), + AssetKey("downstream_weekly_partitioned_asset"), + ] + partitions = ["2023-01-09"] + result = execute_dagster_graphql( + graphql_context, + LAUNCH_PARTITION_BACKFILL_MUTATION, + variables={ + "backfillParams": { + "partitionNames": partitions, + "assetSelection": [asset_key.to_graphql_input() for asset_key in asset_keys], + } + }, + ) + + assert not result.errors + assert result.data + assert result.data["launchPartitionBackfill"]["__typename"] == "LaunchBackfillSuccess" + backfill_id = result.data["launchPartitionBackfill"]["backfillId"] + + _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph) + _mock_asset_backfill_runs( + graphql_context, + AssetKey("unpartitioned_upstream_of_partitioned"), + asset_graph, + backfill_id, + DagsterRunStatus.SUCCESS, + None, + ) + _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph) + _mock_asset_backfill_runs( + graphql_context, + AssetKey("upstream_daily_partitioned_asset"), + asset_graph, + backfill_id, + DagsterRunStatus.FAILURE, + "2023-01-09", + ) + _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph) + + # mark backfill as completed so we can retry it + backfill = graphql_context.instance.get_backfill(backfill_id) + graphql_context.instance.update_backfill(backfill.with_status(BulkActionStatus.COMPLETED)) + + result = execute_dagster_graphql( + graphql_context, + RETRY_BACKFILL_MUTATION, + variables={ + "backfillId": backfill_id, + }, + ) + + assert not result.errors + assert result.data + assert result.data["retryPartitionBackfill"]["__typename"] == "RetryBackfillSuccess" + retry_backfill_id = result.data["retryPartitionBackfill"]["backfillId"] + + first_backfill = graphql_context.instance.get_backfill(backfill_id) + retried_backfill = graphql_context.instance.get_backfill(retry_backfill_id) + + assert ( + first_backfill.asset_backfill_data.failed_and_downstream_subset + == retried_backfill.asset_backfill_data.target_subset + ) + assert retried_backfill.tags.get(PARENT_RUN_ID_TAG) == backfill_id + assert retried_backfill.tags.get(ROOT_RUN_ID_TAG) == backfill_id + + def test_retry_successful_asset_backfill(self, graphql_context): + # TestLaunchDaemonBackfillFromFailure::test_retry_successful_asset_backfill[sqlite_with_default_run_launcher_managed_grpc_env] + code_location = graphql_context.get_code_location("test") + repository = code_location.get_repository("test_repo") + asset_graph = repository.asset_graph + + asset_keys = [ + AssetKey("unpartitioned_upstream_of_partitioned"), + AssetKey("upstream_daily_partitioned_asset"), + AssetKey("downstream_weekly_partitioned_asset"), + ] + partitions = ["2023-01-09"] + result = execute_dagster_graphql( + graphql_context, + LAUNCH_PARTITION_BACKFILL_MUTATION, + variables={ + "backfillParams": { + "partitionNames": partitions, + "assetSelection": [asset_key.to_graphql_input() for asset_key in asset_keys], + } + }, + ) + + assert not result.errors + assert result.data + assert result.data["launchPartitionBackfill"]["__typename"] == "LaunchBackfillSuccess" + backfill_id = result.data["launchPartitionBackfill"]["backfillId"] + + _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph) + _mock_asset_backfill_runs( + graphql_context, + AssetKey("unpartitioned_upstream_of_partitioned"), + asset_graph, + backfill_id, + DagsterRunStatus.SUCCESS, + None, + ) + _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph) + _mock_asset_backfill_runs( + graphql_context, + AssetKey("upstream_daily_partitioned_asset"), + asset_graph, + backfill_id, + DagsterRunStatus.SUCCESS, + "2023-01-09", + ) + _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph) + + # mark backfill as complete so we can retry it + backfill = graphql_context.instance.get_backfill(backfill_id) + graphql_context.instance.update_backfill(backfill.with_status(BulkActionStatus.COMPLETED)) + + result = execute_dagster_graphql( + graphql_context, + RETRY_BACKFILL_MUTATION, + variables={ + "backfillId": backfill_id, + }, + ) + + assert not result.errors + assert result.data + assert result.data["retryPartitionBackfill"]["__typename"] == "PythonError" + assert ( + "Cannot retry an asset backfill that has no failed assets" + in result.data["retryPartitionBackfill"]["message"] + ) + + def test_retry_asset_backfill_still_in_progress(self, graphql_context): + # TestLaunchDaemonBackfillFromFailure::test_retry_asset_backfill_still_in_progress[sqlite_with_default_run_launcher_managed_grpc_env] + code_location = graphql_context.get_code_location("test") + repository = code_location.get_repository("test_repo") + asset_graph = repository.asset_graph + + asset_keys = [ + AssetKey("unpartitioned_upstream_of_partitioned"), + AssetKey("upstream_daily_partitioned_asset"), + AssetKey("downstream_weekly_partitioned_asset"), + ] + partitions = ["2023-01-09"] + result = execute_dagster_graphql( + graphql_context, + LAUNCH_PARTITION_BACKFILL_MUTATION, + variables={ + "backfillParams": { + "partitionNames": partitions, + "assetSelection": [asset_key.to_graphql_input() for asset_key in asset_keys], + } + }, + ) + + assert not result.errors + assert result.data + assert result.data["launchPartitionBackfill"]["__typename"] == "LaunchBackfillSuccess" + backfill_id = result.data["launchPartitionBackfill"]["backfillId"] + + _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph) + _mock_asset_backfill_runs( + graphql_context, + AssetKey("unpartitioned_upstream_of_partitioned"), + asset_graph, + backfill_id, + DagsterRunStatus.SUCCESS, + None, + ) + _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph) + _mock_asset_backfill_runs( + graphql_context, + AssetKey("upstream_daily_partitioned_asset"), + asset_graph, + backfill_id, + DagsterRunStatus.FAILURE, + "2023-01-09", + ) + _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph) + + # try to retry the backfill while it is still in progress + result = execute_dagster_graphql( + graphql_context, + RETRY_BACKFILL_MUTATION, + variables={ + "backfillId": backfill_id, + }, + ) + + assert not result.errors + assert result.data + assert result.data["retryPartitionBackfill"]["__typename"] == "PythonError" + assert "still in progress" in result.data["retryPartitionBackfill"]["message"] + + # once teh first backfill is in a completed state, we can retry it + backfill = graphql_context.instance.get_backfill(backfill_id) + graphql_context.instance.update_backfill(backfill.with_status(BulkActionStatus.FAILED)) + + result = execute_dagster_graphql( + graphql_context, + RETRY_BACKFILL_MUTATION, + variables={ + "backfillId": backfill_id, + }, + ) + + assert not result.errors + assert result.data + assert result.data["retryPartitionBackfill"]["__typename"] == "RetryBackfillSuccess" + retry_backfill_id = result.data["retryPartitionBackfill"]["backfillId"] + + first_backfill = graphql_context.instance.get_backfill(backfill_id) + retried_backfill = graphql_context.instance.get_backfill(retry_backfill_id) + + assert ( + first_backfill.asset_backfill_data.failed_and_downstream_subset + == retried_backfill.asset_backfill_data.target_subset + ) + assert retried_backfill.tags.get(PARENT_RUN_ID_TAG) == backfill_id + assert retried_backfill.tags.get(ROOT_RUN_ID_TAG) == backfill_id + + def test_retry_asset_backfill_twice(self, graphql_context): + # TestLaunchDaemonBackfillFromFailure::test_retry_asset_backfill_twice[sqlite_with_default_run_launcher_managed_grpc_env] + code_location = graphql_context.get_code_location("test") + repository = code_location.get_repository("test_repo") + asset_graph = repository.asset_graph + + asset_keys = [ + AssetKey("unpartitioned_upstream_of_partitioned"), + AssetKey("upstream_daily_partitioned_asset"), + AssetKey("downstream_weekly_partitioned_asset"), + ] + partitions = ["2023-01-09"] + result = execute_dagster_graphql( + graphql_context, + LAUNCH_PARTITION_BACKFILL_MUTATION, + variables={ + "backfillParams": { + "partitionNames": partitions, + "assetSelection": [asset_key.to_graphql_input() for asset_key in asset_keys], + } + }, + ) + + assert not result.errors + assert result.data + assert result.data["launchPartitionBackfill"]["__typename"] == "LaunchBackfillSuccess" + backfill_id = result.data["launchPartitionBackfill"]["backfillId"] + + _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph) + _mock_asset_backfill_runs( + graphql_context, + AssetKey("unpartitioned_upstream_of_partitioned"), + asset_graph, + backfill_id, + DagsterRunStatus.SUCCESS, + None, + ) + _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph) + _mock_asset_backfill_runs( + graphql_context, + AssetKey("upstream_daily_partitioned_asset"), + asset_graph, + backfill_id, + DagsterRunStatus.FAILURE, + "2023-01-09", + ) + _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph) + + # mark backfill as completed so we can retry it + backfill = graphql_context.instance.get_backfill(backfill_id) + graphql_context.instance.update_backfill(backfill.with_status(BulkActionStatus.COMPLETED)) + + result = execute_dagster_graphql( + graphql_context, + RETRY_BACKFILL_MUTATION, + variables={ + "backfillId": backfill_id, + }, + ) + + assert not result.errors + assert result.data + assert result.data["retryPartitionBackfill"]["__typename"] == "RetryBackfillSuccess" + retry_backfill_id = result.data["retryPartitionBackfill"]["backfillId"] + + first_backfill = graphql_context.instance.get_backfill(backfill_id) + retried_backfill = graphql_context.instance.get_backfill(retry_backfill_id) + + assert ( + first_backfill.asset_backfill_data.failed_and_downstream_subset + == retried_backfill.asset_backfill_data.target_subset + ) + assert retried_backfill.tags.get(PARENT_RUN_ID_TAG) == backfill_id + assert retried_backfill.tags.get(ROOT_RUN_ID_TAG) == backfill_id + + # mark some partitions failed so we can retry again + _mock_asset_backfill_runs( + graphql_context, + AssetKey("upstream_daily_partitioned_asset"), + asset_graph, + backfill_id, + DagsterRunStatus.FAILURE, + "2023-01-09", + ) + _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph) + + graphql_context.instance.update_backfill( + retried_backfill.with_status(BulkActionStatus.FAILED) + ) + + result = execute_dagster_graphql( + graphql_context, + RETRY_BACKFILL_MUTATION, + variables={ + "backfillId": retried_backfill.backfill_id, + }, + ) + + assert not result.errors + assert result.data + assert result.data["retryPartitionBackfill"]["__typename"] == "RetryBackfillSuccess" + seconrd_retry_backfill_id = result.data["retryPartitionBackfill"]["backfillId"] + + second_retried_backfill = graphql_context.instance.get_backfill(seconrd_retry_backfill_id) + + assert ( + second_retried_backfill.asset_backfill_data.failed_and_downstream_subset + == retried_backfill.asset_backfill_data.target_subset + ) + assert second_retried_backfill.tags.get(PARENT_RUN_ID_TAG) == retry_backfill_id + assert second_retried_backfill.tags.get(ROOT_RUN_ID_TAG) == backfill_id + + def test_retry_job_backfill(self, graphql_context): + # TestLaunchDaemonBackfillFromFailure::test_retry_job_backfill[sqlite_with_default_run_launcher_managed_grpc_env] + repository_selector = infer_repository_selector(graphql_context) + partition_set_selector = { + "repositorySelector": repository_selector, + "partitionSetName": "chained_failure_job_partition_set", + } + + # trigger failure in the conditionally_fail solid + + output_file = os.path.join( + get_system_temp_directory(), "chained_failure_pipeline_conditionally_fail" + ) + try: + with open(output_file, "w", encoding="utf8"): + result = execute_dagster_graphql_and_finish_runs( + graphql_context, + LAUNCH_PARTITION_BACKFILL_MUTATION, + variables={ + "backfillParams": { + "selector": partition_set_selector, + "partitionNames": ["2", "3"], + } + }, + ) + finally: + os.remove(output_file) + + assert not result.errors + assert result.data + assert result.data["launchPartitionBackfill"]["__typename"] == "LaunchBackfillSuccess" + backfill_id = result.data["launchPartitionBackfill"]["backfillId"] + + # re-execute from failure (without the failure file) + result = execute_dagster_graphql( + graphql_context, + RETRY_BACKFILL_MUTATION, + variables={"backfillId": backfill_id}, + ) + + assert not result.errors + assert result.data + assert result.data["retryPartitionBackfill"]["__typename"] == "RetryBackfillSuccess" + retried_backfill_id = result.data["retryPartitionBackfill"]["backfillId"] + retried_backfill = graphql_context.instance.get_backfill(retried_backfill_id) + + assert retried_backfill.tags.get(PARENT_RUN_ID_TAG) == backfill_id + assert retried_backfill.tags.get(ROOT_RUN_ID_TAG) == backfill_id + + result = execute_dagster_graphql( + graphql_context, + PARTITION_PROGRESS_QUERY, + variables={"backfillId": retried_backfill_id}, + ) + assert not result.errors + assert result.data + assert result.data["partitionBackfillOrError"]["__typename"] == "PartitionBackfill" + assert result.data["partitionBackfillOrError"]["status"] == "REQUESTED" + assert result.data["partitionBackfillOrError"]["numCancelable"] == 2 + assert len(result.data["partitionBackfillOrError"]["partitionNames"]) == 2 + assert result.data["partitionBackfillOrError"]["fromFailure"] diff --git a/python_modules/dagster/dagster/_core/execution/backfill.py b/python_modules/dagster/dagster/_core/execution/backfill.py index 019fb211be142..98d9c119df625 100644 --- a/python_modules/dagster/dagster/_core/execution/backfill.py +++ b/python_modules/dagster/dagster/_core/execution/backfill.py @@ -41,6 +41,11 @@ class BulkActionStatus(Enum): def from_graphql_input(graphql_str): return BulkActionStatus(graphql_str) +BULK_ACTION_COMPLETED_STATUSES = [ + BulkActionStatus.COMPLETED, + BulkActionStatus.FAILED, + BulkActionStatus.CANCELED, +] @record class BulkActionsFilter: diff --git a/python_modules/dagster/dagster/_core/storage/tags.py b/python_modules/dagster/dagster/_core/storage/tags.py index 2f767ad0d2987..02027e0e910be 100644 --- a/python_modules/dagster/dagster/_core/storage/tags.py +++ b/python_modules/dagster/dagster/_core/storage/tags.py @@ -33,8 +33,12 @@ PARENT_RUN_ID_TAG = f"{SYSTEM_TAG_PREFIX}parent_run_id" +PARENT_BACKFILL_ID_TAG = f"{SYSTEM_TAG_PREFIX}parent_backfill_id" + ROOT_RUN_ID_TAG = f"{SYSTEM_TAG_PREFIX}root_run_id" +ROOT_BACKFILL_ID_TAG = f"{SYSTEM_TAG_PREFIX}root_backfill_id" + RESUME_RETRY_TAG = f"{SYSTEM_TAG_PREFIX}is_resume_retry" STEP_SELECTION_TAG = f"{SYSTEM_TAG_PREFIX}step_selection" From aae475dcee3caf59622f869547b255a5d691c82e Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Thu, 15 Aug 2024 15:01:44 -0400 Subject: [PATCH 03/11] testing --- .../implementation/execution/backfill.py | 7 +- .../graphql/test_partition_backfill.py | 202 +++++++++++++++++- 2 files changed, 201 insertions(+), 8 deletions(-) diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/execution/backfill.py b/python_modules/dagster-graphql/dagster_graphql/implementation/execution/backfill.py index 1b5d99ef7d4b7..ca5a22c70ba59 100644 --- a/python_modules/dagster-graphql/dagster_graphql/implementation/execution/backfill.py +++ b/python_modules/dagster-graphql/dagster_graphql/implementation/execution/backfill.py @@ -345,8 +345,9 @@ def retry_partition_backfill( ) if backfill.is_asset_backfill: + asset_backfill_data = backfill.get_asset_backfill_data() if ( - backfill.asset_backfill_data.failed_and_downstream_subset.num_partitions_and_non_partitioned_assets + asset_backfill_data.failed_and_downstream_subset.num_partitions_and_non_partitioned_assets == 0 ): raise DagsterInvariantViolationError( @@ -356,13 +357,13 @@ def retry_partition_backfill( assert_permission_for_asset_graph( graphene_info, asset_graph, - backfill.asset_backfill_data.failed_and_downstream_subset.asset_keys, + list(asset_backfill_data.failed_and_downstream_subset.asset_keys), Permissions.LAUNCH_PARTITION_BACKFILL, ) new_backfill = PartitionBackfill.from_asset_graph_subset( backfill_id=make_new_backfill_id(), - asset_graph_subset=backfill.asset_backfill_data.failed_and_downstream_subset, + asset_graph_subset=asset_backfill_data.failed_and_downstream_subset, dynamic_partitions_store=graphene_info.context.instance, tags={ **backfill.tags, diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py index 4291aaf5804ae..e39a291795783 100644 --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py @@ -1678,19 +1678,26 @@ def test_retry_asset_backfill_twice(self, graphql_context): assert retried_backfill.tags.get(PARENT_RUN_ID_TAG) == backfill_id assert retried_backfill.tags.get(ROOT_RUN_ID_TAG) == backfill_id + _execute_asset_backfill_iteration_no_side_effects( + graphql_context, retried_backfill.backfill_id, asset_graph + ) # mark some partitions failed so we can retry again _mock_asset_backfill_runs( graphql_context, AssetKey("upstream_daily_partitioned_asset"), asset_graph, - backfill_id, + retried_backfill.backfill_id, DagsterRunStatus.FAILURE, "2023-01-09", ) - _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph) + _execute_asset_backfill_iteration_no_side_effects( + graphql_context, retried_backfill.backfill_id, asset_graph + ) + # refetch the backfill to get the updated statuses of all assets + retried_backfill = graphql_context.instance.get_backfill(retry_backfill_id) graphql_context.instance.update_backfill( - retried_backfill.with_status(BulkActionStatus.FAILED) + retried_backfill.with_status(BulkActionStatus.COMPLETED) ) result = execute_dagster_graphql( @@ -1707,10 +1714,11 @@ def test_retry_asset_backfill_twice(self, graphql_context): seconrd_retry_backfill_id = result.data["retryPartitionBackfill"]["backfillId"] second_retried_backfill = graphql_context.instance.get_backfill(seconrd_retry_backfill_id) + retried_backfill = graphql_context.instance.get_backfill(retry_backfill_id) assert ( - second_retried_backfill.asset_backfill_data.failed_and_downstream_subset - == retried_backfill.asset_backfill_data.target_subset + retried_backfill.asset_backfill_data.failed_and_downstream_subset + == second_retried_backfill.asset_backfill_data.target_subset ) assert second_retried_backfill.tags.get(PARENT_RUN_ID_TAG) == retry_backfill_id assert second_retried_backfill.tags.get(ROOT_RUN_ID_TAG) == backfill_id @@ -1748,6 +1756,9 @@ def test_retry_job_backfill(self, graphql_context): assert result.data["launchPartitionBackfill"]["__typename"] == "LaunchBackfillSuccess" backfill_id = result.data["launchPartitionBackfill"]["backfillId"] + backfill = graphql_context.instance.get_backfill(backfill_id) + graphql_context.instance.update_backfill(backfill.with_status(BulkActionStatus.COMPLETED)) + # re-execute from failure (without the failure file) result = execute_dagster_graphql( graphql_context, @@ -1776,3 +1787,184 @@ def test_retry_job_backfill(self, graphql_context): assert result.data["partitionBackfillOrError"]["numCancelable"] == 2 assert len(result.data["partitionBackfillOrError"]["partitionNames"]) == 2 assert result.data["partitionBackfillOrError"]["fromFailure"] + + def test_retry_in_progress_job_backfill(self, graphql_context): + # TestLaunchDaemonBackfillFromFailure::test_retry_in_progress_job_backfill[sqlite_with_default_run_launcher_managed_grpc_env] + repository_selector = infer_repository_selector(graphql_context) + partition_set_selector = { + "repositorySelector": repository_selector, + "partitionSetName": "chained_failure_job_partition_set", + } + + # trigger failure in the conditionally_fail solid + + output_file = os.path.join( + get_system_temp_directory(), "chained_failure_pipeline_conditionally_fail" + ) + try: + with open(output_file, "w", encoding="utf8"): + result = execute_dagster_graphql_and_finish_runs( + graphql_context, + LAUNCH_PARTITION_BACKFILL_MUTATION, + variables={ + "backfillParams": { + "selector": partition_set_selector, + "partitionNames": ["2", "3"], + } + }, + ) + finally: + os.remove(output_file) + + assert not result.errors + assert result.data + assert result.data["launchPartitionBackfill"]["__typename"] == "LaunchBackfillSuccess" + backfill_id = result.data["launchPartitionBackfill"]["backfillId"] + + result = execute_dagster_graphql( + graphql_context, + RETRY_BACKFILL_MUTATION, + variables={"backfillId": backfill_id}, + ) + + assert not result.errors + assert result.data + assert result.data["retryPartitionBackfill"]["__typename"] == "PythonError" + assert "still in progress" in result.data["retryPartitionBackfill"]["message"] + + # mark backfill as complete and confirm that we can retry it + backfill = graphql_context.instance.get_backfill(backfill_id) + graphql_context.instance.update_backfill(backfill.with_status(BulkActionStatus.COMPLETED)) + + # re-execute from failure (without the failure file) + result = execute_dagster_graphql( + graphql_context, + RETRY_BACKFILL_MUTATION, + variables={"backfillId": backfill_id}, + ) + + assert not result.errors + assert result.data + assert result.data["retryPartitionBackfill"]["__typename"] == "RetryBackfillSuccess" + retried_backfill_id = result.data["retryPartitionBackfill"]["backfillId"] + retried_backfill = graphql_context.instance.get_backfill(retried_backfill_id) + + assert retried_backfill.tags.get(PARENT_RUN_ID_TAG) == backfill_id + assert retried_backfill.tags.get(ROOT_RUN_ID_TAG) == backfill_id + + def test_retry_job_backfill_twice(self, graphql_context): + # TestLaunchDaemonBackfillFromFailure::test_retry_in_progress_job_backfill[sqlite_with_default_run_launcher_managed_grpc_env] + repository_selector = infer_repository_selector(graphql_context) + partition_set_selector = { + "repositorySelector": repository_selector, + "partitionSetName": "chained_failure_job_partition_set", + } + + # trigger failure in the conditionally_fail solid + + output_file = os.path.join( + get_system_temp_directory(), "chained_failure_pipeline_conditionally_fail" + ) + with open(output_file, "w", encoding="utf8"): + result = execute_dagster_graphql_and_finish_runs( + graphql_context, + LAUNCH_PARTITION_BACKFILL_MUTATION, + variables={ + "backfillParams": { + "selector": partition_set_selector, + "partitionNames": ["2", "3"], + } + }, + ) + + assert not result.errors + assert result.data + assert result.data["launchPartitionBackfill"]["__typename"] == "LaunchBackfillSuccess" + backfill_id = result.data["launchPartitionBackfill"]["backfillId"] + + # mark backfill as complete and confirm that we can retry it + backfill = graphql_context.instance.get_backfill(backfill_id) + graphql_context.instance.update_backfill(backfill.with_status(BulkActionStatus.COMPLETED)) + try: + with open(output_file, "w", encoding="utf8"): + # re-execute from failure (still with the failure file) + result = execute_dagster_graphql( + graphql_context, + RETRY_BACKFILL_MUTATION, + variables={"backfillId": backfill_id}, + ) + finally: + os.remove(output_file) + + assert not result.errors + assert result.data + assert result.data["retryPartitionBackfill"]["__typename"] == "RetryBackfillSuccess" + retried_backfill_id = result.data["retryPartitionBackfill"]["backfillId"] + retried_backfill = graphql_context.instance.get_backfill(retried_backfill_id) + + assert retried_backfill.tags.get(PARENT_RUN_ID_TAG) == backfill_id + assert retried_backfill.tags.get(ROOT_RUN_ID_TAG) == backfill_id + + graphql_context.instance.update_backfill( + retried_backfill.with_status(BulkActionStatus.COMPLETED) + ) + + # re-execute from failure (without the failure file) + result = execute_dagster_graphql( + graphql_context, + RETRY_BACKFILL_MUTATION, + variables={"backfillId": retried_backfill.backfill_id}, + ) + + assert not result.errors + assert result.data + assert result.data["retryPartitionBackfill"]["__typename"] == "RetryBackfillSuccess" + second_retried_backfill_id = result.data["retryPartitionBackfill"]["backfillId"] + second_retried_backfill = graphql_context.instance.get_backfill(second_retried_backfill_id) + + assert second_retried_backfill.tags.get(PARENT_RUN_ID_TAG) == retried_backfill_id + assert second_retried_backfill.tags.get(ROOT_RUN_ID_TAG) == backfill_id + + def test_retry_successful_job_backfill(self, graphql_context): + # TestLaunchDaemonBackfillFromFailure::test_retry_successful_job_backfill[sqlite_with_default_run_launcher_managed_grpc_env] + repository_selector = infer_repository_selector(graphql_context) + partition_set_selector = { + "repositorySelector": repository_selector, + "partitionSetName": "chained_failure_job_partition_set", + } + + result = execute_dagster_graphql_and_finish_runs( + graphql_context, + LAUNCH_PARTITION_BACKFILL_MUTATION, + variables={ + "backfillParams": { + "selector": partition_set_selector, + "partitionNames": ["2", "3"], + } + }, + ) + + assert not result.errors + assert result.data + assert result.data["launchPartitionBackfill"]["__typename"] == "LaunchBackfillSuccess" + backfill_id = result.data["launchPartitionBackfill"]["backfillId"] + + # mark backfill as complete + backfill = graphql_context.instance.get_backfill(backfill_id) + graphql_context.instance.update_backfill(backfill.with_status(BulkActionStatus.COMPLETED)) + + result = execute_dagster_graphql( + graphql_context, + RETRY_BACKFILL_MUTATION, + variables={"backfillId": backfill_id}, + ) + # Unlike asset backfills, we don't currently have a way to see if a job backfill has runs that + # failed without querying the DB. So we always allow retries + assert not result.errors + assert result.data + assert result.data["retryPartitionBackfill"]["__typename"] == "RetryBackfillSuccess" + retried_backfill_id = result.data["retryPartitionBackfill"]["backfillId"] + retried_backfill = graphql_context.instance.get_backfill(retried_backfill_id) + + assert retried_backfill.tags.get(PARENT_RUN_ID_TAG) == backfill_id + assert retried_backfill.tags.get(ROOT_RUN_ID_TAG) == backfill_id From 2ce20a0fdbfcf241b1e3a9c62ea6b5e030fedf06 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Fri, 16 Aug 2024 13:09:58 -0400 Subject: [PATCH 04/11] fix missing parameter --- .../dagster_graphql/implementation/execution/backfill.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/execution/backfill.py b/python_modules/dagster-graphql/dagster_graphql/implementation/execution/backfill.py index ca5a22c70ba59..d5c0f3bce2301 100644 --- a/python_modules/dagster-graphql/dagster_graphql/implementation/execution/backfill.py +++ b/python_modules/dagster-graphql/dagster_graphql/implementation/execution/backfill.py @@ -345,7 +345,7 @@ def retry_partition_backfill( ) if backfill.is_asset_backfill: - asset_backfill_data = backfill.get_asset_backfill_data() + asset_backfill_data = backfill.get_asset_backfill_data(graphene_info.context.asset_graph) if ( asset_backfill_data.failed_and_downstream_subset.num_partitions_and_non_partitioned_assets == 0 From a4fb0f7da6de9f6d7b70f5ab2dc7072f62c30238 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Wed, 21 Aug 2024 13:53:55 -0400 Subject: [PATCH 05/11] update retry subset --- .../implementation/execution/backfill.py | 21 ++- .../graphql/test_partition_backfill.py | 165 +++++++++++++++++- 2 files changed, 175 insertions(+), 11 deletions(-) diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/execution/backfill.py b/python_modules/dagster-graphql/dagster_graphql/implementation/execution/backfill.py index d5c0f3bce2301..0628007ae556d 100644 --- a/python_modules/dagster-graphql/dagster_graphql/implementation/execution/backfill.py +++ b/python_modules/dagster-graphql/dagster_graphql/implementation/execution/backfill.py @@ -346,24 +346,29 @@ def retry_partition_backfill( if backfill.is_asset_backfill: asset_backfill_data = backfill.get_asset_backfill_data(graphene_info.context.asset_graph) - if ( - asset_backfill_data.failed_and_downstream_subset.num_partitions_and_non_partitioned_assets - == 0 - ): + # determine the subset that should be retried by removing the successfully materialized subset from + # the target subset. This ensures that if the backfill was canceled or marked failed that all + # non-materialized asset partitions will be retried. asset_backfill_data.failed_and_downstream_asset + # only contains asset partitions who's materialization runs failed and their downsteam assets, not + # asset partitions that never got materialization runs. + not_materialized_assets = ( + asset_backfill_data.target_subset - asset_backfill_data.materialized_subset + ) + if not_materialized_assets.num_partitions_and_non_partitioned_assets == 0: raise DagsterInvariantViolationError( - "Cannot retry an asset backfill that has no failed assets." + "Cannot retry an asset backfill that has no missing materializations." ) asset_graph = graphene_info.context.asset_graph assert_permission_for_asset_graph( graphene_info, asset_graph, - list(asset_backfill_data.failed_and_downstream_subset.asset_keys), + list(not_materialized_assets.asset_keys), Permissions.LAUNCH_PARTITION_BACKFILL, ) new_backfill = PartitionBackfill.from_asset_graph_subset( backfill_id=make_new_backfill_id(), - asset_graph_subset=asset_backfill_data.failed_and_downstream_subset, + asset_graph_subset=not_materialized_assets, dynamic_partitions_store=graphene_info.context.instance, tags={ **backfill.tags, @@ -374,7 +379,7 @@ def retry_partition_backfill( title=f"Retry of {backfill.title}" if backfill.title else None, description=backfill.description, ) - else: + else: # job backfill partition_set_origin = check.not_none(backfill.partition_set_origin) location_name = partition_set_origin.selector.location_name assert_permission_for_location( diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py index e39a291795783..dec6948486179 100644 --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py @@ -576,6 +576,65 @@ def test_cancel_backfill(self, graphql_context): assert result.data["partitionBackfillOrError"]["__typename"] == "PartitionBackfill" assert result.data["partitionBackfillOrError"]["status"] == "CANCELED" + def test_cancel_then_retry_backfill(self, graphql_context): + repository_selector = infer_repository_selector(graphql_context) + result = execute_dagster_graphql( + graphql_context, + LAUNCH_PARTITION_BACKFILL_MUTATION, + variables={ + "backfillParams": { + "selector": { + "repositorySelector": repository_selector, + "partitionSetName": "integers_partition_set", + }, + "partitionNames": ["2", "3"], + } + }, + ) + + assert not result.errors + assert result.data + assert result.data["launchPartitionBackfill"]["__typename"] == "LaunchBackfillSuccess" + backfill_id = result.data["launchPartitionBackfill"]["backfillId"] + + result = execute_dagster_graphql( + graphql_context, + CANCEL_BACKFILL_MUTATION, + variables={"backfillId": backfill_id}, + ) + assert result.data + assert result.data["cancelPartitionBackfill"]["__typename"] == "CancelBackfillSuccess" + + result = execute_dagster_graphql( + graphql_context, + RETRY_BACKFILL_MUTATION, + variables={ + "backfillId": backfill_id, + }, + ) + + assert not result.errors + assert result.data + assert result.data["retryPartitionBackfill"]["__typename"] == "RetryBackfillSuccess" + retried_backfill_id = result.data["retryPartitionBackfill"]["backfillId"] + retried_backfill = graphql_context.instance.get_backfill(retried_backfill_id) + + assert retried_backfill.tags.get(PARENT_RUN_ID_TAG) == backfill_id + assert retried_backfill.tags.get(ROOT_RUN_ID_TAG) == backfill_id + + result = execute_dagster_graphql( + graphql_context, + PARTITION_PROGRESS_QUERY, + variables={"backfillId": retried_backfill_id}, + ) + assert not result.errors + assert result.data + assert result.data["partitionBackfillOrError"]["__typename"] == "PartitionBackfill" + assert result.data["partitionBackfillOrError"]["status"] == "REQUESTED" + assert result.data["partitionBackfillOrError"]["numCancelable"] == 2 + assert len(result.data["partitionBackfillOrError"]["partitionNames"]) == 2 + assert result.data["partitionBackfillOrError"]["fromFailure"] + def test_cancel_asset_backfill(self, graphql_context): asset_key = AssetKey("hanging_partition_asset") partitions = ["a"] @@ -652,6 +711,107 @@ def test_cancel_asset_backfill(self, graphql_context): assert len(runs) == 1 assert runs[0].status == DagsterRunStatus.CANCELED + def test_cancel_then_retry_asset_backfill(self, graphql_context): + asset_key = AssetKey("hanging_partition_asset") + partitions = ["a"] + result = execute_dagster_graphql( + graphql_context, + LAUNCH_PARTITION_BACKFILL_MUTATION, + variables={ + "backfillParams": { + "partitionNames": partitions, + "assetSelection": [asset_key.to_graphql_input()], + } + }, + ) + + assert not result.errors + assert result.data + assert result.data["launchPartitionBackfill"]["__typename"] == "LaunchBackfillSuccess" + backfill_id = result.data["launchPartitionBackfill"]["backfillId"] + + # Update asset backfill data to contain requested partition, but does not execute side effects, + # since launching the run will cause test process will hang forever. + code_location = graphql_context.get_code_location("test") + repository = code_location.get_repository("test_repo") + asset_graph = repository.asset_graph + _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph) + + # Launch the run that runs forever + selector = infer_job_selector(graphql_context, "hanging_partition_asset_job") + with safe_tempfile_path() as path: + result = execute_dagster_graphql( + graphql_context, + LAUNCH_PIPELINE_EXECUTION_MUTATION, + variables={ + "executionParams": { + "selector": selector, + "mode": "default", + "runConfigData": { + "resources": {"hanging_asset_resource": {"config": {"file": path}}} + }, + "executionMetadata": { + "tags": [ + {"key": "dagster/partition", "value": "a"}, + {"key": BACKFILL_ID_TAG, "value": backfill_id}, + ] + }, + } + }, + ) + + assert not result.errors + assert result.data + + # ensure the execution has happened + while not os.path.exists(path): + time.sleep(0.1) + + result = execute_dagster_graphql( + graphql_context, + CANCEL_BACKFILL_MUTATION, + variables={"backfillId": backfill_id}, + ) + assert result.data + assert result.data["cancelPartitionBackfill"]["__typename"] == "CancelBackfillSuccess" + + while ( + graphql_context.instance.get_backfill(backfill_id).status + != BulkActionStatus.CANCELED + ): + _execute_backfill_iteration_with_side_effects(graphql_context, backfill_id) + + runs = graphql_context.instance.get_runs( + RunsFilter(tags={BACKFILL_ID_TAG: backfill_id}) + ) + assert len(runs) == 1 + assert runs[0].status == DagsterRunStatus.CANCELED + + result = execute_dagster_graphql( + graphql_context, + RETRY_BACKFILL_MUTATION, + variables={ + "backfillId": backfill_id, + }, + ) + + assert not result.errors + assert result.data + assert result.data["retryPartitionBackfill"]["__typename"] == "RetryBackfillSuccess" + retry_backfill_id = result.data["retryPartitionBackfill"]["backfillId"] + + first_backfill = graphql_context.instance.get_backfill(backfill_id) + retried_backfill = graphql_context.instance.get_backfill(retry_backfill_id) + + # no runs were successful for the first backfill, so the retry target should be the same + # as the first backfill + assert ( + first_backfill.asset_backfill_data.target_subset + == retried_backfill.asset_backfill_data.target_subset + ) + assert retried_backfill.tags.get(PARENT_RUN_ID_TAG) == backfill_id + assert retried_backfill.tags.get(ROOT_RUN_ID_TAG) == backfill_id + def test_resume_backfill(self, graphql_context): repository_selector = infer_repository_selector(graphql_context) result = execute_dagster_graphql( @@ -1455,7 +1615,6 @@ def test_retry_successful_asset_backfill(self, graphql_context): asset_keys = [ AssetKey("unpartitioned_upstream_of_partitioned"), AssetKey("upstream_daily_partitioned_asset"), - AssetKey("downstream_weekly_partitioned_asset"), ] partitions = ["2023-01-09"] result = execute_dagster_graphql( @@ -1505,12 +1664,12 @@ def test_retry_successful_asset_backfill(self, graphql_context): "backfillId": backfill_id, }, ) - + # TestLaunchDaemonBackfillFromFailure::test_retry_successful_asset_backfill[sqlite_with_default_run_launcher_managed_grpc_env] assert not result.errors assert result.data assert result.data["retryPartitionBackfill"]["__typename"] == "PythonError" assert ( - "Cannot retry an asset backfill that has no failed assets" + "Cannot retry an asset backfill that has no missing materializations" in result.data["retryPartitionBackfill"]["message"] ) From 0e5796baeae6739c60386843ff0450bc5f6f98e3 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Fri, 4 Oct 2024 11:20:56 -0400 Subject: [PATCH 06/11] update tags --- .../implementation/execution/backfill.py | 10 ++-- .../graphql/test_partition_backfill.py | 48 +++++++++---------- .../dagster/_core/execution/backfill.py | 2 + 3 files changed, 33 insertions(+), 27 deletions(-) diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/execution/backfill.py b/python_modules/dagster-graphql/dagster_graphql/implementation/execution/backfill.py index 0628007ae556d..48047d3198c9d 100644 --- a/python_modules/dagster-graphql/dagster_graphql/implementation/execution/backfill.py +++ b/python_modules/dagster-graphql/dagster_graphql/implementation/execution/backfill.py @@ -333,7 +333,7 @@ def resume_partition_backfill( def retry_partition_backfill( graphene_info: "ResolveInfo", backfill_id: str ) -> "GrapheneRetryBackfillSuccess": - from ...schema.backfill import GrapheneRetryBackfillSuccess + from dagster_graphql.schema.backfill import GrapheneRetryBackfillSuccess backfill = graphene_info.context.instance.get_backfill(backfill_id) if not backfill: @@ -373,7 +373,9 @@ def retry_partition_backfill( tags={ **backfill.tags, PARENT_BACKFILL_ID_TAG: backfill.backfill_id, - ROOT_BACKFILL_ID_TAG: backfill.tags.get(PARENT_BACKFILL_ID_TAG, backfill.backfill_id), + ROOT_BACKFILL_ID_TAG: backfill.tags.get( + PARENT_BACKFILL_ID_TAG, backfill.backfill_id + ), }, backfill_timestamp=get_current_timestamp(), title=f"Retry of {backfill.title}" if backfill.title else None, @@ -396,7 +398,9 @@ def retry_partition_backfill( tags={ **backfill.tags, PARENT_BACKFILL_ID_TAG: backfill.backfill_id, - ROOT_BACKFILL_ID_TAG: backfill.tags.get(PARENT_BACKFILL_ID_TAG, backfill.backfill_id), + ROOT_BACKFILL_ID_TAG: backfill.tags.get( + PARENT_BACKFILL_ID_TAG, backfill.backfill_id + ), }, backfill_timestamp=get_current_timestamp(), asset_selection=backfill.asset_selection, diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py index dec6948486179..18bb67d4fafd9 100644 --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py @@ -25,9 +25,9 @@ ASSET_PARTITION_RANGE_END_TAG, ASSET_PARTITION_RANGE_START_TAG, BACKFILL_ID_TAG, - PARENT_RUN_ID_TAG, + PARENT_BACKFILL_ID_TAG, PARTITION_NAME_TAG, - ROOT_RUN_ID_TAG, + ROOT_BACKFILL_ID_TAG, ) from dagster._core.test_utils import create_run_for_test, environ from dagster._core.utils import make_new_backfill_id @@ -619,8 +619,8 @@ def test_cancel_then_retry_backfill(self, graphql_context): retried_backfill_id = result.data["retryPartitionBackfill"]["backfillId"] retried_backfill = graphql_context.instance.get_backfill(retried_backfill_id) - assert retried_backfill.tags.get(PARENT_RUN_ID_TAG) == backfill_id - assert retried_backfill.tags.get(ROOT_RUN_ID_TAG) == backfill_id + assert retried_backfill.tags.get(PARENT_BACKFILL_ID_TAG) == backfill_id + assert retried_backfill.tags.get(ROOT_BACKFILL_ID_TAG) == backfill_id result = execute_dagster_graphql( graphql_context, @@ -809,8 +809,8 @@ def test_cancel_then_retry_asset_backfill(self, graphql_context): first_backfill.asset_backfill_data.target_subset == retried_backfill.asset_backfill_data.target_subset ) - assert retried_backfill.tags.get(PARENT_RUN_ID_TAG) == backfill_id - assert retried_backfill.tags.get(ROOT_RUN_ID_TAG) == backfill_id + assert retried_backfill.tags.get(PARENT_BACKFILL_ID_TAG) == backfill_id + assert retried_backfill.tags.get(ROOT_BACKFILL_ID_TAG) == backfill_id def test_resume_backfill(self, graphql_context): repository_selector = infer_repository_selector(graphql_context) @@ -1603,8 +1603,8 @@ def test_retry_asset_backfill(self, graphql_context): first_backfill.asset_backfill_data.failed_and_downstream_subset == retried_backfill.asset_backfill_data.target_subset ) - assert retried_backfill.tags.get(PARENT_RUN_ID_TAG) == backfill_id - assert retried_backfill.tags.get(ROOT_RUN_ID_TAG) == backfill_id + assert retried_backfill.tags.get(PARENT_BACKFILL_ID_TAG) == backfill_id + assert retried_backfill.tags.get(ROOT_BACKFILL_ID_TAG) == backfill_id def test_retry_successful_asset_backfill(self, graphql_context): # TestLaunchDaemonBackfillFromFailure::test_retry_successful_asset_backfill[sqlite_with_default_run_launcher_managed_grpc_env] @@ -1759,8 +1759,8 @@ def test_retry_asset_backfill_still_in_progress(self, graphql_context): first_backfill.asset_backfill_data.failed_and_downstream_subset == retried_backfill.asset_backfill_data.target_subset ) - assert retried_backfill.tags.get(PARENT_RUN_ID_TAG) == backfill_id - assert retried_backfill.tags.get(ROOT_RUN_ID_TAG) == backfill_id + assert retried_backfill.tags.get(PARENT_BACKFILL_ID_TAG) == backfill_id + assert retried_backfill.tags.get(ROOT_BACKFILL_ID_TAG) == backfill_id def test_retry_asset_backfill_twice(self, graphql_context): # TestLaunchDaemonBackfillFromFailure::test_retry_asset_backfill_twice[sqlite_with_default_run_launcher_managed_grpc_env] @@ -1834,8 +1834,8 @@ def test_retry_asset_backfill_twice(self, graphql_context): first_backfill.asset_backfill_data.failed_and_downstream_subset == retried_backfill.asset_backfill_data.target_subset ) - assert retried_backfill.tags.get(PARENT_RUN_ID_TAG) == backfill_id - assert retried_backfill.tags.get(ROOT_RUN_ID_TAG) == backfill_id + assert retried_backfill.tags.get(PARENT_BACKFILL_ID_TAG) == backfill_id + assert retried_backfill.tags.get(ROOT_BACKFILL_ID_TAG) == backfill_id _execute_asset_backfill_iteration_no_side_effects( graphql_context, retried_backfill.backfill_id, asset_graph @@ -1879,8 +1879,8 @@ def test_retry_asset_backfill_twice(self, graphql_context): retried_backfill.asset_backfill_data.failed_and_downstream_subset == second_retried_backfill.asset_backfill_data.target_subset ) - assert second_retried_backfill.tags.get(PARENT_RUN_ID_TAG) == retry_backfill_id - assert second_retried_backfill.tags.get(ROOT_RUN_ID_TAG) == backfill_id + assert second_retried_backfill.tags.get(PARENT_BACKFILL_ID_TAG) == retry_backfill_id + assert second_retried_backfill.tags.get(ROOT_BACKFILL_ID_TAG) == backfill_id def test_retry_job_backfill(self, graphql_context): # TestLaunchDaemonBackfillFromFailure::test_retry_job_backfill[sqlite_with_default_run_launcher_managed_grpc_env] @@ -1931,8 +1931,8 @@ def test_retry_job_backfill(self, graphql_context): retried_backfill_id = result.data["retryPartitionBackfill"]["backfillId"] retried_backfill = graphql_context.instance.get_backfill(retried_backfill_id) - assert retried_backfill.tags.get(PARENT_RUN_ID_TAG) == backfill_id - assert retried_backfill.tags.get(ROOT_RUN_ID_TAG) == backfill_id + assert retried_backfill.tags.get(PARENT_BACKFILL_ID_TAG) == backfill_id + assert retried_backfill.tags.get(ROOT_BACKFILL_ID_TAG) == backfill_id result = execute_dagster_graphql( graphql_context, @@ -2008,8 +2008,8 @@ def test_retry_in_progress_job_backfill(self, graphql_context): retried_backfill_id = result.data["retryPartitionBackfill"]["backfillId"] retried_backfill = graphql_context.instance.get_backfill(retried_backfill_id) - assert retried_backfill.tags.get(PARENT_RUN_ID_TAG) == backfill_id - assert retried_backfill.tags.get(ROOT_RUN_ID_TAG) == backfill_id + assert retried_backfill.tags.get(PARENT_BACKFILL_ID_TAG) == backfill_id + assert retried_backfill.tags.get(ROOT_BACKFILL_ID_TAG) == backfill_id def test_retry_job_backfill_twice(self, graphql_context): # TestLaunchDaemonBackfillFromFailure::test_retry_in_progress_job_backfill[sqlite_with_default_run_launcher_managed_grpc_env] @@ -2061,8 +2061,8 @@ def test_retry_job_backfill_twice(self, graphql_context): retried_backfill_id = result.data["retryPartitionBackfill"]["backfillId"] retried_backfill = graphql_context.instance.get_backfill(retried_backfill_id) - assert retried_backfill.tags.get(PARENT_RUN_ID_TAG) == backfill_id - assert retried_backfill.tags.get(ROOT_RUN_ID_TAG) == backfill_id + assert retried_backfill.tags.get(PARENT_BACKFILL_ID_TAG) == backfill_id + assert retried_backfill.tags.get(ROOT_BACKFILL_ID_TAG) == backfill_id graphql_context.instance.update_backfill( retried_backfill.with_status(BulkActionStatus.COMPLETED) @@ -2081,8 +2081,8 @@ def test_retry_job_backfill_twice(self, graphql_context): second_retried_backfill_id = result.data["retryPartitionBackfill"]["backfillId"] second_retried_backfill = graphql_context.instance.get_backfill(second_retried_backfill_id) - assert second_retried_backfill.tags.get(PARENT_RUN_ID_TAG) == retried_backfill_id - assert second_retried_backfill.tags.get(ROOT_RUN_ID_TAG) == backfill_id + assert second_retried_backfill.tags.get(PARENT_BACKFILL_ID_TAG) == retried_backfill_id + assert second_retried_backfill.tags.get(ROOT_BACKFILL_ID_TAG) == backfill_id def test_retry_successful_job_backfill(self, graphql_context): # TestLaunchDaemonBackfillFromFailure::test_retry_successful_job_backfill[sqlite_with_default_run_launcher_managed_grpc_env] @@ -2125,5 +2125,5 @@ def test_retry_successful_job_backfill(self, graphql_context): retried_backfill_id = result.data["retryPartitionBackfill"]["backfillId"] retried_backfill = graphql_context.instance.get_backfill(retried_backfill_id) - assert retried_backfill.tags.get(PARENT_RUN_ID_TAG) == backfill_id - assert retried_backfill.tags.get(ROOT_RUN_ID_TAG) == backfill_id + assert retried_backfill.tags.get(PARENT_BACKFILL_ID_TAG) == backfill_id + assert retried_backfill.tags.get(ROOT_BACKFILL_ID_TAG) == backfill_id diff --git a/python_modules/dagster/dagster/_core/execution/backfill.py b/python_modules/dagster/dagster/_core/execution/backfill.py index 98d9c119df625..7ad0a837ddb65 100644 --- a/python_modules/dagster/dagster/_core/execution/backfill.py +++ b/python_modules/dagster/dagster/_core/execution/backfill.py @@ -41,12 +41,14 @@ class BulkActionStatus(Enum): def from_graphql_input(graphql_str): return BulkActionStatus(graphql_str) + BULK_ACTION_COMPLETED_STATUSES = [ BulkActionStatus.COMPLETED, BulkActionStatus.FAILED, BulkActionStatus.CANCELED, ] + @record class BulkActionsFilter: """Filters to use when querying for bulk actions (i.e. backfills) from the BulkActionsTable. From 6d3be63a61b615bf05263392adadec65f740d89b Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Mon, 7 Oct 2024 10:10:44 -0400 Subject: [PATCH 07/11] fixup --- .../dagster_graphql/implementation/execution/backfill.py | 8 ++------ .../dagster/dagster/_core/execution/backfill.py | 3 +++ 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/execution/backfill.py b/python_modules/dagster-graphql/dagster_graphql/implementation/execution/backfill.py index 48047d3198c9d..ea59518a9ed66 100644 --- a/python_modules/dagster-graphql/dagster_graphql/implementation/execution/backfill.py +++ b/python_modules/dagster-graphql/dagster_graphql/implementation/execution/backfill.py @@ -373,9 +373,7 @@ def retry_partition_backfill( tags={ **backfill.tags, PARENT_BACKFILL_ID_TAG: backfill.backfill_id, - ROOT_BACKFILL_ID_TAG: backfill.tags.get( - PARENT_BACKFILL_ID_TAG, backfill.backfill_id - ), + ROOT_BACKFILL_ID_TAG: backfill.tags.get(ROOT_BACKFILL_ID_TAG, backfill.backfill_id), }, backfill_timestamp=get_current_timestamp(), title=f"Retry of {backfill.title}" if backfill.title else None, @@ -398,9 +396,7 @@ def retry_partition_backfill( tags={ **backfill.tags, PARENT_BACKFILL_ID_TAG: backfill.backfill_id, - ROOT_BACKFILL_ID_TAG: backfill.tags.get( - PARENT_BACKFILL_ID_TAG, backfill.backfill_id - ), + ROOT_BACKFILL_ID_TAG: backfill.tags.get(ROOT_BACKFILL_ID_TAG, backfill.backfill_id), }, backfill_timestamp=get_current_timestamp(), asset_selection=backfill.asset_selection, diff --git a/python_modules/dagster/dagster/_core/execution/backfill.py b/python_modules/dagster/dagster/_core/execution/backfill.py index 7ad0a837ddb65..07419c172c306 100644 --- a/python_modules/dagster/dagster/_core/execution/backfill.py +++ b/python_modules/dagster/dagster/_core/execution/backfill.py @@ -46,6 +46,9 @@ def from_graphql_input(graphql_str): BulkActionStatus.COMPLETED, BulkActionStatus.FAILED, BulkActionStatus.CANCELED, + BulkActionStatus.COMPLETED_SUCCESS, + BulkActionStatus.COMPLETED_SUCCESS, + BulkActionStatus.COMPLETED_FAILED, ] From 270e8b086e7527a0b990b972747eb9beb7700394 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Mon, 7 Oct 2024 10:57:38 -0400 Subject: [PATCH 08/11] gql gen --- .../packages/ui-core/src/graphql/possibleTypes.generated.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/js_modules/dagster-ui/packages/ui-core/src/graphql/possibleTypes.generated.json b/js_modules/dagster-ui/packages/ui-core/src/graphql/possibleTypes.generated.json index c7cababb4ec26..04761da3d51f6 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/graphql/possibleTypes.generated.json +++ b/js_modules/dagster-ui/packages/ui-core/src/graphql/possibleTypes.generated.json @@ -1 +1 @@ -{"DisplayableEvent":["EngineEvent","ExecutionStepOutputEvent","ExpectationResult","FailureMetadata","HandledOutputEvent","LoadedInputEvent","ObjectStoreOperationResult","ResourceInitFailureEvent","ResourceInitStartedEvent","ResourceInitSuccessEvent","StepWorkerStartedEvent","StepWorkerStartingEvent","MaterializationEvent","ObservationEvent","TypeCheck"],"MarkerEvent":["EngineEvent","ResourceInitFailureEvent","ResourceInitStartedEvent","ResourceInitSuccessEvent","StepWorkerStartedEvent","StepWorkerStartingEvent"],"ErrorEvent":["EngineEvent","ExecutionStepFailureEvent","ExecutionStepUpForRetryEvent","HookErroredEvent","RunCanceledEvent","RunFailureEvent","ResourceInitFailureEvent"],"MessageEvent":["EngineEvent","ExecutionStepFailureEvent","ExecutionStepInputEvent","ExecutionStepOutputEvent","ExecutionStepRestartEvent","ExecutionStepSkippedEvent","ExecutionStepStartEvent","ExecutionStepSuccessEvent","ExecutionStepUpForRetryEvent","HandledOutputEvent","HookCompletedEvent","HookErroredEvent","HookSkippedEvent","LoadedInputEvent","LogMessageEvent","ObjectStoreOperationEvent","RunCanceledEvent","RunCancelingEvent","RunDequeuedEvent","RunEnqueuedEvent","RunFailureEvent","ResourceInitFailureEvent","ResourceInitStartedEvent","ResourceInitSuccessEvent","RunStartEvent","RunStartingEvent","RunSuccessEvent","StepExpectationResultEvent","StepWorkerStartedEvent","StepWorkerStartingEvent","MaterializationEvent","ObservationEvent","AssetMaterializationPlannedEvent","LogsCapturedEvent","AlertStartEvent","AlertSuccessEvent","AlertFailureEvent","AssetCheckEvaluationPlannedEvent","AssetCheckEvaluationEvent"],"RunEvent":["RunCanceledEvent","RunCancelingEvent","RunDequeuedEvent","RunEnqueuedEvent","RunFailureEvent","RunStartEvent","RunStartingEvent","RunSuccessEvent","AssetMaterializationPlannedEvent","AlertStartEvent","AlertSuccessEvent","AlertFailureEvent"],"PipelineRunStepStats":["RunStepStats"],"StepEvent":["EngineEvent","ExecutionStepFailureEvent","ExecutionStepInputEvent","ExecutionStepOutputEvent","ExecutionStepRestartEvent","ExecutionStepSkippedEvent","ExecutionStepStartEvent","ExecutionStepSuccessEvent","ExecutionStepUpForRetryEvent","HandledOutputEvent","HookCompletedEvent","HookErroredEvent","HookSkippedEvent","LoadedInputEvent","ObjectStoreOperationEvent","ResourceInitFailureEvent","ResourceInitStartedEvent","ResourceInitSuccessEvent","StepExpectationResultEvent","StepWorkerStartedEvent","StepWorkerStartingEvent","MaterializationEvent","ObservationEvent","AssetCheckEvaluationPlannedEvent","AssetCheckEvaluationEvent"],"AssetOwner":["UserAssetOwner","TeamAssetOwner"],"AssetPartitionStatuses":["DefaultPartitionStatuses","MultiPartitionStatuses","TimePartitionStatuses"],"PartitionStatus1D":["TimePartitionStatuses","DefaultPartitionStatuses"],"AssetChecksOrError":["AssetChecks","AssetCheckNeedsMigrationError","AssetCheckNeedsUserCodeUpgrade","AssetCheckNeedsAgentUpgradeError"],"Instigator":["Schedule","Sensor"],"EvaluationStackEntry":["EvaluationStackListItemEntry","EvaluationStackPathEntry","EvaluationStackMapKeyEntry","EvaluationStackMapValueEntry"],"IPipelineSnapshot":["Pipeline","PipelineSnapshot","Job"],"PipelineConfigValidationError":["FieldNotDefinedConfigError","FieldsNotDefinedConfigError","MissingFieldConfigError","MissingFieldsConfigError","RuntimeMismatchConfigError","SelectorTypeConfigError"],"PipelineConfigValidationInvalid":["RunConfigValidationInvalid"],"PipelineConfigValidationResult":["InvalidSubsetError","PipelineConfigValidationValid","RunConfigValidationInvalid","PipelineNotFoundError","PythonError"],"PipelineReference":["PipelineSnapshot","UnknownPipeline"],"PipelineRun":["Run"],"DagsterRunEvent":["ExecutionStepFailureEvent","ExecutionStepInputEvent","ExecutionStepOutputEvent","ExecutionStepSkippedEvent","ExecutionStepStartEvent","ExecutionStepSuccessEvent","ExecutionStepUpForRetryEvent","ExecutionStepRestartEvent","LogMessageEvent","ResourceInitFailureEvent","ResourceInitStartedEvent","ResourceInitSuccessEvent","RunFailureEvent","RunStartEvent","RunEnqueuedEvent","RunDequeuedEvent","RunStartingEvent","RunCancelingEvent","RunCanceledEvent","RunSuccessEvent","StepWorkerStartedEvent","StepWorkerStartingEvent","HandledOutputEvent","LoadedInputEvent","LogsCapturedEvent","ObjectStoreOperationEvent","StepExpectationResultEvent","MaterializationEvent","ObservationEvent","EngineEvent","HookCompletedEvent","HookSkippedEvent","HookErroredEvent","AlertStartEvent","AlertSuccessEvent","AlertFailureEvent","AssetMaterializationPlannedEvent","AssetCheckEvaluationPlannedEvent","AssetCheckEvaluationEvent"],"PipelineRunLogsSubscriptionPayload":["PipelineRunLogsSubscriptionSuccess","PipelineRunLogsSubscriptionFailure"],"RunOrError":["Run","RunNotFoundError","PythonError"],"PipelineRunStatsSnapshot":["RunStatsSnapshot"],"RunStatsSnapshotOrError":["RunStatsSnapshot","PythonError"],"PipelineSnapshotOrError":["PipelineNotFoundError","PipelineSnapshot","PipelineSnapshotNotFoundError","PythonError"],"RunsFeedEntry":["Run","PartitionBackfill"],"AssetOrError":["Asset","AssetNotFoundError"],"AssetsOrError":["AssetConnection","PythonError"],"DeletePipelineRunResult":["DeletePipelineRunSuccess","UnauthorizedError","PythonError","RunNotFoundError"],"ExecutionPlanOrError":["ExecutionPlan","RunConfigValidationInvalid","PipelineNotFoundError","InvalidSubsetError","PythonError"],"PipelineOrError":["Pipeline","PipelineNotFoundError","InvalidSubsetError","PythonError"],"ReloadRepositoryLocationMutationResult":["WorkspaceLocationEntry","ReloadNotSupported","RepositoryLocationNotFound","UnauthorizedError","PythonError"],"RepositoryLocationOrLoadError":["RepositoryLocation","PythonError"],"ReloadWorkspaceMutationResult":["Workspace","UnauthorizedError","PythonError"],"ShutdownRepositoryLocationMutationResult":["ShutdownRepositoryLocationSuccess","RepositoryLocationNotFound","UnauthorizedError","PythonError"],"TerminatePipelineExecutionFailure":["TerminateRunFailure"],"TerminatePipelineExecutionSuccess":["TerminateRunSuccess"],"TerminateRunResult":["TerminateRunSuccess","TerminateRunFailure","RunNotFoundError","UnauthorizedError","PythonError"],"ScheduleMutationResult":["PythonError","UnauthorizedError","ScheduleStateResult"],"ScheduleOrError":["Schedule","ScheduleNotFoundError","PythonError"],"SchedulerOrError":["Scheduler","SchedulerNotDefinedError","PythonError"],"SchedulesOrError":["Schedules","RepositoryNotFoundError","PythonError"],"ScheduleTickSpecificData":["ScheduleTickSuccessData","ScheduleTickFailureData"],"LaunchBackfillResult":["LaunchBackfillSuccess","PartitionSetNotFoundError","InvalidStepError","InvalidOutputError","RunConfigValidationInvalid","PipelineNotFoundError","RunConflict","UnauthorizedError","PythonError","InvalidSubsetError","PresetNotFoundError","ConflictingExecutionParamsError","NoModeProvidedError"],"ConfigTypeOrError":["EnumConfigType","CompositeConfigType","RegularConfigType","PipelineNotFoundError","ConfigTypeNotFoundError","PythonError"],"ConfigType":["ArrayConfigType","CompositeConfigType","EnumConfigType","NullableConfigType","RegularConfigType","ScalarUnionConfigType","MapConfigType"],"WrappingConfigType":["ArrayConfigType","NullableConfigType"],"DagsterType":["ListDagsterType","NullableDagsterType","RegularDagsterType"],"DagsterTypeOrError":["RegularDagsterType","PipelineNotFoundError","DagsterTypeNotFoundError","PythonError"],"WrappingDagsterType":["ListDagsterType","NullableDagsterType"],"Error":["AssetCheckNeedsMigrationError","AssetCheckNeedsUserCodeUpgrade","AssetCheckNeedsAgentUpgradeError","AssetNotFoundError","ConflictingExecutionParamsError","ConfigTypeNotFoundError","DagsterTypeNotFoundError","InvalidPipelineRunsFilterError","InvalidSubsetError","ModeNotFoundError","NoModeProvidedError","PartitionSetNotFoundError","PipelineNotFoundError","RunConflict","PipelineSnapshotNotFoundError","PresetNotFoundError","PythonError","ErrorChainLink","UnauthorizedError","ReloadNotSupported","RepositoryLocationNotFound","RepositoryNotFoundError","ResourceNotFoundError","RunGroupNotFoundError","RunNotFoundError","ScheduleNotFoundError","SchedulerNotDefinedError","SensorNotFoundError","UnsupportedOperationError","DuplicateDynamicPartitionError","InstigationStateNotFoundError","SolidStepStatusUnavailableError","GraphNotFoundError","BackfillNotFoundError","PartitionSubsetDeserializationError","AutoMaterializeAssetEvaluationNeedsMigrationError"],"PipelineRunConflict":["RunConflict"],"PipelineRunNotFoundError":["RunNotFoundError"],"RepositoriesOrError":["RepositoryConnection","RepositoryNotFoundError","PythonError"],"RepositoryOrError":["PythonError","Repository","RepositoryNotFoundError"],"WorkspaceLocationEntryOrError":["WorkspaceLocationEntry","PythonError"],"InstigationTypeSpecificData":["SensorData","ScheduleData"],"InstigationStateOrError":["InstigationState","InstigationStateNotFoundError","PythonError"],"InstigationStatesOrError":["InstigationStates","PythonError"],"MetadataEntry":["TableColumnLineageMetadataEntry","TableSchemaMetadataEntry","TableMetadataEntry","FloatMetadataEntry","IntMetadataEntry","JsonMetadataEntry","BoolMetadataEntry","MarkdownMetadataEntry","PathMetadataEntry","NotebookMetadataEntry","PythonArtifactMetadataEntry","TextMetadataEntry","UrlMetadataEntry","PipelineRunMetadataEntry","AssetMetadataEntry","JobMetadataEntry","CodeReferencesMetadataEntry","NullMetadataEntry","TimestampMetadataEntry"],"SourceLocation":["LocalFileCodeReference","UrlCodeReference"],"PartitionRunConfigOrError":["PartitionRunConfig","PythonError"],"AssetBackfillStatus":["AssetPartitionsStatusCounts","UnpartitionedAssetStatus"],"PartitionSetOrError":["PartitionSet","PartitionSetNotFoundError","PythonError"],"PartitionSetsOrError":["PartitionSets","PipelineNotFoundError","PythonError"],"PartitionsOrError":["Partitions","PythonError"],"PartitionStatusesOrError":["PartitionStatuses","PythonError"],"PartitionTagsOrError":["PartitionTags","PythonError"],"RunConfigSchemaOrError":["RunConfigSchema","PipelineNotFoundError","InvalidSubsetError","ModeNotFoundError","PythonError"],"LaunchRunResult":["LaunchRunSuccess","InvalidStepError","InvalidOutputError","RunConfigValidationInvalid","PipelineNotFoundError","RunConflict","UnauthorizedError","PythonError","InvalidSubsetError","PresetNotFoundError","ConflictingExecutionParamsError","NoModeProvidedError"],"LaunchRunReexecutionResult":["LaunchRunSuccess","InvalidStepError","InvalidOutputError","RunConfigValidationInvalid","PipelineNotFoundError","RunConflict","UnauthorizedError","PythonError","InvalidSubsetError","PresetNotFoundError","ConflictingExecutionParamsError","NoModeProvidedError"],"LaunchPipelineRunSuccess":["LaunchRunSuccess"],"RunsOrError":["Runs","InvalidPipelineRunsFilterError","PythonError"],"PipelineRuns":["Runs"],"RunGroupOrError":["RunGroup","RunGroupNotFoundError","PythonError"],"SensorOrError":["Sensor","SensorNotFoundError","UnauthorizedError","PythonError"],"SensorsOrError":["Sensors","RepositoryNotFoundError","PythonError"],"StopSensorMutationResultOrError":["StopSensorMutationResult","UnauthorizedError","PythonError"],"ISolidDefinition":["CompositeSolidDefinition","SolidDefinition"],"SolidContainer":["Pipeline","PipelineSnapshot","Job","CompositeSolidDefinition","Graph"],"SolidStepStatsOrError":["SolidStepStatsConnection","SolidStepStatusUnavailableError"],"WorkspaceOrError":["Workspace","PythonError"],"WorkspaceLocationStatusEntriesOrError":["WorkspaceLocationStatusEntries","PythonError"],"GraphOrError":["Graph","GraphNotFoundError","PythonError"],"ResourceDetailsOrError":["ResourceDetails","ResourceNotFoundError","PythonError"],"ResourcesOrError":["ResourceDetailsList","RepositoryNotFoundError","PythonError"],"EnvVarWithConsumersOrError":["EnvVarWithConsumersList","PythonError"],"RunsFeedConnectionOrError":["RunsFeedConnection","PythonError"],"RunsFeedCountOrError":["RunsFeedCount","PythonError"],"RunTagKeysOrError":["PythonError","RunTagKeys"],"RunTagsOrError":["PythonError","RunTags"],"RunIdsOrError":["RunIds","InvalidPipelineRunsFilterError","PythonError"],"AssetNodeOrError":["AssetNode","AssetNotFoundError"],"PartitionBackfillOrError":["PartitionBackfill","BackfillNotFoundError","PythonError"],"PartitionBackfillsOrError":["PartitionBackfills","PythonError"],"EventConnectionOrError":["EventConnection","RunNotFoundError","PythonError"],"AutoMaterializeAssetEvaluationRecordsOrError":["AutoMaterializeAssetEvaluationRecords","AutoMaterializeAssetEvaluationNeedsMigrationError"],"PartitionKeysOrError":["PartitionKeys","PartitionSubsetDeserializationError"],"AutoMaterializeRuleEvaluationData":["TextRuleEvaluationData","ParentMaterializedRuleEvaluationData","WaitingOnKeysRuleEvaluationData"],"AssetConditionEvaluationNode":["UnpartitionedAssetConditionEvaluationNode","PartitionedAssetConditionEvaluationNode","SpecificPartitionAssetConditionEvaluationNode"],"AssetConditionEvaluationRecordsOrError":["AssetConditionEvaluationRecords","AutoMaterializeAssetEvaluationNeedsMigrationError"],"SensorDryRunResult":["PythonError","SensorNotFoundError","DryRunInstigationTick"],"ScheduleDryRunResult":["DryRunInstigationTick","PythonError","ScheduleNotFoundError"],"TerminateRunsResultOrError":["TerminateRunsResult","PythonError"],"AssetWipeMutationResult":["AssetNotFoundError","UnauthorizedError","PythonError","UnsupportedOperationError","AssetWipeSuccess"],"ReportRunlessAssetEventsResult":["UnauthorizedError","PythonError","ReportRunlessAssetEventsSuccess"],"ResumeBackfillResult":["ResumeBackfillSuccess","UnauthorizedError","PythonError"],"CancelBackfillResult":["CancelBackfillSuccess","UnauthorizedError","PythonError"],"LogTelemetryMutationResult":["LogTelemetrySuccess","PythonError"],"AddDynamicPartitionResult":["AddDynamicPartitionSuccess","UnauthorizedError","PythonError","DuplicateDynamicPartitionError"],"DeleteDynamicPartitionsResult":["DeleteDynamicPartitionsSuccess","UnauthorizedError","PythonError"]} \ No newline at end of file +{"DisplayableEvent":["EngineEvent","ExecutionStepOutputEvent","ExpectationResult","FailureMetadata","HandledOutputEvent","LoadedInputEvent","ObjectStoreOperationResult","ResourceInitFailureEvent","ResourceInitStartedEvent","ResourceInitSuccessEvent","StepWorkerStartedEvent","StepWorkerStartingEvent","MaterializationEvent","ObservationEvent","TypeCheck"],"MarkerEvent":["EngineEvent","ResourceInitFailureEvent","ResourceInitStartedEvent","ResourceInitSuccessEvent","StepWorkerStartedEvent","StepWorkerStartingEvent"],"ErrorEvent":["EngineEvent","ExecutionStepFailureEvent","ExecutionStepUpForRetryEvent","HookErroredEvent","RunCanceledEvent","RunFailureEvent","ResourceInitFailureEvent"],"MessageEvent":["EngineEvent","ExecutionStepFailureEvent","ExecutionStepInputEvent","ExecutionStepOutputEvent","ExecutionStepRestartEvent","ExecutionStepSkippedEvent","ExecutionStepStartEvent","ExecutionStepSuccessEvent","ExecutionStepUpForRetryEvent","HandledOutputEvent","HookCompletedEvent","HookErroredEvent","HookSkippedEvent","LoadedInputEvent","LogMessageEvent","ObjectStoreOperationEvent","RunCanceledEvent","RunCancelingEvent","RunDequeuedEvent","RunEnqueuedEvent","RunFailureEvent","ResourceInitFailureEvent","ResourceInitStartedEvent","ResourceInitSuccessEvent","RunStartEvent","RunStartingEvent","RunSuccessEvent","StepExpectationResultEvent","StepWorkerStartedEvent","StepWorkerStartingEvent","MaterializationEvent","ObservationEvent","AssetMaterializationPlannedEvent","LogsCapturedEvent","AlertStartEvent","AlertSuccessEvent","AlertFailureEvent","AssetCheckEvaluationPlannedEvent","AssetCheckEvaluationEvent"],"RunEvent":["RunCanceledEvent","RunCancelingEvent","RunDequeuedEvent","RunEnqueuedEvent","RunFailureEvent","RunStartEvent","RunStartingEvent","RunSuccessEvent","AssetMaterializationPlannedEvent","AlertStartEvent","AlertSuccessEvent","AlertFailureEvent"],"PipelineRunStepStats":["RunStepStats"],"StepEvent":["EngineEvent","ExecutionStepFailureEvent","ExecutionStepInputEvent","ExecutionStepOutputEvent","ExecutionStepRestartEvent","ExecutionStepSkippedEvent","ExecutionStepStartEvent","ExecutionStepSuccessEvent","ExecutionStepUpForRetryEvent","HandledOutputEvent","HookCompletedEvent","HookErroredEvent","HookSkippedEvent","LoadedInputEvent","ObjectStoreOperationEvent","ResourceInitFailureEvent","ResourceInitStartedEvent","ResourceInitSuccessEvent","StepExpectationResultEvent","StepWorkerStartedEvent","StepWorkerStartingEvent","MaterializationEvent","ObservationEvent","AssetCheckEvaluationPlannedEvent","AssetCheckEvaluationEvent"],"AssetOwner":["UserAssetOwner","TeamAssetOwner"],"AssetPartitionStatuses":["DefaultPartitionStatuses","MultiPartitionStatuses","TimePartitionStatuses"],"PartitionStatus1D":["TimePartitionStatuses","DefaultPartitionStatuses"],"AssetChecksOrError":["AssetChecks","AssetCheckNeedsMigrationError","AssetCheckNeedsUserCodeUpgrade","AssetCheckNeedsAgentUpgradeError"],"Instigator":["Schedule","Sensor"],"EvaluationStackEntry":["EvaluationStackListItemEntry","EvaluationStackPathEntry","EvaluationStackMapKeyEntry","EvaluationStackMapValueEntry"],"IPipelineSnapshot":["Pipeline","PipelineSnapshot","Job"],"PipelineConfigValidationError":["FieldNotDefinedConfigError","FieldsNotDefinedConfigError","MissingFieldConfigError","MissingFieldsConfigError","RuntimeMismatchConfigError","SelectorTypeConfigError"],"PipelineConfigValidationInvalid":["RunConfigValidationInvalid"],"PipelineConfigValidationResult":["InvalidSubsetError","PipelineConfigValidationValid","RunConfigValidationInvalid","PipelineNotFoundError","PythonError"],"PipelineReference":["PipelineSnapshot","UnknownPipeline"],"PipelineRun":["Run"],"DagsterRunEvent":["ExecutionStepFailureEvent","ExecutionStepInputEvent","ExecutionStepOutputEvent","ExecutionStepSkippedEvent","ExecutionStepStartEvent","ExecutionStepSuccessEvent","ExecutionStepUpForRetryEvent","ExecutionStepRestartEvent","LogMessageEvent","ResourceInitFailureEvent","ResourceInitStartedEvent","ResourceInitSuccessEvent","RunFailureEvent","RunStartEvent","RunEnqueuedEvent","RunDequeuedEvent","RunStartingEvent","RunCancelingEvent","RunCanceledEvent","RunSuccessEvent","StepWorkerStartedEvent","StepWorkerStartingEvent","HandledOutputEvent","LoadedInputEvent","LogsCapturedEvent","ObjectStoreOperationEvent","StepExpectationResultEvent","MaterializationEvent","ObservationEvent","EngineEvent","HookCompletedEvent","HookSkippedEvent","HookErroredEvent","AlertStartEvent","AlertSuccessEvent","AlertFailureEvent","AssetMaterializationPlannedEvent","AssetCheckEvaluationPlannedEvent","AssetCheckEvaluationEvent"],"PipelineRunLogsSubscriptionPayload":["PipelineRunLogsSubscriptionSuccess","PipelineRunLogsSubscriptionFailure"],"RunOrError":["Run","RunNotFoundError","PythonError"],"PipelineRunStatsSnapshot":["RunStatsSnapshot"],"RunStatsSnapshotOrError":["RunStatsSnapshot","PythonError"],"PipelineSnapshotOrError":["PipelineNotFoundError","PipelineSnapshot","PipelineSnapshotNotFoundError","PythonError"],"RunsFeedEntry":["Run","PartitionBackfill"],"AssetOrError":["Asset","AssetNotFoundError"],"AssetsOrError":["AssetConnection","PythonError"],"DeletePipelineRunResult":["DeletePipelineRunSuccess","UnauthorizedError","PythonError","RunNotFoundError"],"ExecutionPlanOrError":["ExecutionPlan","RunConfigValidationInvalid","PipelineNotFoundError","InvalidSubsetError","PythonError"],"PipelineOrError":["Pipeline","PipelineNotFoundError","InvalidSubsetError","PythonError"],"ReloadRepositoryLocationMutationResult":["WorkspaceLocationEntry","ReloadNotSupported","RepositoryLocationNotFound","UnauthorizedError","PythonError"],"RepositoryLocationOrLoadError":["RepositoryLocation","PythonError"],"ReloadWorkspaceMutationResult":["Workspace","UnauthorizedError","PythonError"],"ShutdownRepositoryLocationMutationResult":["ShutdownRepositoryLocationSuccess","RepositoryLocationNotFound","UnauthorizedError","PythonError"],"TerminatePipelineExecutionFailure":["TerminateRunFailure"],"TerminatePipelineExecutionSuccess":["TerminateRunSuccess"],"TerminateRunResult":["TerminateRunSuccess","TerminateRunFailure","RunNotFoundError","UnauthorizedError","PythonError"],"ScheduleMutationResult":["PythonError","UnauthorizedError","ScheduleStateResult"],"ScheduleOrError":["Schedule","ScheduleNotFoundError","PythonError"],"SchedulerOrError":["Scheduler","SchedulerNotDefinedError","PythonError"],"SchedulesOrError":["Schedules","RepositoryNotFoundError","PythonError"],"ScheduleTickSpecificData":["ScheduleTickSuccessData","ScheduleTickFailureData"],"LaunchBackfillResult":["LaunchBackfillSuccess","PartitionSetNotFoundError","InvalidStepError","InvalidOutputError","RunConfigValidationInvalid","PipelineNotFoundError","RunConflict","UnauthorizedError","PythonError","InvalidSubsetError","PresetNotFoundError","ConflictingExecutionParamsError","NoModeProvidedError"],"ConfigTypeOrError":["EnumConfigType","CompositeConfigType","RegularConfigType","PipelineNotFoundError","ConfigTypeNotFoundError","PythonError"],"ConfigType":["ArrayConfigType","CompositeConfigType","EnumConfigType","NullableConfigType","RegularConfigType","ScalarUnionConfigType","MapConfigType"],"WrappingConfigType":["ArrayConfigType","NullableConfigType"],"DagsterType":["ListDagsterType","NullableDagsterType","RegularDagsterType"],"DagsterTypeOrError":["RegularDagsterType","PipelineNotFoundError","DagsterTypeNotFoundError","PythonError"],"WrappingDagsterType":["ListDagsterType","NullableDagsterType"],"Error":["AssetCheckNeedsMigrationError","AssetCheckNeedsUserCodeUpgrade","AssetCheckNeedsAgentUpgradeError","AssetNotFoundError","ConflictingExecutionParamsError","ConfigTypeNotFoundError","DagsterTypeNotFoundError","InvalidPipelineRunsFilterError","InvalidSubsetError","ModeNotFoundError","NoModeProvidedError","PartitionSetNotFoundError","PipelineNotFoundError","RunConflict","PipelineSnapshotNotFoundError","PresetNotFoundError","PythonError","ErrorChainLink","UnauthorizedError","ReloadNotSupported","RepositoryLocationNotFound","RepositoryNotFoundError","ResourceNotFoundError","RunGroupNotFoundError","RunNotFoundError","ScheduleNotFoundError","SchedulerNotDefinedError","SensorNotFoundError","UnsupportedOperationError","DuplicateDynamicPartitionError","InstigationStateNotFoundError","SolidStepStatusUnavailableError","GraphNotFoundError","BackfillNotFoundError","PartitionSubsetDeserializationError","AutoMaterializeAssetEvaluationNeedsMigrationError"],"PipelineRunConflict":["RunConflict"],"PipelineRunNotFoundError":["RunNotFoundError"],"RepositoriesOrError":["RepositoryConnection","RepositoryNotFoundError","PythonError"],"RepositoryOrError":["PythonError","Repository","RepositoryNotFoundError"],"WorkspaceLocationEntryOrError":["WorkspaceLocationEntry","PythonError"],"InstigationTypeSpecificData":["SensorData","ScheduleData"],"InstigationStateOrError":["InstigationState","InstigationStateNotFoundError","PythonError"],"InstigationStatesOrError":["InstigationStates","PythonError"],"MetadataEntry":["TableColumnLineageMetadataEntry","TableSchemaMetadataEntry","TableMetadataEntry","FloatMetadataEntry","IntMetadataEntry","JsonMetadataEntry","BoolMetadataEntry","MarkdownMetadataEntry","PathMetadataEntry","NotebookMetadataEntry","PythonArtifactMetadataEntry","TextMetadataEntry","UrlMetadataEntry","PipelineRunMetadataEntry","AssetMetadataEntry","JobMetadataEntry","CodeReferencesMetadataEntry","NullMetadataEntry","TimestampMetadataEntry"],"SourceLocation":["LocalFileCodeReference","UrlCodeReference"],"PartitionRunConfigOrError":["PartitionRunConfig","PythonError"],"AssetBackfillStatus":["AssetPartitionsStatusCounts","UnpartitionedAssetStatus"],"PartitionSetOrError":["PartitionSet","PartitionSetNotFoundError","PythonError"],"PartitionSetsOrError":["PartitionSets","PipelineNotFoundError","PythonError"],"PartitionsOrError":["Partitions","PythonError"],"PartitionStatusesOrError":["PartitionStatuses","PythonError"],"PartitionTagsOrError":["PartitionTags","PythonError"],"RunConfigSchemaOrError":["RunConfigSchema","PipelineNotFoundError","InvalidSubsetError","ModeNotFoundError","PythonError"],"LaunchRunResult":["LaunchRunSuccess","InvalidStepError","InvalidOutputError","RunConfigValidationInvalid","PipelineNotFoundError","RunConflict","UnauthorizedError","PythonError","InvalidSubsetError","PresetNotFoundError","ConflictingExecutionParamsError","NoModeProvidedError"],"LaunchRunReexecutionResult":["LaunchRunSuccess","InvalidStepError","InvalidOutputError","RunConfigValidationInvalid","PipelineNotFoundError","RunConflict","UnauthorizedError","PythonError","InvalidSubsetError","PresetNotFoundError","ConflictingExecutionParamsError","NoModeProvidedError"],"LaunchPipelineRunSuccess":["LaunchRunSuccess"],"RunsOrError":["Runs","InvalidPipelineRunsFilterError","PythonError"],"PipelineRuns":["Runs"],"RunGroupOrError":["RunGroup","RunGroupNotFoundError","PythonError"],"SensorOrError":["Sensor","SensorNotFoundError","UnauthorizedError","PythonError"],"SensorsOrError":["Sensors","RepositoryNotFoundError","PythonError"],"StopSensorMutationResultOrError":["StopSensorMutationResult","UnauthorizedError","PythonError"],"ISolidDefinition":["CompositeSolidDefinition","SolidDefinition"],"SolidContainer":["Pipeline","PipelineSnapshot","Job","CompositeSolidDefinition","Graph"],"SolidStepStatsOrError":["SolidStepStatsConnection","SolidStepStatusUnavailableError"],"WorkspaceOrError":["Workspace","PythonError"],"WorkspaceLocationStatusEntriesOrError":["WorkspaceLocationStatusEntries","PythonError"],"GraphOrError":["Graph","GraphNotFoundError","PythonError"],"ResourceDetailsOrError":["ResourceDetails","ResourceNotFoundError","PythonError"],"ResourcesOrError":["ResourceDetailsList","RepositoryNotFoundError","PythonError"],"EnvVarWithConsumersOrError":["EnvVarWithConsumersList","PythonError"],"RunsFeedConnectionOrError":["RunsFeedConnection","PythonError"],"RunsFeedCountOrError":["RunsFeedCount","PythonError"],"RunTagKeysOrError":["PythonError","RunTagKeys"],"RunTagsOrError":["PythonError","RunTags"],"RunIdsOrError":["RunIds","InvalidPipelineRunsFilterError","PythonError"],"AssetNodeOrError":["AssetNode","AssetNotFoundError"],"PartitionBackfillOrError":["PartitionBackfill","BackfillNotFoundError","PythonError"],"PartitionBackfillsOrError":["PartitionBackfills","PythonError"],"EventConnectionOrError":["EventConnection","RunNotFoundError","PythonError"],"AutoMaterializeAssetEvaluationRecordsOrError":["AutoMaterializeAssetEvaluationRecords","AutoMaterializeAssetEvaluationNeedsMigrationError"],"PartitionKeysOrError":["PartitionKeys","PartitionSubsetDeserializationError"],"AutoMaterializeRuleEvaluationData":["TextRuleEvaluationData","ParentMaterializedRuleEvaluationData","WaitingOnKeysRuleEvaluationData"],"AssetConditionEvaluationNode":["UnpartitionedAssetConditionEvaluationNode","PartitionedAssetConditionEvaluationNode","SpecificPartitionAssetConditionEvaluationNode"],"AssetConditionEvaluationRecordsOrError":["AssetConditionEvaluationRecords","AutoMaterializeAssetEvaluationNeedsMigrationError"],"SensorDryRunResult":["PythonError","SensorNotFoundError","DryRunInstigationTick"],"ScheduleDryRunResult":["DryRunInstigationTick","PythonError","ScheduleNotFoundError"],"TerminateRunsResultOrError":["TerminateRunsResult","PythonError"],"AssetWipeMutationResult":["AssetNotFoundError","UnauthorizedError","PythonError","UnsupportedOperationError","AssetWipeSuccess"],"ReportRunlessAssetEventsResult":["UnauthorizedError","PythonError","ReportRunlessAssetEventsSuccess"],"ResumeBackfillResult":["ResumeBackfillSuccess","UnauthorizedError","PythonError"],"RetryBackfillResult":["RetryBackfillSuccess","UnauthorizedError","PythonError"],"CancelBackfillResult":["CancelBackfillSuccess","UnauthorizedError","PythonError"],"LogTelemetryMutationResult":["LogTelemetrySuccess","PythonError"],"AddDynamicPartitionResult":["AddDynamicPartitionSuccess","UnauthorizedError","PythonError","DuplicateDynamicPartitionError"],"DeleteDynamicPartitionsResult":["DeleteDynamicPartitionsSuccess","UnauthorizedError","PythonError"]} \ No newline at end of file From 96a36a0cfce1ef7cb5daf7cd43424b7ff0b1ed34 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Mon, 7 Oct 2024 13:07:07 -0400 Subject: [PATCH 09/11] cleanup --- .../dagster_graphql/implementation/execution/backfill.py | 4 ++-- .../graphql/test_partition_backfill.py | 9 --------- .../dagster/dagster/_core/execution/backfill.py | 2 +- 3 files changed, 3 insertions(+), 12 deletions(-) diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/execution/backfill.py b/python_modules/dagster-graphql/dagster_graphql/implementation/execution/backfill.py index ea59518a9ed66..7b93adad24b7d 100644 --- a/python_modules/dagster-graphql/dagster_graphql/implementation/execution/backfill.py +++ b/python_modules/dagster-graphql/dagster_graphql/implementation/execution/backfill.py @@ -11,7 +11,7 @@ from dagster._core.events import AssetKey from dagster._core.execution.asset_backfill import create_asset_backfill_data_from_asset_partitions from dagster._core.execution.backfill import ( - BULK_ACTION_COMPLETED_STATUSES, + BULK_ACTION_TERMINAL_STATUSES, BulkActionStatus, PartitionBackfill, ) @@ -339,7 +339,7 @@ def retry_partition_backfill( if not backfill: check.failed(f"No backfill found for id: {backfill_id}") - if backfill.status not in BULK_ACTION_COMPLETED_STATUSES: + if backfill.status not in BULK_ACTION_TERMINAL_STATUSES: raise DagsterInvariantViolationError( f"Cannot retry backfill {backfill_id} because it is still in progress." ) diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py index 18bb67d4fafd9..18f441b1f488c 100644 --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py @@ -1532,7 +1532,6 @@ def test_launch_backfill_with_all_partitions_flag(self, graphql_context): assert len(result.data["partitionBackfillOrError"]["partitionNames"]) == 10 def test_retry_asset_backfill(self, graphql_context): - # TestLaunchDaemonBackfillFromFailure::test_retry_asset_backfill[sqlite_with_default_run_launcher_managed_grpc_env] code_location = graphql_context.get_code_location("test") repository = code_location.get_repository("test_repo") asset_graph = repository.asset_graph @@ -1607,7 +1606,6 @@ def test_retry_asset_backfill(self, graphql_context): assert retried_backfill.tags.get(ROOT_BACKFILL_ID_TAG) == backfill_id def test_retry_successful_asset_backfill(self, graphql_context): - # TestLaunchDaemonBackfillFromFailure::test_retry_successful_asset_backfill[sqlite_with_default_run_launcher_managed_grpc_env] code_location = graphql_context.get_code_location("test") repository = code_location.get_repository("test_repo") asset_graph = repository.asset_graph @@ -1664,7 +1662,6 @@ def test_retry_successful_asset_backfill(self, graphql_context): "backfillId": backfill_id, }, ) - # TestLaunchDaemonBackfillFromFailure::test_retry_successful_asset_backfill[sqlite_with_default_run_launcher_managed_grpc_env] assert not result.errors assert result.data assert result.data["retryPartitionBackfill"]["__typename"] == "PythonError" @@ -1674,7 +1671,6 @@ def test_retry_successful_asset_backfill(self, graphql_context): ) def test_retry_asset_backfill_still_in_progress(self, graphql_context): - # TestLaunchDaemonBackfillFromFailure::test_retry_asset_backfill_still_in_progress[sqlite_with_default_run_launcher_managed_grpc_env] code_location = graphql_context.get_code_location("test") repository = code_location.get_repository("test_repo") asset_graph = repository.asset_graph @@ -1763,7 +1759,6 @@ def test_retry_asset_backfill_still_in_progress(self, graphql_context): assert retried_backfill.tags.get(ROOT_BACKFILL_ID_TAG) == backfill_id def test_retry_asset_backfill_twice(self, graphql_context): - # TestLaunchDaemonBackfillFromFailure::test_retry_asset_backfill_twice[sqlite_with_default_run_launcher_managed_grpc_env] code_location = graphql_context.get_code_location("test") repository = code_location.get_repository("test_repo") asset_graph = repository.asset_graph @@ -1883,7 +1878,6 @@ def test_retry_asset_backfill_twice(self, graphql_context): assert second_retried_backfill.tags.get(ROOT_BACKFILL_ID_TAG) == backfill_id def test_retry_job_backfill(self, graphql_context): - # TestLaunchDaemonBackfillFromFailure::test_retry_job_backfill[sqlite_with_default_run_launcher_managed_grpc_env] repository_selector = infer_repository_selector(graphql_context) partition_set_selector = { "repositorySelector": repository_selector, @@ -1948,7 +1942,6 @@ def test_retry_job_backfill(self, graphql_context): assert result.data["partitionBackfillOrError"]["fromFailure"] def test_retry_in_progress_job_backfill(self, graphql_context): - # TestLaunchDaemonBackfillFromFailure::test_retry_in_progress_job_backfill[sqlite_with_default_run_launcher_managed_grpc_env] repository_selector = infer_repository_selector(graphql_context) partition_set_selector = { "repositorySelector": repository_selector, @@ -2012,7 +2005,6 @@ def test_retry_in_progress_job_backfill(self, graphql_context): assert retried_backfill.tags.get(ROOT_BACKFILL_ID_TAG) == backfill_id def test_retry_job_backfill_twice(self, graphql_context): - # TestLaunchDaemonBackfillFromFailure::test_retry_in_progress_job_backfill[sqlite_with_default_run_launcher_managed_grpc_env] repository_selector = infer_repository_selector(graphql_context) partition_set_selector = { "repositorySelector": repository_selector, @@ -2085,7 +2077,6 @@ def test_retry_job_backfill_twice(self, graphql_context): assert second_retried_backfill.tags.get(ROOT_BACKFILL_ID_TAG) == backfill_id def test_retry_successful_job_backfill(self, graphql_context): - # TestLaunchDaemonBackfillFromFailure::test_retry_successful_job_backfill[sqlite_with_default_run_launcher_managed_grpc_env] repository_selector = infer_repository_selector(graphql_context) partition_set_selector = { "repositorySelector": repository_selector, diff --git a/python_modules/dagster/dagster/_core/execution/backfill.py b/python_modules/dagster/dagster/_core/execution/backfill.py index 07419c172c306..2eac210ece544 100644 --- a/python_modules/dagster/dagster/_core/execution/backfill.py +++ b/python_modules/dagster/dagster/_core/execution/backfill.py @@ -42,7 +42,7 @@ def from_graphql_input(graphql_str): return BulkActionStatus(graphql_str) -BULK_ACTION_COMPLETED_STATUSES = [ +BULK_ACTION_TERMINAL_STATUSES = [ BulkActionStatus.COMPLETED, BulkActionStatus.FAILED, BulkActionStatus.CANCELED, From 901d512f6097245cc677fd092dfbed6fb8ab5475 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Tue, 8 Oct 2024 16:01:03 -0400 Subject: [PATCH 10/11] update --- .../src/graphql/possibleTypes.generated.json | 2 +- .../ui-core/src/graphql/schema.graphql | 8 +----- .../packages/ui-core/src/graphql/types.ts | 27 +++---------------- .../dagster_graphql/schema/backfill.py | 13 --------- .../dagster_graphql/schema/roots/mutation.py | 3 +-- 5 files changed, 7 insertions(+), 46 deletions(-) diff --git a/js_modules/dagster-ui/packages/ui-core/src/graphql/possibleTypes.generated.json b/js_modules/dagster-ui/packages/ui-core/src/graphql/possibleTypes.generated.json index 04761da3d51f6..c7cababb4ec26 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/graphql/possibleTypes.generated.json +++ b/js_modules/dagster-ui/packages/ui-core/src/graphql/possibleTypes.generated.json @@ -1 +1 @@ -{"DisplayableEvent":["EngineEvent","ExecutionStepOutputEvent","ExpectationResult","FailureMetadata","HandledOutputEvent","LoadedInputEvent","ObjectStoreOperationResult","ResourceInitFailureEvent","ResourceInitStartedEvent","ResourceInitSuccessEvent","StepWorkerStartedEvent","StepWorkerStartingEvent","MaterializationEvent","ObservationEvent","TypeCheck"],"MarkerEvent":["EngineEvent","ResourceInitFailureEvent","ResourceInitStartedEvent","ResourceInitSuccessEvent","StepWorkerStartedEvent","StepWorkerStartingEvent"],"ErrorEvent":["EngineEvent","ExecutionStepFailureEvent","ExecutionStepUpForRetryEvent","HookErroredEvent","RunCanceledEvent","RunFailureEvent","ResourceInitFailureEvent"],"MessageEvent":["EngineEvent","ExecutionStepFailureEvent","ExecutionStepInputEvent","ExecutionStepOutputEvent","ExecutionStepRestartEvent","ExecutionStepSkippedEvent","ExecutionStepStartEvent","ExecutionStepSuccessEvent","ExecutionStepUpForRetryEvent","HandledOutputEvent","HookCompletedEvent","HookErroredEvent","HookSkippedEvent","LoadedInputEvent","LogMessageEvent","ObjectStoreOperationEvent","RunCanceledEvent","RunCancelingEvent","RunDequeuedEvent","RunEnqueuedEvent","RunFailureEvent","ResourceInitFailureEvent","ResourceInitStartedEvent","ResourceInitSuccessEvent","RunStartEvent","RunStartingEvent","RunSuccessEvent","StepExpectationResultEvent","StepWorkerStartedEvent","StepWorkerStartingEvent","MaterializationEvent","ObservationEvent","AssetMaterializationPlannedEvent","LogsCapturedEvent","AlertStartEvent","AlertSuccessEvent","AlertFailureEvent","AssetCheckEvaluationPlannedEvent","AssetCheckEvaluationEvent"],"RunEvent":["RunCanceledEvent","RunCancelingEvent","RunDequeuedEvent","RunEnqueuedEvent","RunFailureEvent","RunStartEvent","RunStartingEvent","RunSuccessEvent","AssetMaterializationPlannedEvent","AlertStartEvent","AlertSuccessEvent","AlertFailureEvent"],"PipelineRunStepStats":["RunStepStats"],"StepEvent":["EngineEvent","ExecutionStepFailureEvent","ExecutionStepInputEvent","ExecutionStepOutputEvent","ExecutionStepRestartEvent","ExecutionStepSkippedEvent","ExecutionStepStartEvent","ExecutionStepSuccessEvent","ExecutionStepUpForRetryEvent","HandledOutputEvent","HookCompletedEvent","HookErroredEvent","HookSkippedEvent","LoadedInputEvent","ObjectStoreOperationEvent","ResourceInitFailureEvent","ResourceInitStartedEvent","ResourceInitSuccessEvent","StepExpectationResultEvent","StepWorkerStartedEvent","StepWorkerStartingEvent","MaterializationEvent","ObservationEvent","AssetCheckEvaluationPlannedEvent","AssetCheckEvaluationEvent"],"AssetOwner":["UserAssetOwner","TeamAssetOwner"],"AssetPartitionStatuses":["DefaultPartitionStatuses","MultiPartitionStatuses","TimePartitionStatuses"],"PartitionStatus1D":["TimePartitionStatuses","DefaultPartitionStatuses"],"AssetChecksOrError":["AssetChecks","AssetCheckNeedsMigrationError","AssetCheckNeedsUserCodeUpgrade","AssetCheckNeedsAgentUpgradeError"],"Instigator":["Schedule","Sensor"],"EvaluationStackEntry":["EvaluationStackListItemEntry","EvaluationStackPathEntry","EvaluationStackMapKeyEntry","EvaluationStackMapValueEntry"],"IPipelineSnapshot":["Pipeline","PipelineSnapshot","Job"],"PipelineConfigValidationError":["FieldNotDefinedConfigError","FieldsNotDefinedConfigError","MissingFieldConfigError","MissingFieldsConfigError","RuntimeMismatchConfigError","SelectorTypeConfigError"],"PipelineConfigValidationInvalid":["RunConfigValidationInvalid"],"PipelineConfigValidationResult":["InvalidSubsetError","PipelineConfigValidationValid","RunConfigValidationInvalid","PipelineNotFoundError","PythonError"],"PipelineReference":["PipelineSnapshot","UnknownPipeline"],"PipelineRun":["Run"],"DagsterRunEvent":["ExecutionStepFailureEvent","ExecutionStepInputEvent","ExecutionStepOutputEvent","ExecutionStepSkippedEvent","ExecutionStepStartEvent","ExecutionStepSuccessEvent","ExecutionStepUpForRetryEvent","ExecutionStepRestartEvent","LogMessageEvent","ResourceInitFailureEvent","ResourceInitStartedEvent","ResourceInitSuccessEvent","RunFailureEvent","RunStartEvent","RunEnqueuedEvent","RunDequeuedEvent","RunStartingEvent","RunCancelingEvent","RunCanceledEvent","RunSuccessEvent","StepWorkerStartedEvent","StepWorkerStartingEvent","HandledOutputEvent","LoadedInputEvent","LogsCapturedEvent","ObjectStoreOperationEvent","StepExpectationResultEvent","MaterializationEvent","ObservationEvent","EngineEvent","HookCompletedEvent","HookSkippedEvent","HookErroredEvent","AlertStartEvent","AlertSuccessEvent","AlertFailureEvent","AssetMaterializationPlannedEvent","AssetCheckEvaluationPlannedEvent","AssetCheckEvaluationEvent"],"PipelineRunLogsSubscriptionPayload":["PipelineRunLogsSubscriptionSuccess","PipelineRunLogsSubscriptionFailure"],"RunOrError":["Run","RunNotFoundError","PythonError"],"PipelineRunStatsSnapshot":["RunStatsSnapshot"],"RunStatsSnapshotOrError":["RunStatsSnapshot","PythonError"],"PipelineSnapshotOrError":["PipelineNotFoundError","PipelineSnapshot","PipelineSnapshotNotFoundError","PythonError"],"RunsFeedEntry":["Run","PartitionBackfill"],"AssetOrError":["Asset","AssetNotFoundError"],"AssetsOrError":["AssetConnection","PythonError"],"DeletePipelineRunResult":["DeletePipelineRunSuccess","UnauthorizedError","PythonError","RunNotFoundError"],"ExecutionPlanOrError":["ExecutionPlan","RunConfigValidationInvalid","PipelineNotFoundError","InvalidSubsetError","PythonError"],"PipelineOrError":["Pipeline","PipelineNotFoundError","InvalidSubsetError","PythonError"],"ReloadRepositoryLocationMutationResult":["WorkspaceLocationEntry","ReloadNotSupported","RepositoryLocationNotFound","UnauthorizedError","PythonError"],"RepositoryLocationOrLoadError":["RepositoryLocation","PythonError"],"ReloadWorkspaceMutationResult":["Workspace","UnauthorizedError","PythonError"],"ShutdownRepositoryLocationMutationResult":["ShutdownRepositoryLocationSuccess","RepositoryLocationNotFound","UnauthorizedError","PythonError"],"TerminatePipelineExecutionFailure":["TerminateRunFailure"],"TerminatePipelineExecutionSuccess":["TerminateRunSuccess"],"TerminateRunResult":["TerminateRunSuccess","TerminateRunFailure","RunNotFoundError","UnauthorizedError","PythonError"],"ScheduleMutationResult":["PythonError","UnauthorizedError","ScheduleStateResult"],"ScheduleOrError":["Schedule","ScheduleNotFoundError","PythonError"],"SchedulerOrError":["Scheduler","SchedulerNotDefinedError","PythonError"],"SchedulesOrError":["Schedules","RepositoryNotFoundError","PythonError"],"ScheduleTickSpecificData":["ScheduleTickSuccessData","ScheduleTickFailureData"],"LaunchBackfillResult":["LaunchBackfillSuccess","PartitionSetNotFoundError","InvalidStepError","InvalidOutputError","RunConfigValidationInvalid","PipelineNotFoundError","RunConflict","UnauthorizedError","PythonError","InvalidSubsetError","PresetNotFoundError","ConflictingExecutionParamsError","NoModeProvidedError"],"ConfigTypeOrError":["EnumConfigType","CompositeConfigType","RegularConfigType","PipelineNotFoundError","ConfigTypeNotFoundError","PythonError"],"ConfigType":["ArrayConfigType","CompositeConfigType","EnumConfigType","NullableConfigType","RegularConfigType","ScalarUnionConfigType","MapConfigType"],"WrappingConfigType":["ArrayConfigType","NullableConfigType"],"DagsterType":["ListDagsterType","NullableDagsterType","RegularDagsterType"],"DagsterTypeOrError":["RegularDagsterType","PipelineNotFoundError","DagsterTypeNotFoundError","PythonError"],"WrappingDagsterType":["ListDagsterType","NullableDagsterType"],"Error":["AssetCheckNeedsMigrationError","AssetCheckNeedsUserCodeUpgrade","AssetCheckNeedsAgentUpgradeError","AssetNotFoundError","ConflictingExecutionParamsError","ConfigTypeNotFoundError","DagsterTypeNotFoundError","InvalidPipelineRunsFilterError","InvalidSubsetError","ModeNotFoundError","NoModeProvidedError","PartitionSetNotFoundError","PipelineNotFoundError","RunConflict","PipelineSnapshotNotFoundError","PresetNotFoundError","PythonError","ErrorChainLink","UnauthorizedError","ReloadNotSupported","RepositoryLocationNotFound","RepositoryNotFoundError","ResourceNotFoundError","RunGroupNotFoundError","RunNotFoundError","ScheduleNotFoundError","SchedulerNotDefinedError","SensorNotFoundError","UnsupportedOperationError","DuplicateDynamicPartitionError","InstigationStateNotFoundError","SolidStepStatusUnavailableError","GraphNotFoundError","BackfillNotFoundError","PartitionSubsetDeserializationError","AutoMaterializeAssetEvaluationNeedsMigrationError"],"PipelineRunConflict":["RunConflict"],"PipelineRunNotFoundError":["RunNotFoundError"],"RepositoriesOrError":["RepositoryConnection","RepositoryNotFoundError","PythonError"],"RepositoryOrError":["PythonError","Repository","RepositoryNotFoundError"],"WorkspaceLocationEntryOrError":["WorkspaceLocationEntry","PythonError"],"InstigationTypeSpecificData":["SensorData","ScheduleData"],"InstigationStateOrError":["InstigationState","InstigationStateNotFoundError","PythonError"],"InstigationStatesOrError":["InstigationStates","PythonError"],"MetadataEntry":["TableColumnLineageMetadataEntry","TableSchemaMetadataEntry","TableMetadataEntry","FloatMetadataEntry","IntMetadataEntry","JsonMetadataEntry","BoolMetadataEntry","MarkdownMetadataEntry","PathMetadataEntry","NotebookMetadataEntry","PythonArtifactMetadataEntry","TextMetadataEntry","UrlMetadataEntry","PipelineRunMetadataEntry","AssetMetadataEntry","JobMetadataEntry","CodeReferencesMetadataEntry","NullMetadataEntry","TimestampMetadataEntry"],"SourceLocation":["LocalFileCodeReference","UrlCodeReference"],"PartitionRunConfigOrError":["PartitionRunConfig","PythonError"],"AssetBackfillStatus":["AssetPartitionsStatusCounts","UnpartitionedAssetStatus"],"PartitionSetOrError":["PartitionSet","PartitionSetNotFoundError","PythonError"],"PartitionSetsOrError":["PartitionSets","PipelineNotFoundError","PythonError"],"PartitionsOrError":["Partitions","PythonError"],"PartitionStatusesOrError":["PartitionStatuses","PythonError"],"PartitionTagsOrError":["PartitionTags","PythonError"],"RunConfigSchemaOrError":["RunConfigSchema","PipelineNotFoundError","InvalidSubsetError","ModeNotFoundError","PythonError"],"LaunchRunResult":["LaunchRunSuccess","InvalidStepError","InvalidOutputError","RunConfigValidationInvalid","PipelineNotFoundError","RunConflict","UnauthorizedError","PythonError","InvalidSubsetError","PresetNotFoundError","ConflictingExecutionParamsError","NoModeProvidedError"],"LaunchRunReexecutionResult":["LaunchRunSuccess","InvalidStepError","InvalidOutputError","RunConfigValidationInvalid","PipelineNotFoundError","RunConflict","UnauthorizedError","PythonError","InvalidSubsetError","PresetNotFoundError","ConflictingExecutionParamsError","NoModeProvidedError"],"LaunchPipelineRunSuccess":["LaunchRunSuccess"],"RunsOrError":["Runs","InvalidPipelineRunsFilterError","PythonError"],"PipelineRuns":["Runs"],"RunGroupOrError":["RunGroup","RunGroupNotFoundError","PythonError"],"SensorOrError":["Sensor","SensorNotFoundError","UnauthorizedError","PythonError"],"SensorsOrError":["Sensors","RepositoryNotFoundError","PythonError"],"StopSensorMutationResultOrError":["StopSensorMutationResult","UnauthorizedError","PythonError"],"ISolidDefinition":["CompositeSolidDefinition","SolidDefinition"],"SolidContainer":["Pipeline","PipelineSnapshot","Job","CompositeSolidDefinition","Graph"],"SolidStepStatsOrError":["SolidStepStatsConnection","SolidStepStatusUnavailableError"],"WorkspaceOrError":["Workspace","PythonError"],"WorkspaceLocationStatusEntriesOrError":["WorkspaceLocationStatusEntries","PythonError"],"GraphOrError":["Graph","GraphNotFoundError","PythonError"],"ResourceDetailsOrError":["ResourceDetails","ResourceNotFoundError","PythonError"],"ResourcesOrError":["ResourceDetailsList","RepositoryNotFoundError","PythonError"],"EnvVarWithConsumersOrError":["EnvVarWithConsumersList","PythonError"],"RunsFeedConnectionOrError":["RunsFeedConnection","PythonError"],"RunsFeedCountOrError":["RunsFeedCount","PythonError"],"RunTagKeysOrError":["PythonError","RunTagKeys"],"RunTagsOrError":["PythonError","RunTags"],"RunIdsOrError":["RunIds","InvalidPipelineRunsFilterError","PythonError"],"AssetNodeOrError":["AssetNode","AssetNotFoundError"],"PartitionBackfillOrError":["PartitionBackfill","BackfillNotFoundError","PythonError"],"PartitionBackfillsOrError":["PartitionBackfills","PythonError"],"EventConnectionOrError":["EventConnection","RunNotFoundError","PythonError"],"AutoMaterializeAssetEvaluationRecordsOrError":["AutoMaterializeAssetEvaluationRecords","AutoMaterializeAssetEvaluationNeedsMigrationError"],"PartitionKeysOrError":["PartitionKeys","PartitionSubsetDeserializationError"],"AutoMaterializeRuleEvaluationData":["TextRuleEvaluationData","ParentMaterializedRuleEvaluationData","WaitingOnKeysRuleEvaluationData"],"AssetConditionEvaluationNode":["UnpartitionedAssetConditionEvaluationNode","PartitionedAssetConditionEvaluationNode","SpecificPartitionAssetConditionEvaluationNode"],"AssetConditionEvaluationRecordsOrError":["AssetConditionEvaluationRecords","AutoMaterializeAssetEvaluationNeedsMigrationError"],"SensorDryRunResult":["PythonError","SensorNotFoundError","DryRunInstigationTick"],"ScheduleDryRunResult":["DryRunInstigationTick","PythonError","ScheduleNotFoundError"],"TerminateRunsResultOrError":["TerminateRunsResult","PythonError"],"AssetWipeMutationResult":["AssetNotFoundError","UnauthorizedError","PythonError","UnsupportedOperationError","AssetWipeSuccess"],"ReportRunlessAssetEventsResult":["UnauthorizedError","PythonError","ReportRunlessAssetEventsSuccess"],"ResumeBackfillResult":["ResumeBackfillSuccess","UnauthorizedError","PythonError"],"RetryBackfillResult":["RetryBackfillSuccess","UnauthorizedError","PythonError"],"CancelBackfillResult":["CancelBackfillSuccess","UnauthorizedError","PythonError"],"LogTelemetryMutationResult":["LogTelemetrySuccess","PythonError"],"AddDynamicPartitionResult":["AddDynamicPartitionSuccess","UnauthorizedError","PythonError","DuplicateDynamicPartitionError"],"DeleteDynamicPartitionsResult":["DeleteDynamicPartitionsSuccess","UnauthorizedError","PythonError"]} \ No newline at end of file +{"DisplayableEvent":["EngineEvent","ExecutionStepOutputEvent","ExpectationResult","FailureMetadata","HandledOutputEvent","LoadedInputEvent","ObjectStoreOperationResult","ResourceInitFailureEvent","ResourceInitStartedEvent","ResourceInitSuccessEvent","StepWorkerStartedEvent","StepWorkerStartingEvent","MaterializationEvent","ObservationEvent","TypeCheck"],"MarkerEvent":["EngineEvent","ResourceInitFailureEvent","ResourceInitStartedEvent","ResourceInitSuccessEvent","StepWorkerStartedEvent","StepWorkerStartingEvent"],"ErrorEvent":["EngineEvent","ExecutionStepFailureEvent","ExecutionStepUpForRetryEvent","HookErroredEvent","RunCanceledEvent","RunFailureEvent","ResourceInitFailureEvent"],"MessageEvent":["EngineEvent","ExecutionStepFailureEvent","ExecutionStepInputEvent","ExecutionStepOutputEvent","ExecutionStepRestartEvent","ExecutionStepSkippedEvent","ExecutionStepStartEvent","ExecutionStepSuccessEvent","ExecutionStepUpForRetryEvent","HandledOutputEvent","HookCompletedEvent","HookErroredEvent","HookSkippedEvent","LoadedInputEvent","LogMessageEvent","ObjectStoreOperationEvent","RunCanceledEvent","RunCancelingEvent","RunDequeuedEvent","RunEnqueuedEvent","RunFailureEvent","ResourceInitFailureEvent","ResourceInitStartedEvent","ResourceInitSuccessEvent","RunStartEvent","RunStartingEvent","RunSuccessEvent","StepExpectationResultEvent","StepWorkerStartedEvent","StepWorkerStartingEvent","MaterializationEvent","ObservationEvent","AssetMaterializationPlannedEvent","LogsCapturedEvent","AlertStartEvent","AlertSuccessEvent","AlertFailureEvent","AssetCheckEvaluationPlannedEvent","AssetCheckEvaluationEvent"],"RunEvent":["RunCanceledEvent","RunCancelingEvent","RunDequeuedEvent","RunEnqueuedEvent","RunFailureEvent","RunStartEvent","RunStartingEvent","RunSuccessEvent","AssetMaterializationPlannedEvent","AlertStartEvent","AlertSuccessEvent","AlertFailureEvent"],"PipelineRunStepStats":["RunStepStats"],"StepEvent":["EngineEvent","ExecutionStepFailureEvent","ExecutionStepInputEvent","ExecutionStepOutputEvent","ExecutionStepRestartEvent","ExecutionStepSkippedEvent","ExecutionStepStartEvent","ExecutionStepSuccessEvent","ExecutionStepUpForRetryEvent","HandledOutputEvent","HookCompletedEvent","HookErroredEvent","HookSkippedEvent","LoadedInputEvent","ObjectStoreOperationEvent","ResourceInitFailureEvent","ResourceInitStartedEvent","ResourceInitSuccessEvent","StepExpectationResultEvent","StepWorkerStartedEvent","StepWorkerStartingEvent","MaterializationEvent","ObservationEvent","AssetCheckEvaluationPlannedEvent","AssetCheckEvaluationEvent"],"AssetOwner":["UserAssetOwner","TeamAssetOwner"],"AssetPartitionStatuses":["DefaultPartitionStatuses","MultiPartitionStatuses","TimePartitionStatuses"],"PartitionStatus1D":["TimePartitionStatuses","DefaultPartitionStatuses"],"AssetChecksOrError":["AssetChecks","AssetCheckNeedsMigrationError","AssetCheckNeedsUserCodeUpgrade","AssetCheckNeedsAgentUpgradeError"],"Instigator":["Schedule","Sensor"],"EvaluationStackEntry":["EvaluationStackListItemEntry","EvaluationStackPathEntry","EvaluationStackMapKeyEntry","EvaluationStackMapValueEntry"],"IPipelineSnapshot":["Pipeline","PipelineSnapshot","Job"],"PipelineConfigValidationError":["FieldNotDefinedConfigError","FieldsNotDefinedConfigError","MissingFieldConfigError","MissingFieldsConfigError","RuntimeMismatchConfigError","SelectorTypeConfigError"],"PipelineConfigValidationInvalid":["RunConfigValidationInvalid"],"PipelineConfigValidationResult":["InvalidSubsetError","PipelineConfigValidationValid","RunConfigValidationInvalid","PipelineNotFoundError","PythonError"],"PipelineReference":["PipelineSnapshot","UnknownPipeline"],"PipelineRun":["Run"],"DagsterRunEvent":["ExecutionStepFailureEvent","ExecutionStepInputEvent","ExecutionStepOutputEvent","ExecutionStepSkippedEvent","ExecutionStepStartEvent","ExecutionStepSuccessEvent","ExecutionStepUpForRetryEvent","ExecutionStepRestartEvent","LogMessageEvent","ResourceInitFailureEvent","ResourceInitStartedEvent","ResourceInitSuccessEvent","RunFailureEvent","RunStartEvent","RunEnqueuedEvent","RunDequeuedEvent","RunStartingEvent","RunCancelingEvent","RunCanceledEvent","RunSuccessEvent","StepWorkerStartedEvent","StepWorkerStartingEvent","HandledOutputEvent","LoadedInputEvent","LogsCapturedEvent","ObjectStoreOperationEvent","StepExpectationResultEvent","MaterializationEvent","ObservationEvent","EngineEvent","HookCompletedEvent","HookSkippedEvent","HookErroredEvent","AlertStartEvent","AlertSuccessEvent","AlertFailureEvent","AssetMaterializationPlannedEvent","AssetCheckEvaluationPlannedEvent","AssetCheckEvaluationEvent"],"PipelineRunLogsSubscriptionPayload":["PipelineRunLogsSubscriptionSuccess","PipelineRunLogsSubscriptionFailure"],"RunOrError":["Run","RunNotFoundError","PythonError"],"PipelineRunStatsSnapshot":["RunStatsSnapshot"],"RunStatsSnapshotOrError":["RunStatsSnapshot","PythonError"],"PipelineSnapshotOrError":["PipelineNotFoundError","PipelineSnapshot","PipelineSnapshotNotFoundError","PythonError"],"RunsFeedEntry":["Run","PartitionBackfill"],"AssetOrError":["Asset","AssetNotFoundError"],"AssetsOrError":["AssetConnection","PythonError"],"DeletePipelineRunResult":["DeletePipelineRunSuccess","UnauthorizedError","PythonError","RunNotFoundError"],"ExecutionPlanOrError":["ExecutionPlan","RunConfigValidationInvalid","PipelineNotFoundError","InvalidSubsetError","PythonError"],"PipelineOrError":["Pipeline","PipelineNotFoundError","InvalidSubsetError","PythonError"],"ReloadRepositoryLocationMutationResult":["WorkspaceLocationEntry","ReloadNotSupported","RepositoryLocationNotFound","UnauthorizedError","PythonError"],"RepositoryLocationOrLoadError":["RepositoryLocation","PythonError"],"ReloadWorkspaceMutationResult":["Workspace","UnauthorizedError","PythonError"],"ShutdownRepositoryLocationMutationResult":["ShutdownRepositoryLocationSuccess","RepositoryLocationNotFound","UnauthorizedError","PythonError"],"TerminatePipelineExecutionFailure":["TerminateRunFailure"],"TerminatePipelineExecutionSuccess":["TerminateRunSuccess"],"TerminateRunResult":["TerminateRunSuccess","TerminateRunFailure","RunNotFoundError","UnauthorizedError","PythonError"],"ScheduleMutationResult":["PythonError","UnauthorizedError","ScheduleStateResult"],"ScheduleOrError":["Schedule","ScheduleNotFoundError","PythonError"],"SchedulerOrError":["Scheduler","SchedulerNotDefinedError","PythonError"],"SchedulesOrError":["Schedules","RepositoryNotFoundError","PythonError"],"ScheduleTickSpecificData":["ScheduleTickSuccessData","ScheduleTickFailureData"],"LaunchBackfillResult":["LaunchBackfillSuccess","PartitionSetNotFoundError","InvalidStepError","InvalidOutputError","RunConfigValidationInvalid","PipelineNotFoundError","RunConflict","UnauthorizedError","PythonError","InvalidSubsetError","PresetNotFoundError","ConflictingExecutionParamsError","NoModeProvidedError"],"ConfigTypeOrError":["EnumConfigType","CompositeConfigType","RegularConfigType","PipelineNotFoundError","ConfigTypeNotFoundError","PythonError"],"ConfigType":["ArrayConfigType","CompositeConfigType","EnumConfigType","NullableConfigType","RegularConfigType","ScalarUnionConfigType","MapConfigType"],"WrappingConfigType":["ArrayConfigType","NullableConfigType"],"DagsterType":["ListDagsterType","NullableDagsterType","RegularDagsterType"],"DagsterTypeOrError":["RegularDagsterType","PipelineNotFoundError","DagsterTypeNotFoundError","PythonError"],"WrappingDagsterType":["ListDagsterType","NullableDagsterType"],"Error":["AssetCheckNeedsMigrationError","AssetCheckNeedsUserCodeUpgrade","AssetCheckNeedsAgentUpgradeError","AssetNotFoundError","ConflictingExecutionParamsError","ConfigTypeNotFoundError","DagsterTypeNotFoundError","InvalidPipelineRunsFilterError","InvalidSubsetError","ModeNotFoundError","NoModeProvidedError","PartitionSetNotFoundError","PipelineNotFoundError","RunConflict","PipelineSnapshotNotFoundError","PresetNotFoundError","PythonError","ErrorChainLink","UnauthorizedError","ReloadNotSupported","RepositoryLocationNotFound","RepositoryNotFoundError","ResourceNotFoundError","RunGroupNotFoundError","RunNotFoundError","ScheduleNotFoundError","SchedulerNotDefinedError","SensorNotFoundError","UnsupportedOperationError","DuplicateDynamicPartitionError","InstigationStateNotFoundError","SolidStepStatusUnavailableError","GraphNotFoundError","BackfillNotFoundError","PartitionSubsetDeserializationError","AutoMaterializeAssetEvaluationNeedsMigrationError"],"PipelineRunConflict":["RunConflict"],"PipelineRunNotFoundError":["RunNotFoundError"],"RepositoriesOrError":["RepositoryConnection","RepositoryNotFoundError","PythonError"],"RepositoryOrError":["PythonError","Repository","RepositoryNotFoundError"],"WorkspaceLocationEntryOrError":["WorkspaceLocationEntry","PythonError"],"InstigationTypeSpecificData":["SensorData","ScheduleData"],"InstigationStateOrError":["InstigationState","InstigationStateNotFoundError","PythonError"],"InstigationStatesOrError":["InstigationStates","PythonError"],"MetadataEntry":["TableColumnLineageMetadataEntry","TableSchemaMetadataEntry","TableMetadataEntry","FloatMetadataEntry","IntMetadataEntry","JsonMetadataEntry","BoolMetadataEntry","MarkdownMetadataEntry","PathMetadataEntry","NotebookMetadataEntry","PythonArtifactMetadataEntry","TextMetadataEntry","UrlMetadataEntry","PipelineRunMetadataEntry","AssetMetadataEntry","JobMetadataEntry","CodeReferencesMetadataEntry","NullMetadataEntry","TimestampMetadataEntry"],"SourceLocation":["LocalFileCodeReference","UrlCodeReference"],"PartitionRunConfigOrError":["PartitionRunConfig","PythonError"],"AssetBackfillStatus":["AssetPartitionsStatusCounts","UnpartitionedAssetStatus"],"PartitionSetOrError":["PartitionSet","PartitionSetNotFoundError","PythonError"],"PartitionSetsOrError":["PartitionSets","PipelineNotFoundError","PythonError"],"PartitionsOrError":["Partitions","PythonError"],"PartitionStatusesOrError":["PartitionStatuses","PythonError"],"PartitionTagsOrError":["PartitionTags","PythonError"],"RunConfigSchemaOrError":["RunConfigSchema","PipelineNotFoundError","InvalidSubsetError","ModeNotFoundError","PythonError"],"LaunchRunResult":["LaunchRunSuccess","InvalidStepError","InvalidOutputError","RunConfigValidationInvalid","PipelineNotFoundError","RunConflict","UnauthorizedError","PythonError","InvalidSubsetError","PresetNotFoundError","ConflictingExecutionParamsError","NoModeProvidedError"],"LaunchRunReexecutionResult":["LaunchRunSuccess","InvalidStepError","InvalidOutputError","RunConfigValidationInvalid","PipelineNotFoundError","RunConflict","UnauthorizedError","PythonError","InvalidSubsetError","PresetNotFoundError","ConflictingExecutionParamsError","NoModeProvidedError"],"LaunchPipelineRunSuccess":["LaunchRunSuccess"],"RunsOrError":["Runs","InvalidPipelineRunsFilterError","PythonError"],"PipelineRuns":["Runs"],"RunGroupOrError":["RunGroup","RunGroupNotFoundError","PythonError"],"SensorOrError":["Sensor","SensorNotFoundError","UnauthorizedError","PythonError"],"SensorsOrError":["Sensors","RepositoryNotFoundError","PythonError"],"StopSensorMutationResultOrError":["StopSensorMutationResult","UnauthorizedError","PythonError"],"ISolidDefinition":["CompositeSolidDefinition","SolidDefinition"],"SolidContainer":["Pipeline","PipelineSnapshot","Job","CompositeSolidDefinition","Graph"],"SolidStepStatsOrError":["SolidStepStatsConnection","SolidStepStatusUnavailableError"],"WorkspaceOrError":["Workspace","PythonError"],"WorkspaceLocationStatusEntriesOrError":["WorkspaceLocationStatusEntries","PythonError"],"GraphOrError":["Graph","GraphNotFoundError","PythonError"],"ResourceDetailsOrError":["ResourceDetails","ResourceNotFoundError","PythonError"],"ResourcesOrError":["ResourceDetailsList","RepositoryNotFoundError","PythonError"],"EnvVarWithConsumersOrError":["EnvVarWithConsumersList","PythonError"],"RunsFeedConnectionOrError":["RunsFeedConnection","PythonError"],"RunsFeedCountOrError":["RunsFeedCount","PythonError"],"RunTagKeysOrError":["PythonError","RunTagKeys"],"RunTagsOrError":["PythonError","RunTags"],"RunIdsOrError":["RunIds","InvalidPipelineRunsFilterError","PythonError"],"AssetNodeOrError":["AssetNode","AssetNotFoundError"],"PartitionBackfillOrError":["PartitionBackfill","BackfillNotFoundError","PythonError"],"PartitionBackfillsOrError":["PartitionBackfills","PythonError"],"EventConnectionOrError":["EventConnection","RunNotFoundError","PythonError"],"AutoMaterializeAssetEvaluationRecordsOrError":["AutoMaterializeAssetEvaluationRecords","AutoMaterializeAssetEvaluationNeedsMigrationError"],"PartitionKeysOrError":["PartitionKeys","PartitionSubsetDeserializationError"],"AutoMaterializeRuleEvaluationData":["TextRuleEvaluationData","ParentMaterializedRuleEvaluationData","WaitingOnKeysRuleEvaluationData"],"AssetConditionEvaluationNode":["UnpartitionedAssetConditionEvaluationNode","PartitionedAssetConditionEvaluationNode","SpecificPartitionAssetConditionEvaluationNode"],"AssetConditionEvaluationRecordsOrError":["AssetConditionEvaluationRecords","AutoMaterializeAssetEvaluationNeedsMigrationError"],"SensorDryRunResult":["PythonError","SensorNotFoundError","DryRunInstigationTick"],"ScheduleDryRunResult":["DryRunInstigationTick","PythonError","ScheduleNotFoundError"],"TerminateRunsResultOrError":["TerminateRunsResult","PythonError"],"AssetWipeMutationResult":["AssetNotFoundError","UnauthorizedError","PythonError","UnsupportedOperationError","AssetWipeSuccess"],"ReportRunlessAssetEventsResult":["UnauthorizedError","PythonError","ReportRunlessAssetEventsSuccess"],"ResumeBackfillResult":["ResumeBackfillSuccess","UnauthorizedError","PythonError"],"CancelBackfillResult":["CancelBackfillSuccess","UnauthorizedError","PythonError"],"LogTelemetryMutationResult":["LogTelemetrySuccess","PythonError"],"AddDynamicPartitionResult":["AddDynamicPartitionSuccess","UnauthorizedError","PythonError","DuplicateDynamicPartitionError"],"DeleteDynamicPartitionsResult":["DeleteDynamicPartitionsSuccess","UnauthorizedError","PythonError"]} \ No newline at end of file diff --git a/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql b/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql index 681e149b7f89c..72918fc3d9d11 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql +++ b/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql @@ -3729,7 +3729,7 @@ type Mutation { ): ReportRunlessAssetEventsResult! launchPartitionBackfill(backfillParams: LaunchBackfillParams!): LaunchBackfillResult! resumePartitionBackfill(backfillId: String!): ResumeBackfillResult! - retryPartitionBackfill(backfillId: String!): RetryBackfillResult! + retryPartitionBackfill(backfillId: String!): LaunchBackfillResult! cancelPartitionBackfill(backfillId: String!): CancelBackfillResult! logTelemetry( action: String! @@ -3811,12 +3811,6 @@ type ResumeBackfillSuccess { backfillId: String! } -union RetryBackfillResult = RetryBackfillSuccess | UnauthorizedError | PythonError - -type RetryBackfillSuccess { - backfillId: String! -} - union CancelBackfillResult = CancelBackfillSuccess | UnauthorizedError | PythonError type CancelBackfillSuccess { diff --git a/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts b/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts index 26045012695f7..ce925d8e87e3f 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts +++ b/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts @@ -2627,7 +2627,7 @@ export type Mutation = { resetSchedule: ScheduleMutationResult; resetSensor: SensorOrError; resumePartitionBackfill: ResumeBackfillResult; - retryPartitionBackfill: RetryBackfillResult; + retryPartitionBackfill: LaunchBackfillResult; scheduleDryRun: ScheduleDryRunResult; sensorDryRun: SensorDryRunResult; setAutoMaterializePaused: Scalars['Boolean']['output']; @@ -4423,13 +4423,6 @@ export type ResumeBackfillSuccess = { backfillId: Scalars['String']['output']; }; -export type RetryBackfillResult = PythonError | RetryBackfillSuccess | UnauthorizedError; - -export type RetryBackfillSuccess = { - __typename: 'RetryBackfillSuccess'; - backfillId: Scalars['String']['output']; -}; - export type Run = PipelineRun & RunsFeedEntry & { __typename: 'Run'; @@ -10264,9 +10257,9 @@ export const buildMutation = ( retryPartitionBackfill: overrides && overrides.hasOwnProperty('retryPartitionBackfill') ? overrides.retryPartitionBackfill! - : relationshipsToOmit.has('PythonError') - ? ({} as PythonError) - : buildPythonError({}, relationshipsToOmit), + : relationshipsToOmit.has('ConflictingExecutionParamsError') + ? ({} as ConflictingExecutionParamsError) + : buildConflictingExecutionParamsError({}, relationshipsToOmit), scheduleDryRun: overrides && overrides.hasOwnProperty('scheduleDryRun') ? overrides.scheduleDryRun! @@ -12945,18 +12938,6 @@ export const buildResumeBackfillSuccess = ( }; }; -export const buildRetryBackfillSuccess = ( - overrides?: Partial, - _relationshipsToOmit: Set = new Set(), -): {__typename: 'RetryBackfillSuccess'} & RetryBackfillSuccess => { - const relationshipsToOmit: Set = new Set(_relationshipsToOmit); - relationshipsToOmit.add('RetryBackfillSuccess'); - return { - __typename: 'RetryBackfillSuccess', - backfillId: overrides && overrides.hasOwnProperty('backfillId') ? overrides.backfillId! : 'at', - }; -}; - export const buildRun = ( overrides?: Partial, _relationshipsToOmit: Set = new Set(), diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/backfill.py b/python_modules/dagster-graphql/dagster_graphql/schema/backfill.py index e3202962e1e45..bfc2101e583f2 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/backfill.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/backfill.py @@ -107,25 +107,12 @@ class Meta: name = "ResumeBackfillSuccess" -class GrapheneRetryBackfillSuccess(graphene.ObjectType): - backfill_id = graphene.NonNull(graphene.String) - - class Meta: - name = "RetryBackfillSuccess" - - class GrapheneResumeBackfillResult(graphene.Union): class Meta: types = (GrapheneResumeBackfillSuccess, GrapheneUnauthorizedError, GraphenePythonError) name = "ResumeBackfillResult" -class GrapheneRetryBackfillResult(graphene.Union): - class Meta: - types = (GrapheneRetryBackfillSuccess, GrapheneUnauthorizedError, GraphenePythonError) - name = "RetryBackfillResult" - - class GrapheneBulkActionStatus(graphene.Enum): REQUESTED = "REQUESTED" COMPLETED = "COMPLETED" diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/roots/mutation.py b/python_modules/dagster-graphql/dagster_graphql/schema/roots/mutation.py index b0088980aecf1..404c2959cd890 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/roots/mutation.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/roots/mutation.py @@ -50,7 +50,6 @@ GrapheneCancelBackfillResult, GrapheneLaunchBackfillResult, GrapheneResumeBackfillResult, - GrapheneRetryBackfillResult, ) from dagster_graphql.schema.errors import ( GrapheneAssetNotFoundError, @@ -371,7 +370,7 @@ def mutate(self, graphene_info: ResolveInfo, backfillId: str): class GrapheneRetryBackfillMutation(graphene.Mutation): """Retries a set of partition backfill runs. Retrying a backfill will create a new backfill to retry any failed partitions.""" - Output = graphene.NonNull(GrapheneRetryBackfillResult) + Output = graphene.NonNull(GrapheneLaunchBackfillResult) class Arguments: backfillId = graphene.NonNull(graphene.String) From d38a6129e476c9e2ccfa66eac103e38dfc869e5b Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Tue, 8 Oct 2024 15:59:25 -0400 Subject: [PATCH 11/11] wip --- .../implementation/execution/__init__.py | 1 + .../implementation/execution/backfill.py | 61 +++++++++++++++++++ .../dagster_graphql/schema/roots/mutation.py | 18 ++++++ 3 files changed, 80 insertions(+) diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/execution/__init__.py b/python_modules/dagster-graphql/dagster_graphql/implementation/execution/__init__.py index cd81a7308413c..81aa13d4e2363 100644 --- a/python_modules/dagster-graphql/dagster_graphql/implementation/execution/__init__.py +++ b/python_modules/dagster-graphql/dagster_graphql/implementation/execution/__init__.py @@ -42,6 +42,7 @@ from dagster_graphql.implementation.execution.backfill import ( cancel_partition_backfill as cancel_partition_backfill, create_and_launch_partition_backfill as create_and_launch_partition_backfill, + reexecute_partition_backfill as reexecute_partition_backfill, resume_partition_backfill as resume_partition_backfill, retry_partition_backfill as retry_partition_backfill, ) diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/execution/backfill.py b/python_modules/dagster-graphql/dagster_graphql/implementation/execution/backfill.py index 7b93adad24b7d..ee157f55a93ab 100644 --- a/python_modules/dagster-graphql/dagster_graphql/implementation/execution/backfill.py +++ b/python_modules/dagster-graphql/dagster_graphql/implementation/execution/backfill.py @@ -406,3 +406,64 @@ def retry_partition_backfill( graphene_info.context.instance.add_backfill(new_backfill) return GrapheneRetryBackfillSuccess(backfill_id=new_backfill.backfill_id) + + +def reexecute_partition_backfill( + graphene_info: "ResolveInfo", backfill_id: str +) -> "GrapheneRetryBackfillSuccess": + from dagster_graphql.schema.backfill import GrapheneRetryBackfillSuccess + + backfill = graphene_info.context.instance.get_backfill(backfill_id) + if not backfill: + check.failed(f"No backfill found for id: {backfill_id}") + + if backfill.is_asset_backfill: + asset_backfill_data = backfill.get_asset_backfill_data(graphene_info.context.asset_graph) + asset_graph = graphene_info.context.asset_graph + assert_permission_for_asset_graph( + graphene_info, + asset_graph, + list(asset_backfill_data.target_subset.asset_keys), + Permissions.LAUNCH_PARTITION_BACKFILL, + ) + + new_backfill = PartitionBackfill.from_asset_graph_subset( + backfill_id=make_new_backfill_id(), + asset_graph_subset=asset_backfill_data.target_subset, + dynamic_partitions_store=graphene_info.context.instance, + tags={ + **backfill.tags, + PARENT_BACKFILL_ID_TAG: backfill.backfill_id, + ROOT_BACKFILL_ID_TAG: backfill.tags.get(ROOT_BACKFILL_ID_TAG, backfill.backfill_id), + }, + backfill_timestamp=get_current_timestamp(), + title=f"Re-execution of {backfill.title}" if backfill.title else None, + description=backfill.description, + ) + else: # job backfill + partition_set_origin = check.not_none(backfill.partition_set_origin) + location_name = partition_set_origin.selector.location_name + assert_permission_for_location( + graphene_info, Permissions.LAUNCH_PARTITION_BACKFILL, location_name + ) + + new_backfill = PartitionBackfill( + backfill_id=make_new_backfill_id(), + partition_set_origin=backfill.partition_set_origin, + status=BulkActionStatus.REQUESTED, + partition_names=backfill.partition_names, + from_failure=False, + reexecution_steps=backfill.reexecution_steps, + tags={ + **backfill.tags, + PARENT_BACKFILL_ID_TAG: backfill.backfill_id, + ROOT_BACKFILL_ID_TAG: backfill.tags.get(ROOT_BACKFILL_ID_TAG, backfill.backfill_id), + }, + backfill_timestamp=get_current_timestamp(), + asset_selection=backfill.asset_selection, + title=f"Re-execution of {backfill.title}" if backfill.title else None, + description=backfill.description, + ) + + graphene_info.context.instance.add_backfill(new_backfill) + return GrapheneRetryBackfillSuccess(backfill_id=new_backfill.backfill_id) diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/roots/mutation.py b/python_modules/dagster-graphql/dagster_graphql/schema/roots/mutation.py index 404c2959cd890..f04803a0c2bb8 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/roots/mutation.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/roots/mutation.py @@ -19,6 +19,7 @@ from dagster_graphql.implementation.execution.backfill import ( cancel_partition_backfill, create_and_launch_partition_backfill, + reexecute_partition_backfill, resume_partition_backfill, retry_partition_backfill, ) @@ -384,6 +385,23 @@ def mutate(self, graphene_info: ResolveInfo, backfillId: str): return retry_partition_backfill(graphene_info, backfillId) +class GrapheneReexecuteBackfillMutation(graphene.Mutation): + """Re-executes a backfill. Re-executing a backfill will create a new backfill that targets the same partitions as the existing backfill.""" + + Output = graphene.NonNull(GrapheneLaunchBackfillResult) + + class Arguments: + backfillId = graphene.NonNull(graphene.String) + + class Meta: + name = "RetryBackfillMutation" + + @capture_error + @require_permission_check(Permissions.LAUNCH_PARTITION_BACKFILL) + def mutate(self, graphene_info: ResolveInfo, backfillId: str): + return reexecute_partition_backfill(graphene_info, backfillId) + + class GrapheneAddDynamicPartitionMutation(graphene.Mutation): """Adds a partition to a dynamic partition set."""