From c0942201093cf6e1fc71b7d0f255359cc47a2238 Mon Sep 17 00:00:00 2001 From: Sandy Ryza Date: Tue, 6 Aug 2024 15:42:14 -0700 Subject: [PATCH] graphql branch-name: partition-set-to-job/graphql --- .../ui-core/src/graphql/schema.graphql | 29 +- .../packages/ui-core/src/graphql/types.ts | 85 ++++++ .../implementation/fetch_partition_sets.py | 28 +- .../dagster_graphql/implementation/utils.py | 24 ++ .../auto_materialize_asset_evaluations.py | 22 +- .../dagster_graphql/schema/inputs.py | 4 + .../dagster_graphql/schema/partition_keys.py | 26 ++ .../dagster_graphql/schema/partition_sets.py | 47 +++- .../schema/pipelines/pipeline.py | 87 +++++- .../graphql/test_job_partitions.py | 263 ++++++++++++++++++ 10 files changed, 560 insertions(+), 55 deletions(-) create mode 100644 python_modules/dagster-graphql/dagster_graphql/schema/partition_keys.py create mode 100644 python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_job_partitions.py diff --git a/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql b/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql index aa7daa1095141..ef37cb99115bb 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql +++ b/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql @@ -1076,6 +1076,24 @@ type Pipeline implements SolidContainer & IPipelineSnapshot { isJob: Boolean! isAssetJob: Boolean! repository: Repository! + partitionKeysOrError( + cursor: String + limit: Int + reverse: Boolean + selectedAssetKeys: [AssetKeyInput!] + ): PartitionKeys! + partition(partitionName: String!, selectedAssetKeys: [AssetKeyInput!]): PartitionTagsAndConfig +} + +type PartitionKeys { + partitionKeys: [String!]! +} + +type PartitionTagsAndConfig { + name: String! + jobName: String! + runConfigOrError: PartitionRunConfigOrError! + tagsOrError: PartitionTagsOrError! } interface PipelineConfigValidationError { @@ -2125,6 +2143,13 @@ type Job implements SolidContainer & IPipelineSnapshot { isJob: Boolean! isAssetJob: Boolean! repository: Repository! + partitionKeysOrError( + cursor: String + limit: Int + reverse: Boolean + selectedAssetKeys: [AssetKeyInput!] + ): PartitionKeys! + partition(partitionName: String!, selectedAssetKeys: [AssetKeyInput!]): PartitionTagsAndConfig } enum SensorType { @@ -3508,10 +3533,6 @@ type AutoMaterializeRuleEvaluation { union PartitionKeysOrError = PartitionKeys | PartitionSubsetDeserializationError -type PartitionKeys { - partitionKeys: [String!]! -} - type PartitionSubsetDeserializationError implements Error { message: String! } diff --git a/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts b/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts index 21e30296eeef2..2c2fe5f5248d8 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts +++ b/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts @@ -2074,6 +2074,8 @@ export type Job = IPipelineSnapshot & modes: Array; name: Scalars['String']['output']; parentSnapshotId: Maybe; + partition: Maybe; + partitionKeysOrError: PartitionKeys; pipelineSnapshotId: Scalars['String']['output']; presets: Array; repository: Repository; @@ -2090,6 +2092,18 @@ export type JobDagsterTypeOrErrorArgs = { dagsterTypeName: Scalars['String']['input']; }; +export type JobPartitionArgs = { + partitionName: Scalars['String']['input']; + selectedAssetKeys?: InputMaybe>; +}; + +export type JobPartitionKeysOrErrorArgs = { + cursor?: InputMaybe; + limit?: InputMaybe; + reverse?: InputMaybe; + selectedAssetKeys?: InputMaybe>; +}; + export type JobRunsArgs = { cursor?: InputMaybe; limit?: InputMaybe; @@ -3262,6 +3276,14 @@ export type PartitionTags = { results: Array; }; +export type PartitionTagsAndConfig = { + __typename: 'PartitionTagsAndConfig'; + jobName: Scalars['String']['output']; + name: Scalars['String']['output']; + runConfigOrError: PartitionRunConfigOrError; + tagsOrError: PartitionTagsOrError; +}; + export type PartitionTagsOrError = PartitionTags | PythonError; export type PartitionedAssetConditionEvaluationNode = { @@ -3348,6 +3370,8 @@ export type Pipeline = IPipelineSnapshot & modes: Array; name: Scalars['String']['output']; parentSnapshotId: Maybe; + partition: Maybe; + partitionKeysOrError: PartitionKeys; pipelineSnapshotId: Scalars['String']['output']; presets: Array; repository: Repository; @@ -3364,6 +3388,18 @@ export type PipelineDagsterTypeOrErrorArgs = { dagsterTypeName: Scalars['String']['input']; }; +export type PipelinePartitionArgs = { + partitionName: Scalars['String']['input']; + selectedAssetKeys?: InputMaybe>; +}; + +export type PipelinePartitionKeysOrErrorArgs = { + cursor?: InputMaybe; + limit?: InputMaybe; + reverse?: InputMaybe; + selectedAssetKeys?: InputMaybe>; +}; + export type PipelineRunsArgs = { cursor?: InputMaybe; limit?: InputMaybe; @@ -9054,6 +9090,18 @@ export const buildJob = ( overrides && overrides.hasOwnProperty('parentSnapshotId') ? overrides.parentSnapshotId! : 'tempore', + partition: + overrides && overrides.hasOwnProperty('partition') + ? overrides.partition! + : relationshipsToOmit.has('PartitionTagsAndConfig') + ? ({} as PartitionTagsAndConfig) + : buildPartitionTagsAndConfig({}, relationshipsToOmit), + partitionKeysOrError: + overrides && overrides.hasOwnProperty('partitionKeysOrError') + ? overrides.partitionKeysOrError! + : relationshipsToOmit.has('PartitionKeys') + ? ({} as PartitionKeys) + : buildPartitionKeys({}, relationshipsToOmit), pipelineSnapshotId: overrides && overrides.hasOwnProperty('pipelineSnapshotId') ? overrides.pipelineSnapshotId! @@ -11040,6 +11088,31 @@ export const buildPartitionTags = ( }; }; +export const buildPartitionTagsAndConfig = ( + overrides?: Partial, + _relationshipsToOmit: Set = new Set(), +): {__typename: 'PartitionTagsAndConfig'} & PartitionTagsAndConfig => { + const relationshipsToOmit: Set = new Set(_relationshipsToOmit); + relationshipsToOmit.add('PartitionTagsAndConfig'); + return { + __typename: 'PartitionTagsAndConfig', + jobName: overrides && overrides.hasOwnProperty('jobName') ? overrides.jobName! : 'quia', + name: overrides && overrides.hasOwnProperty('name') ? overrides.name! : 'eaque', + runConfigOrError: + overrides && overrides.hasOwnProperty('runConfigOrError') + ? overrides.runConfigOrError! + : relationshipsToOmit.has('PartitionRunConfig') + ? ({} as PartitionRunConfig) + : buildPartitionRunConfig({}, relationshipsToOmit), + tagsOrError: + overrides && overrides.hasOwnProperty('tagsOrError') + ? overrides.tagsOrError! + : relationshipsToOmit.has('PartitionTags') + ? ({} as PartitionTags) + : buildPartitionTags({}, relationshipsToOmit), + }; +}; + export const buildPartitionedAssetConditionEvaluationNode = ( overrides?: Partial, _relationshipsToOmit: Set = new Set(), @@ -11201,6 +11274,18 @@ export const buildPipeline = ( overrides && overrides.hasOwnProperty('parentSnapshotId') ? overrides.parentSnapshotId! : 'et', + partition: + overrides && overrides.hasOwnProperty('partition') + ? overrides.partition! + : relationshipsToOmit.has('PartitionTagsAndConfig') + ? ({} as PartitionTagsAndConfig) + : buildPartitionTagsAndConfig({}, relationshipsToOmit), + partitionKeysOrError: + overrides && overrides.hasOwnProperty('partitionKeysOrError') + ? overrides.partitionKeysOrError! + : relationshipsToOmit.has('PartitionKeys') + ? ({} as PartitionKeys) + : buildPartitionKeys({}, relationshipsToOmit), pipelineSnapshotId: overrides && overrides.hasOwnProperty('pipelineSnapshotId') ? overrides.pipelineSnapshotId! diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_partition_sets.py b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_partition_sets.py index 10f36245eaae7..b839b47003c87 100644 --- a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_partition_sets.py +++ b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_partition_sets.py @@ -19,6 +19,8 @@ from dagster_graphql.schema.util import ResolveInfo +from .utils import apply_cursor_limit_reverse + if TYPE_CHECKING: from dagster_graphql.schema.errors import GraphenePartitionSetNotFoundError from dagster_graphql.schema.partition_sets import ( @@ -175,7 +177,7 @@ def get_partitions( check.inst_param(repository_handle, "repository_handle", RepositoryHandle) check.inst_param(partition_set, "partition_set", ExternalPartitionSet) - partition_names = _apply_cursor_limit_reverse(partition_names, cursor, limit, reverse) + partition_names = apply_cursor_limit_reverse(partition_names, cursor, limit, reverse) return GraphenePartitions( results=[ @@ -189,30 +191,6 @@ def get_partitions( ) -def _apply_cursor_limit_reverse( - items: Sequence[str], cursor: Optional[str], limit: Optional[int], reverse: Optional[bool] -) -> Sequence[str]: - start = 0 - end = len(items) - index = 0 - - if cursor: - index = next((idx for (idx, item) in enumerate(items) if item == cursor)) - - if reverse: - end = index - else: - start = index + 1 - - if limit: - if reverse: - start = end - limit - else: - end = start + limit - - return items[max(start, 0) : end] - - def get_partition_set_partition_statuses( graphene_info: ResolveInfo, external_partition_set: ExternalPartitionSet, diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/utils.py b/python_modules/dagster-graphql/dagster_graphql/implementation/utils.py index c1add00c45c48..87b8d82da26fa 100644 --- a/python_modules/dagster-graphql/dagster_graphql/implementation/utils.py +++ b/python_modules/dagster-graphql/dagster_graphql/implementation/utils.py @@ -290,5 +290,29 @@ def to_graphql_input(self) -> Mapping[str, Any]: } +def apply_cursor_limit_reverse( + items: Sequence[str], cursor: Optional[str], limit: Optional[int], reverse: Optional[bool] +) -> Sequence[str]: + start = 0 + end = len(items) + index = 0 + + if cursor: + index = next((idx for (idx, item) in enumerate(items) if item == cursor)) + + if reverse: + end = index + else: + start = index + 1 + + if limit: + if reverse: + start = end - limit + else: + end = start + limit + + return items[max(start, 0) : end] + + BackfillParams: TypeAlias = Mapping[str, Any] AssetBackfillPreviewParams: TypeAlias = Mapping[str, Any] diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/auto_materialize_asset_evaluations.py b/python_modules/dagster-graphql/dagster_graphql/schema/auto_materialize_asset_evaluations.py index a5ee919355d54..0d62f88bf9df8 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/auto_materialize_asset_evaluations.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/auto_materialize_asset_evaluations.py @@ -15,32 +15,12 @@ from .asset_key import GrapheneAssetKey from .auto_materialize_policy import GrapheneAutoMaterializeRule +from .partition_keys import GraphenePartitionKeys, GraphenePartitionKeysOrError from .util import non_null_list GrapheneAutoMaterializeDecisionType = graphene.Enum.from_enum(AutoMaterializeDecisionType) -class GraphenePartitionKeys(graphene.ObjectType): - partitionKeys = non_null_list(graphene.String) - - class Meta: - name = "PartitionKeys" - - -class GraphenePartitionSubsetDeserializationError(graphene.ObjectType): - message = graphene.NonNull(graphene.String) - - class Meta: - interfaces = (GrapheneError,) - name = "PartitionSubsetDeserializationError" - - -class GraphenePartitionKeysOrError(graphene.Union): - class Meta: - types = (GraphenePartitionKeys, GraphenePartitionSubsetDeserializationError) - name = "PartitionKeysOrError" - - class GrapheneTextRuleEvaluationData(graphene.ObjectType): text = graphene.String() diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/inputs.py b/python_modules/dagster-graphql/dagster_graphql/schema/inputs.py index c7fdf5dbcbfa1..f0e01d770e230 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/inputs.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/inputs.py @@ -1,4 +1,5 @@ import graphene +from dagster._core.definitions.asset_key import AssetKey from dagster._core.events import DagsterEventType from dagster._core.storage.dagster_run import DagsterRunStatus, RunsFilter from dagster._time import datetime_from_timestamp @@ -15,6 +16,9 @@ class GrapheneAssetKeyInput(graphene.InputObjectType): class Meta: name = "AssetKeyInput" + def to_asset_key(self) -> AssetKey: + return AssetKey(self.path) + class GrapheneAssetCheckHandleInput(graphene.InputObjectType): assetKey = graphene.NonNull(GrapheneAssetKeyInput) diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/partition_keys.py b/python_modules/dagster-graphql/dagster_graphql/schema/partition_keys.py new file mode 100644 index 0000000000000..653116f68598f --- /dev/null +++ b/python_modules/dagster-graphql/dagster_graphql/schema/partition_keys.py @@ -0,0 +1,26 @@ +import graphene + +from dagster_graphql.schema.errors import GrapheneError + +from .util import non_null_list + + +class GraphenePartitionKeys(graphene.ObjectType): + partitionKeys = non_null_list(graphene.String) + + class Meta: + name = "PartitionKeys" + + +class GraphenePartitionSubsetDeserializationError(graphene.ObjectType): + message = graphene.NonNull(graphene.String) + + class Meta: + interfaces = (GrapheneError,) + name = "PartitionSubsetDeserializationError" + + +class GraphenePartitionKeysOrError(graphene.Union): + class Meta: + types = (GraphenePartitionKeys, GraphenePartitionSubsetDeserializationError) + name = "PartitionKeysOrError" diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/partition_sets.py b/python_modules/dagster-graphql/dagster_graphql/schema/partition_sets.py index cc00bbd286750..181e4998f49bb 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/partition_sets.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/partition_sets.py @@ -1,10 +1,11 @@ -from typing import Optional, Sequence, cast +from typing import AbstractSet, Optional, Sequence, cast import dagster._check as check import graphene from dagster import MultiPartitionsDefinition +from dagster._core.definitions.asset_key import AssetKey from dagster._core.errors import DagsterUserCodeProcessError -from dagster._core.remote_representation import ExternalPartitionSet, RepositoryHandle +from dagster._core.remote_representation import ExternalJob, ExternalPartitionSet, RepositoryHandle from dagster._core.remote_representation.external_data import ( ExternalDynamicPartitionsDefinitionData, ExternalMultiPartitionsDefinitionData, @@ -184,6 +185,48 @@ class Meta: name = "PartitionTagsOrError" +class GrapheneJobSelectionPartition(graphene.ObjectType): + name = graphene.NonNull(graphene.String) + job_name = graphene.NonNull(graphene.String) + runConfigOrError = graphene.NonNull(GraphenePartitionRunConfigOrError) + tagsOrError = graphene.NonNull(GraphenePartitionTagsOrError) + + class Meta: + name = "PartitionTagsAndConfig" + + def __init__( + self, + external_job: ExternalJob, + partition_name: str, + selected_asset_keys: Optional[AbstractSet[AssetKey]], + ): + self._external_job = external_job + self._partition_name = partition_name + self._selected_asset_keys = selected_asset_keys + + super().__init__(name=partition_name, job_name=external_job.name) + + @capture_error + def resolve_runConfigOrError(self, graphene_info: ResolveInfo) -> GraphenePartitionRunConfig: + return get_partition_config( + graphene_info, + self._external_job.repository_handle, + self._external_job.name, + self._partition_name, + selected_asset_keys=self._selected_asset_keys, + ) + + @capture_error + def resolve_tagsOrError(self, graphene_info: ResolveInfo) -> GraphenePartitionTags: + return get_partition_tags( + graphene_info, + self._external_job.repository_handle, + self._external_job.name, + self._partition_name, + selected_asset_keys=self._selected_asset_keys, + ) + + class GraphenePartition(graphene.ObjectType): name = graphene.NonNull(graphene.String) partition_set_name = graphene.NonNull(graphene.String) diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/pipelines/pipeline.py b/python_modules/dagster-graphql/dagster_graphql/schema/pipelines/pipeline.py index 6293be26a19ac..cee871f2d34c6 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/pipelines/pipeline.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/pipelines/pipeline.py @@ -1,11 +1,17 @@ -from typing import List, Optional, Sequence +from typing import TYPE_CHECKING, AbstractSet, List, Optional, Sequence import dagster._check as check import graphene +from dagster._core.definitions.asset_key import AssetKey from dagster._core.definitions.time_window_partitions import PartitionRangeStatus +from dagster._core.errors import DagsterUserCodeProcessError from dagster._core.events import DagsterEventType from dagster._core.remote_representation.external import ExternalExecutionPlan, ExternalJob -from dagster._core.remote_representation.external_data import DEFAULT_MODE_NAME, ExternalPresetData +from dagster._core.remote_representation.external_data import ( + DEFAULT_MODE_NAME, + ExternalPartitionExecutionErrorData, + ExternalPresetData, +) from dagster._core.remote_representation.represented import RepresentedJob from dagster._core.storage.dagster_run import ( DagsterRunStatsSnapshot, @@ -26,7 +32,11 @@ from ...implementation.fetch_runs import get_runs, get_stats, get_step_stats from ...implementation.fetch_schedules import get_schedules_for_pipeline from ...implementation.fetch_sensors import get_sensors_for_pipeline -from ...implementation.utils import UserFacingGraphQLError, capture_error +from ...implementation.utils import ( + UserFacingGraphQLError, + apply_cursor_limit_reverse, + capture_error, +) from ..asset_checks import GrapheneAssetCheckHandle from ..asset_key import GrapheneAssetKey from ..dagster_types import ( @@ -37,6 +47,7 @@ ) from ..errors import GrapheneDagsterTypeNotFoundError, GraphenePythonError, GrapheneRunNotFoundError from ..execution import GrapheneExecutionPlan +from ..inputs import GrapheneAssetKeyInput from ..logs.compute_logs import GrapheneCapturedLogs, from_captured_log_data from ..logs.events import ( GrapheneDagsterRunEvent, @@ -44,6 +55,7 @@ GrapheneObservationEvent, GrapheneRunStepStats, ) +from ..partition_keys import GraphenePartitionKeys from ..repository_origin import GrapheneRepositoryOrigin from ..runs import GrapheneRunConfigData from ..runs_feed import GrapheneRunsFeedEntry @@ -63,6 +75,10 @@ from .pipeline_run_stats import GrapheneRunStatsSnapshotOrError from .status import GrapheneRunStatus +if TYPE_CHECKING: + from ..partition_sets import GrapheneJobSelectionPartition + + STARTED_STATUSES = { DagsterRunStatus.STARTED, DagsterRunStatus.SUCCESS, @@ -871,6 +887,22 @@ class GraphenePipeline(GrapheneIPipelineSnapshotMixin, graphene.ObjectType): isJob = graphene.NonNull(graphene.Boolean) isAssetJob = graphene.NonNull(graphene.Boolean) repository = graphene.NonNull("dagster_graphql.schema.external.GrapheneRepository") + partitionKeysOrError = graphene.Field( + graphene.NonNull(GraphenePartitionKeys), + cursor=graphene.String(), + limit=graphene.Int(), + reverse=graphene.Boolean(), + selected_asset_keys=graphene.Argument( + graphene.List(graphene.NonNull(GrapheneAssetKeyInput)) + ), + ) + partition = graphene.Field( + "dagster_graphql.schema.partition_sets.GrapheneJobSelectionPartition", + partition_name=graphene.NonNull(graphene.String), + selected_asset_keys=graphene.Argument( + graphene.List(graphene.NonNull(GrapheneAssetKeyInput)) + ), + ) class Meta: interfaces = (GrapheneSolidContainer, GrapheneIPipelineSnapshot) @@ -912,6 +944,47 @@ def resolve_repository(self, graphene_info: ResolveInfo): location, ) + @capture_error + def resolve_partitionKeysOrError( + self, + graphene_info: ResolveInfo, + cursor: Optional[str] = None, + limit: Optional[int] = None, + reverse: Optional[bool] = None, + selected_asset_keys: Optional[List[GrapheneAssetKeyInput]] = None, + ) -> GraphenePartitionKeys: + result = graphene_info.context.get_external_partition_names( + repository_handle=self._external_job.repository_handle, + job_name=self._external_job.name, + selected_asset_keys=_asset_key_input_list_to_asset_key_set(selected_asset_keys), + instance=graphene_info.context.instance, + ) + + if isinstance(result, ExternalPartitionExecutionErrorData): + raise DagsterUserCodeProcessError.from_error_info(result.error) + + all_partition_keys = result.partition_names + + return GraphenePartitionKeys( + partitionKeys=apply_cursor_limit_reverse( + all_partition_keys, cursor=cursor, limit=limit, reverse=reverse or False + ) + ) + + def resolve_partition( + self, + graphene_info: ResolveInfo, + partition_name: str, + selected_asset_keys: Optional[List[GrapheneAssetKeyInput]] = None, + ) -> "GrapheneJobSelectionPartition": + from ..partition_sets import GrapheneJobSelectionPartition + + return GrapheneJobSelectionPartition( + external_job=self._external_job, + partition_name=partition_name, + selected_asset_keys=_asset_key_input_list_to_asset_key_set(selected_asset_keys), + ) + class GrapheneJob(GraphenePipeline): class Meta: @@ -992,3 +1065,11 @@ class GrapheneRunOrError(graphene.Union): class Meta: types = (GrapheneRun, GrapheneRunNotFoundError, GraphenePythonError) name = "RunOrError" + + +def _asset_key_input_list_to_asset_key_set( + asset_keys: Optional[List[GrapheneAssetKeyInput]], +) -> Optional[AbstractSet[AssetKey]]: + return ( + {key_input.to_asset_key() for key_input in asset_keys} if asset_keys is not None else None + ) diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_job_partitions.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_job_partitions.py new file mode 100644 index 0000000000000..80c723f2baf46 --- /dev/null +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_job_partitions.py @@ -0,0 +1,263 @@ +from dagster import ( + AssetKey, + ConfigurableResource, + Definitions, + StaticPartitionsDefinition, + asset, + job, + op, + static_partitioned_config, +) +from dagster._core.definitions.repository_definition import SINGLETON_REPOSITORY_NAME +from dagster._core.test_utils import ensure_dagster_tests_import, instance_for_test +from dagster_graphql.test.utils import define_out_of_process_context, execute_dagster_graphql + +ensure_dagster_tests_import() + + +GET_PARTITIONS_QUERY = """ + query SingleJobQuery($selector: PipelineSelector!, $selectedAssetKeys: [AssetKeyInput!]) { + pipelineOrError(params: $selector) { + ... on Pipeline { + name + partitionKeysOrError(selectedAssetKeys: $selectedAssetKeys) { + partitionKeys + } + } + } + } +""" + +GET_PARTITION_TAGS_QUERY = """ + query SingleJobQuery($selector: PipelineSelector!, $partitionName: String!, $selectedAssetKeys: [AssetKeyInput!]) { + pipelineOrError(params: $selector) { + ... on Pipeline { + name + partition(partitionName: $partitionName, selectedAssetKeys: $selectedAssetKeys) { + name + tagsOrError { + ... on PartitionTags { + results { + key + value + } + } + } + } + } + } + } +""" + +GET_PARTITION_RUN_CONFIG_QUERY = """ + query SingleJobQuery($selector: PipelineSelector!, $partitionName: String!, $selectedAssetKeys: [AssetKeyInput!]) { + pipelineOrError(params: $selector) { + ... on Pipeline { + name + partition(partitionName: $partitionName, selectedAssetKeys: $selectedAssetKeys) { + name + runConfigOrError { + ... on PartitionRunConfig { + yaml + } + } + } + } + } + } +""" + + +def get_repo_with_partitioned_op_job(): + @op + def op1(): ... + + @static_partitioned_config(["1", "2"]) + def my_partitioned_config(partition): + return {"ops": {"op1": {"config": {"p": partition}}}} + + @job(config=my_partitioned_config) + def job1(): + op1() + + return Definitions(jobs=[job1]).get_repository_def() + + +def get_repo_with_differently_partitioned_assets(): + @asset(partitions_def=StaticPartitionsDefinition(["1", "2"])) + def asset1(): ... + + ab_partitions_def = StaticPartitionsDefinition(["a", "b"]) + + @asset(partitions_def=ab_partitions_def) + def asset2(): ... + + class MyResource(ConfigurableResource): + foo: str + + @asset(partitions_def=ab_partitions_def) + def asset3(resource1: MyResource): ... + + return Definitions( + assets=[asset1, asset2, asset3], resources={"resource1": MyResource(foo="bar")} + ).get_repository_def() + + +def test_get_partition_names(): + with instance_for_test() as instance: + with define_out_of_process_context( + __file__, "get_repo_with_partitioned_op_job", instance + ) as context: + result = execute_dagster_graphql( + context, + GET_PARTITIONS_QUERY, + variables={ + "selector": { + "repositoryLocationName": context.code_location_names[0], + "repositoryName": SINGLETON_REPOSITORY_NAME, + "pipelineName": "job1", + } + }, + ) + assert result.data["pipelineOrError"]["name"] == "job1" + assert result.data["pipelineOrError"]["partitionKeysOrError"]["partitionKeys"] == [ + "1", + "2", + ] + + +def test_get_partition_names_asset_selection(): + with instance_for_test() as instance: + with define_out_of_process_context( + __file__, "get_repo_with_differently_partitioned_assets", instance + ) as context: + result = execute_dagster_graphql( + context, + GET_PARTITIONS_QUERY, + variables={ + "selector": { + "repositoryLocationName": context.code_location_names[0], + "repositoryName": SINGLETON_REPOSITORY_NAME, + "pipelineName": "__ASSET_JOB", + }, + "selectedAssetKeys": [ + AssetKey("asset2").to_graphql_input(), + AssetKey("asset3").to_graphql_input(), + ], + }, + ) + assert result.data["pipelineOrError"]["name"] == "__ASSET_JOB" + assert result.data["pipelineOrError"]["partitionKeysOrError"]["partitionKeys"] == [ + "a", + "b", + ] + + +def test_get_partition_tags(): + with instance_for_test() as instance: + with define_out_of_process_context( + __file__, "get_repo_with_partitioned_op_job", instance + ) as context: + result = execute_dagster_graphql( + context, + GET_PARTITION_TAGS_QUERY, + variables={ + "selector": { + "repositoryLocationName": context.code_location_names[0], + "repositoryName": SINGLETON_REPOSITORY_NAME, + "pipelineName": "job1", + }, + "partitionName": "1", + }, + ) + assert result.data["pipelineOrError"]["name"] == "job1" + result_partition = result.data["pipelineOrError"]["partition"] + assert result_partition["name"] == "1" + assert { + item["key"]: item["value"] for item in result_partition["tagsOrError"]["results"] + } == { + "dagster/partition": "1", + "dagster/partition_set": "job1_partition_set", + } + + +def test_get_partition_tags_asset_selection(): + with instance_for_test() as instance: + with define_out_of_process_context( + __file__, "get_repo_with_differently_partitioned_assets", instance + ) as context: + result = execute_dagster_graphql( + context, + GET_PARTITION_TAGS_QUERY, + variables={ + "selector": { + "repositoryLocationName": context.code_location_names[0], + "repositoryName": SINGLETON_REPOSITORY_NAME, + "pipelineName": "__ASSET_JOB", + }, + "selectedAssetKeys": [ + AssetKey("asset2").to_graphql_input(), + AssetKey("asset3").to_graphql_input(), + ], + "partitionName": "b", + }, + ) + assert result.data["pipelineOrError"]["name"] == "__ASSET_JOB" + result_partition = result.data["pipelineOrError"]["partition"] + assert result_partition["name"] == "b" + assert { + item["key"]: item["value"] for item in result_partition["tagsOrError"]["results"] + } == {"dagster/partition": "b"} + + +def test_get_partition_config(): + with instance_for_test() as instance: + with define_out_of_process_context( + __file__, "get_repo_with_partitioned_op_job", instance + ) as context: + result = execute_dagster_graphql( + context, + GET_PARTITION_RUN_CONFIG_QUERY, + variables={ + "selector": { + "repositoryLocationName": context.code_location_names[0], + "repositoryName": SINGLETON_REPOSITORY_NAME, + "pipelineName": "job1", + }, + "partitionName": "1", + }, + ) + assert result.data["pipelineOrError"]["name"] == "job1" + result_partition = result.data["pipelineOrError"]["partition"] + assert result_partition["name"] == "1" + assert ( + result_partition["runConfigOrError"]["yaml"] + == """ops:\n op1:\n config:\n p: '1'\n""" + ) + + +def test_get_partition_config_asset_selection(): + with instance_for_test() as instance: + with define_out_of_process_context( + __file__, "get_repo_with_differently_partitioned_assets", instance + ) as context: + result = execute_dagster_graphql( + context, + GET_PARTITION_RUN_CONFIG_QUERY, + variables={ + "selector": { + "repositoryLocationName": context.code_location_names[0], + "repositoryName": SINGLETON_REPOSITORY_NAME, + "pipelineName": "__ASSET_JOB", + }, + "selectedAssetKeys": [ + AssetKey("asset2").to_graphql_input(), + AssetKey("asset3").to_graphql_input(), + ], + "partitionName": "b", + }, + ) + assert result.data["pipelineOrError"]["name"] == "__ASSET_JOB" + result_partition = result.data["pipelineOrError"]["partition"] + assert result_partition["name"] == "b" + assert result_partition["runConfigOrError"]["yaml"] == "{}\n"