Skip to content

Commit

Permalink
[4/n] [RFC] add launch multiple runs backend functionality (dagster-i…
Browse files Browse the repository at this point in the history
…o#25880)

Linear:
https://linear.app/dagster-labs/issue/FE-659/add-launch-all-backend-functionality

## Summary & Motivation
Add a backend GraphQL mutation to handle launching multiple runs

## How I Tested These Changes
`pytest
python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_run_launcher.py`
50 passed, 18 warnings in 206.16s (0:03:26)
  • Loading branch information
dliu27 committed Dec 9, 2024
1 parent 47afbae commit 1afb9ea
Show file tree
Hide file tree
Showing 8 changed files with 425 additions and 11 deletions.

Large diffs are not rendered by default.

11 changes: 11 additions & 0 deletions js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

55 changes: 55 additions & 0 deletions js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

65 changes: 65 additions & 0 deletions python_modules/dagster-graphql/dagster_graphql/client/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,71 @@
)


LAUNCH_MULTIPLE_RUNS_MUTATION = (
ERROR_FRAGMENT
+ """
mutation($executionParamsList: [ExecutionParams!]!) {
launchMultipleRuns(executionParamsList: $executionParamsList) {
__typename
... on LaunchMultipleRunsResult {
launchMultipleRunsResult {
__typename
... on InvalidStepError {
invalidStepKey
}
... on InvalidOutputError {
stepKey
invalidOutputName
}
... on LaunchRunSuccess {
run {
runId
pipeline {
name
}
tags {
key
value
}
status
runConfigYaml
mode
resolvedOpSelection
}
}
... on ConflictingExecutionParamsError {
message
}
... on PresetNotFoundError {
preset
message
}
... on RunConfigValidationInvalid {
pipelineName
errors {
__typename
message
path
reason
}
}
... on PipelineNotFoundError {
message
pipelineName
}
... on PythonError {
...errorFragment
}
}
}
... on PythonError {
...errorFragment
}
}
}
"""
)

LAUNCH_PIPELINE_REEXECUTION_MUTATION = (
ERROR_FRAGMENT
+ """
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ def types():
GrapheneDeletePipelineRunSuccess,
GrapheneDeleteRunMutation,
GrapheneLaunchBackfillMutation,
GrapheneLaunchMultipleRunsMutation,
GrapheneLaunchRunMutation,
GrapheneLaunchRunReexecutionMutation,
GrapheneReloadRepositoryLocationMutation,
Expand Down Expand Up @@ -38,6 +39,7 @@ def types():
GrapheneExecutionPlanOrError,
GrapheneLaunchBackfillMutation,
GrapheneLaunchRunMutation,
GrapheneLaunchMultipleRunsMutation,
GrapheneLaunchRunReexecutionMutation,
GraphenePipelineOrError,
GrapheneReloadRepositoryLocationMutation,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Optional, Sequence, Union
from typing import List, Optional, Sequence, Union

import dagster._check as check
import graphene
Expand Down Expand Up @@ -78,6 +78,8 @@
)
from dagster_graphql.schema.pipelines.pipeline import GrapheneRun
from dagster_graphql.schema.runs import (
GrapheneLaunchMultipleRunsResult,
GrapheneLaunchMultipleRunsResultOrError,
GrapheneLaunchRunReexecutionResult,
GrapheneLaunchRunResult,
GrapheneLaunchRunSuccess,
Expand Down Expand Up @@ -316,6 +318,38 @@ def mutate(
return create_execution_params_and_launch_pipeline_exec(graphene_info, executionParams)


class GrapheneLaunchMultipleRunsMutation(graphene.Mutation):
"""Launches multiple job runs."""

Output = graphene.NonNull(GrapheneLaunchMultipleRunsResultOrError)

class Arguments:
executionParamsList = non_null_list(GrapheneExecutionParams)

class Meta:
name = "LaunchMultipleRunsMutation"

@capture_error
def mutate(
self, graphene_info: ResolveInfo, executionParamsList: List[GrapheneExecutionParams]
) -> Union[
GrapheneLaunchMultipleRunsResult,
GrapheneError,
GraphenePythonError,
]:
launch_multiple_runs_result = []

for execution_params in executionParamsList:
result = GrapheneLaunchRunMutation.mutate(
None, graphene_info, executionParams=execution_params
)
launch_multiple_runs_result.append(result)

return GrapheneLaunchMultipleRunsResult(
launchMultipleRunsResult=launch_multiple_runs_result
)


class GrapheneLaunchBackfillMutation(graphene.Mutation):
"""Launches a set of partition backfill runs."""

Expand Down Expand Up @@ -984,6 +1018,7 @@ class Meta:

launchPipelineExecution = GrapheneLaunchRunMutation.Field()
launchRun = GrapheneLaunchRunMutation.Field()
launchMultipleRuns = GrapheneLaunchMultipleRunsMutation.Field()
launchPipelineReexecution = GrapheneLaunchRunReexecutionMutation.Field()
launchRunReexecution = GrapheneLaunchRunReexecutionMutation.Field()
startSchedule = GrapheneStartScheduleMutation.Field()
Expand Down
21 changes: 17 additions & 4 deletions python_modules/dagster-graphql/dagster_graphql/schema/runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

from dagster_graphql.implementation.fetch_runs import get_run_ids, get_runs, get_runs_count
from dagster_graphql.implementation.utils import UserFacingGraphQLError
from dagster_graphql.schema.backfill import pipeline_execution_error_types
from dagster_graphql.schema.errors import (
GrapheneInvalidPipelineRunsFilterError,
GraphenePythonError,
Expand Down Expand Up @@ -73,17 +74,28 @@ class Meta:

class GrapheneLaunchRunResult(graphene.Union):
class Meta:
from dagster_graphql.schema.backfill import pipeline_execution_error_types

types = launch_pipeline_run_result_types + pipeline_execution_error_types

name = "LaunchRunResult"


class GrapheneLaunchRunReexecutionResult(graphene.Union):
class GrapheneLaunchMultipleRunsResult(graphene.ObjectType):
"""Contains results from multiple pipeline launches."""

launchMultipleRunsResult = non_null_list(GrapheneLaunchRunResult)

class Meta:
name = "LaunchMultipleRunsResult"


class GrapheneLaunchMultipleRunsResultOrError(graphene.Union):
class Meta:
from dagster_graphql.schema.backfill import pipeline_execution_error_types
types = (GrapheneLaunchMultipleRunsResult, GraphenePythonError)
name = "LaunchMultipleRunsResultOrError"


class GrapheneLaunchRunReexecutionResult(graphene.Union):
class Meta:
types = launch_pipeline_run_result_types + pipeline_execution_error_types

name = "LaunchRunReexecutionResult"
Expand Down Expand Up @@ -213,6 +225,7 @@ def parse_run_config_input(

types = [
GrapheneLaunchRunResult,
GrapheneLaunchMultipleRunsResult,
GrapheneLaunchRunReexecutionResult,
GrapheneLaunchPipelineRunSuccess,
GrapheneLaunchRunSuccess,
Expand Down
Loading

0 comments on commit 1afb9ea

Please sign in to comment.