Skip to content

Commit

Permalink
[single-implicit-asset-job] add partitionKeysOrError and `partition…
Browse files Browse the repository at this point in the history
…` to `GrapheneJob` (#23494)

## Summary & Motivation

This enables the frontend to query for partition information with
respect to an asset selection within a job. This enables a world where
jobs can contain assets with different `PartitionsDefinition`s.

It's used by @bengotow 's `partition-set-to-job/graphql-ui` branch.

Sits atop #23491.

## How I Tested These Changes
  • Loading branch information
sryza authored Aug 20, 2024
1 parent ae95d8f commit 3bd53c0
Show file tree
Hide file tree
Showing 10 changed files with 448 additions and 55 deletions.
29 changes: 25 additions & 4 deletions js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

85 changes: 85 additions & 0 deletions js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

from dagster_graphql.schema.util import ResolveInfo

from .utils import apply_cursor_limit_reverse

if TYPE_CHECKING:
from dagster_graphql.schema.errors import GraphenePartitionSetNotFoundError
from dagster_graphql.schema.partition_sets import (
Expand Down Expand Up @@ -175,7 +177,7 @@ def get_partitions(
check.inst_param(repository_handle, "repository_handle", RepositoryHandle)
check.inst_param(partition_set, "partition_set", ExternalPartitionSet)

partition_names = _apply_cursor_limit_reverse(partition_names, cursor, limit, reverse)
partition_names = apply_cursor_limit_reverse(partition_names, cursor, limit, reverse)

return GraphenePartitions(
results=[
Expand All @@ -189,30 +191,6 @@ def get_partitions(
)


def _apply_cursor_limit_reverse(
items: Sequence[str], cursor: Optional[str], limit: Optional[int], reverse: Optional[bool]
) -> Sequence[str]:
start = 0
end = len(items)
index = 0

if cursor:
index = next((idx for (idx, item) in enumerate(items) if item == cursor))

if reverse:
end = index
else:
start = index + 1

if limit:
if reverse:
start = end - limit
else:
end = start + limit

return items[max(start, 0) : end]


def get_partition_set_partition_statuses(
graphene_info: ResolveInfo,
external_partition_set: ExternalPartitionSet,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,5 +290,29 @@ def to_graphql_input(self) -> Mapping[str, Any]:
}


def apply_cursor_limit_reverse(
items: Sequence[str], cursor: Optional[str], limit: Optional[int], reverse: Optional[bool]
) -> Sequence[str]:
start = 0
end = len(items)
index = 0

if cursor:
index = next((idx for (idx, item) in enumerate(items) if item == cursor))

if reverse:
end = index
else:
start = index + 1

if limit:
if reverse:
start = end - limit
else:
end = start + limit

return items[max(start, 0) : end]


BackfillParams: TypeAlias = Mapping[str, Any]
AssetBackfillPreviewParams: TypeAlias = Mapping[str, Any]
Original file line number Diff line number Diff line change
Expand Up @@ -15,32 +15,12 @@

from .asset_key import GrapheneAssetKey
from .auto_materialize_policy import GrapheneAutoMaterializeRule
from .partition_keys import GraphenePartitionKeys, GraphenePartitionKeysOrError
from .util import non_null_list

GrapheneAutoMaterializeDecisionType = graphene.Enum.from_enum(AutoMaterializeDecisionType)


class GraphenePartitionKeys(graphene.ObjectType):
partitionKeys = non_null_list(graphene.String)

class Meta:
name = "PartitionKeys"


class GraphenePartitionSubsetDeserializationError(graphene.ObjectType):
message = graphene.NonNull(graphene.String)

class Meta:
interfaces = (GrapheneError,)
name = "PartitionSubsetDeserializationError"


class GraphenePartitionKeysOrError(graphene.Union):
class Meta:
types = (GraphenePartitionKeys, GraphenePartitionSubsetDeserializationError)
name = "PartitionKeysOrError"


class GrapheneTextRuleEvaluationData(graphene.ObjectType):
text = graphene.String()

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import graphene
from dagster._core.definitions.asset_key import AssetKey
from dagster._core.events import DagsterEventType
from dagster._core.storage.dagster_run import DagsterRunStatus, RunsFilter
from dagster._time import datetime_from_timestamp
Expand All @@ -15,6 +16,9 @@ class GrapheneAssetKeyInput(graphene.InputObjectType):
class Meta:
name = "AssetKeyInput"

def to_asset_key(self) -> AssetKey:
return AssetKey(self.path)


class GrapheneAssetCheckHandleInput(graphene.InputObjectType):
assetKey = graphene.NonNull(GrapheneAssetKeyInput)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import graphene

from dagster_graphql.schema.errors import GrapheneError

from .util import non_null_list


class GraphenePartitionKeys(graphene.ObjectType):
partitionKeys = non_null_list(graphene.String)

class Meta:
name = "PartitionKeys"


class GraphenePartitionSubsetDeserializationError(graphene.ObjectType):
message = graphene.NonNull(graphene.String)

class Meta:
interfaces = (GrapheneError,)
name = "PartitionSubsetDeserializationError"


class GraphenePartitionKeysOrError(graphene.Union):
class Meta:
types = (GraphenePartitionKeys, GraphenePartitionSubsetDeserializationError)
name = "PartitionKeysOrError"
Loading

1 comment on commit 3bd53c0

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deploy preview for dagit-core-storybook ready!

✅ Preview
https://dagit-core-storybook-ge3jz2v6g-elementl.vercel.app

Built with commit 3bd53c0.
This pull request is being automatically deployed with vercel-action

Please sign in to comment.