Skip to content

Commit

Permalink
[graphql] instigation state by id (#22543)
Browse files Browse the repository at this point in the history
* add `CompoundID` to wrap external origin + selector id components 
* change the `Schedule` `Sensor` and `Repository` id fields to return
serialized CompoundIDs
* update the `instigationStateOrError` to be able to fetch by id
* add `instigationStatesOrError` and support fetching all instigation
states by their containing repository ID

## How I Tested These Changes

Added tests
  • Loading branch information
alangenfeld authored Jul 1, 2024
1 parent 64828a3 commit 18ed7c8
Show file tree
Hide file tree
Showing 11 changed files with 231 additions and 24 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 13 additions & 1 deletion js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from dagster._core.definitions.instigation_logger import get_instigation_log_records
from dagster._core.definitions.selector import InstigatorSelector
from dagster._core.log_manager import LOG_RECORD_METADATA_ATTR
from dagster._core.remote_representation.external import CompoundID

if TYPE_CHECKING:
from dagster_graphql.schema.util import ResolveInfo
Expand All @@ -15,8 +16,9 @@
)


def get_instigator_state_or_error(
graphene_info: "ResolveInfo", selector: InstigatorSelector
def get_instigator_state_by_selector(
graphene_info: "ResolveInfo",
selector: InstigatorSelector,
) -> Union["GrapheneInstigationState", "GrapheneInstigationStateNotFoundError"]:
from ..schema.instigation import GrapheneInstigationState, GrapheneInstigationStateNotFoundError

Expand Down Expand Up @@ -44,6 +46,39 @@ def get_instigator_state_or_error(
return GrapheneInstigationState(current_state)


def get_instigation_state_by_id(
graphene_info: "ResolveInfo",
instigator_id: CompoundID,
):
from ..schema.instigation import GrapheneInstigationState, GrapheneInstigationStateNotFoundError

state = graphene_info.context.instance.get_instigator_state(
origin_id=instigator_id.external_origin_id,
selector_id=instigator_id.selector_id,
)
if state is None:
return GrapheneInstigationStateNotFoundError(instigator_id.to_string())

return GrapheneInstigationState(state)


def get_instigation_states_by_repository_id(
graphene_info: "ResolveInfo",
repository_id: CompoundID,
):
from dagster_graphql.schema.instigation import (
GrapheneInstigationState,
GrapheneInstigationStates,
)

states = graphene_info.context.instance.all_instigator_state(
repository_origin_id=repository_id.external_origin_id,
repository_selector_id=repository_id.selector_id,
)

return GrapheneInstigationStates(results=[GrapheneInstigationState(state) for state in states])


def get_tick_log_events(graphene_info: "ResolveInfo", tick) -> "GrapheneInstigationEventConnection":
from ..schema.instigation import GrapheneInstigationEvent, GrapheneInstigationEventConnection
from ..schema.logs.log_level import GrapheneLogLevel
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,8 +286,8 @@ def __init__(
)
super().__init__(name=repository.name)

def resolve_id(self, _graphene_info: ResolveInfo):
return self._repository.get_external_origin_id()
def resolve_id(self, _graphene_info: ResolveInfo) -> str:
return self._repository.get_compound_id().to_string()

def resolve_origin(self, _graphene_info: ResolveInfo):
origin = self._repository.get_external_origin()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -727,10 +727,10 @@ class Meta:

name = graphene.NonNull(graphene.String)

def __init__(self, name):
def __init__(self, target):
super().__init__()
self.name = check.str_param(name, "name")
self.message = f"Could not find `{name}` in the currently loaded repository."
self.name = check.str_param(target, "target")
self.message = f"Could not find instigation state for `{target}`"


class GrapheneInstigationStateOrError(graphene.Union):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from dagster._core.errors import DagsterInvariantViolationError
from dagster._core.execution.backfill import BulkActionStatus
from dagster._core.nux import get_has_seen_nux
from dagster._core.remote_representation.external import CompoundID
from dagster._core.scheduler.instigation import InstigatorStatus, InstigatorType
from dagster._core.workspace.permissions import Permissions

Expand Down Expand Up @@ -60,7 +61,11 @@
get_assets,
)
from ...implementation.fetch_backfills import get_backfill, get_backfills
from ...implementation.fetch_instigators import get_instigator_state_or_error
from ...implementation.fetch_instigators import (
get_instigation_state_by_id,
get_instigation_states_by_repository_id,
get_instigator_state_by_selector,
)
from ...implementation.fetch_partition_sets import get_partition_set, get_partition_sets_or_error
from ...implementation.fetch_pipelines import (
get_job_or_error,
Expand Down Expand Up @@ -134,6 +139,7 @@
from ..instance import GrapheneInstance
from ..instigation import (
GrapheneInstigationStateOrError,
GrapheneInstigationStatesOrError,
GrapheneInstigationStatus,
GrapheneInstigationTick,
GrapheneInstigationTickStatus,
Expand Down Expand Up @@ -283,13 +289,22 @@ class Meta:

instigationStateOrError = graphene.Field(
graphene.NonNull(GrapheneInstigationStateOrError),
instigationSelector=graphene.NonNull(GrapheneInstigationSelector),
instigationSelector=graphene.Argument(GrapheneInstigationSelector),
id=graphene.Argument(graphene.String),
description=(
"Retrieve the state for a schedule or sensor by its location name, repository name, and"
" schedule/sensor name."
),
)

instigationStatesOrError = graphene.Field(
graphene.NonNull(GrapheneInstigationStatesOrError),
repositoryID=graphene.NonNull(graphene.String),
description=(
"Retrieve the state for a group of instigators (schedule/sensor) by their containing repository id."
),
)

partitionSetsOrError = graphene.Field(
graphene.NonNull(GraphenePartitionSetsOrError),
repositorySelector=graphene.NonNull(GrapheneRepositorySelector),
Expand Down Expand Up @@ -708,10 +723,36 @@ def resolve_sensorsOrError(

@capture_error
def resolve_instigationStateOrError(
self, graphene_info: ResolveInfo, instigationSelector: GrapheneInstigationSelector
self,
graphene_info: ResolveInfo,
*,
instigationSelector: Optional[GrapheneInstigationSelector] = None,
id: Optional[str] = None,
):
if id:
return get_instigation_state_by_id(
graphene_info,
CompoundID.from_string(id),
)
elif instigationSelector:
return get_instigator_state_by_selector(
graphene_info,
InstigatorSelector.from_graphql_input(instigationSelector),
)
else:
raise DagsterInvariantViolationError(
"Must pass either id or instigationSelector (but not both)."
)

@capture_error
def resolve_instigationStatesOrError(
self,
graphene_info: ResolveInfo,
repositoryID: str,
):
return get_instigator_state_or_error(
graphene_info, InstigatorSelector.from_graphql_input(instigationSelector)
return get_instigation_states_by_repository_id(
graphene_info,
CompoundID.from_string(repositoryID),
)

@capture_error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ def __init__(
description=external_schedule.description,
)

def resolve_id(self, _graphene_info: ResolveInfo):
return self._external_schedule.get_external_origin_id()
def resolve_id(self, _graphene_info: ResolveInfo) -> str:
return self._external_schedule.get_compound_id().to_string()

def resolve_defaultStatus(self, _graphene_info: ResolveInfo):
default_schedule_status = self._external_schedule.default_status
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,8 @@ def __init__(
else None,
)

def resolve_id(self, _):
return self._external_sensor.get_external_origin_id()
def resolve_id(self, _) -> str:
return self._external_sensor.get_compound_id().to_string()

def resolve_defaultStatus(self, _graphene_info: ResolveInfo):
default_sensor_status = self._external_sensor.default_status
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@
from .graphql_context_test_suite import NonLaunchableGraphQLContextTestMatrix

INSTIGATION_QUERY = """
query JobQuery($instigationSelector: InstigationSelector!) {
instigationStateOrError(instigationSelector: $instigationSelector) {
query JobQuery($instigationSelector: InstigationSelector $id: String) {
instigationStateOrError(instigationSelector: $instigationSelector id: $id) {
__typename
... on PythonError {
message
Expand All @@ -27,6 +27,24 @@
}
"""

INSTIGATION_STATES_QUERY = """
query InstigationStatesQuery($id: String!) {
instigationStatesOrError(repositoryID: $id) {
__typename
... on PythonError {
message
stack
}
... on InstigationStates {
results {
name
instigationType
}
}
}
}
"""


def _create_sensor_tick(graphql_context):
logger = get_default_daemon_logger("SensorDaemon")
Expand All @@ -44,18 +62,31 @@ def _create_sensor_tick(graphql_context):


class TestNextTickRepository(NonLaunchableGraphQLContextTestMatrix):
def test_schedule_next_tick(self, graphql_context):
def test_schedule_next_tick(self, graphql_context) -> None:
repository_selector = infer_repository_selector(graphql_context)
external_repository = graphql_context.get_code_location(
repository_selector["repositoryLocationName"]
).get_repository(repository_selector["repositoryName"])

schedule_name = "no_config_job_hourly_schedule"
external_schedule = external_repository.get_external_schedule(schedule_name)
selector = infer_instigation_selector(graphql_context, schedule_name)

# need to be running in order to generate a future tick
graphql_context.instance.start_schedule(external_schedule)
result = execute_dagster_graphql(
graphql_context,
INSTIGATION_QUERY,
variables={"id": external_schedule.get_compound_id().to_string()},
)

assert result.data
assert result.data["instigationStateOrError"]["__typename"] == "InstigationState"
next_tick = result.data["instigationStateOrError"]["nextTick"]
assert next_tick

# ensure legacy selector based impl works until removal

selector = infer_instigation_selector(graphql_context, schedule_name)
result = execute_dagster_graphql(
graphql_context, INSTIGATION_QUERY, variables={"instigationSelector": selector}
)
Expand All @@ -80,11 +111,52 @@ def test_sensor_next_tick(self, graphql_context):
graphql_context.instance.start_sensor(external_sensor)
_create_sensor_tick(graphql_context)

result = execute_dagster_graphql(
graphql_context,
INSTIGATION_QUERY,
variables={"id": external_sensor.get_compound_id().to_string()},
)

assert result.data
assert (
result.data["instigationStateOrError"]["__typename"] == "InstigationState"
), result.data["instigationStateOrError"]
next_tick = result.data["instigationStateOrError"]["nextTick"]
assert next_tick

# ensure legacy selector based lookup continues to work until removed

result = execute_dagster_graphql(
graphql_context, INSTIGATION_QUERY, variables={"instigationSelector": selector}
)

assert result.data
assert result.data["instigationStateOrError"]["__typename"] == "InstigationState"
assert (
result.data["instigationStateOrError"]["__typename"] == "InstigationState"
), result.data["instigationStateOrError"]
next_tick = result.data["instigationStateOrError"]["nextTick"]
assert next_tick

def test_instigation_states(self, graphql_context) -> None:
repository_selector = infer_repository_selector(graphql_context)
external_repository = graphql_context.get_code_location(
repository_selector["repositoryLocationName"]
).get_repository(repository_selector["repositoryName"])

schedule_name = "no_config_job_hourly_schedule"
external_schedule = external_repository.get_external_schedule(schedule_name)
graphql_context.instance.start_schedule(external_schedule)

result = execute_dagster_graphql(
graphql_context,
INSTIGATION_STATES_QUERY,
variables={"id": external_repository.get_compound_id().to_string()},
)

assert result.data
assert (
result.data["instigationStatesOrError"]["__typename"] == "InstigationStates"
), result.data["instigationStatesOrError"]

results = result.data["instigationStatesOrError"]["results"]
assert results == [{"name": schedule_name, "instigationType": "SCHEDULE"}]
Loading

1 comment on commit 18ed7c8

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deploy preview for dagit-core-storybook ready!

✅ Preview
https://dagit-core-storybook-3lbzhdmr3-elementl.vercel.app

Built with commit 18ed7c8.
This pull request is being automatically deployed with vercel-action

Please sign in to comment.