diff --git a/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql b/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql index ccbddcdf44f45..cfe36b4deaede 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql +++ b/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql @@ -3209,7 +3209,11 @@ type Query { repositorySelector: RepositorySelector! sensorStatus: InstigationStatus ): SensorsOrError! - instigationStateOrError(instigationSelector: InstigationSelector!): InstigationStateOrError! + instigationStateOrError( + instigationSelector: InstigationSelector + id: String + ): InstigationStateOrError! + instigationStatesOrError(repositoryID: String!): InstigationStatesOrError! partitionSetsOrError( repositorySelector: RepositorySelector! pipelineName: String! diff --git a/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts b/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts index 47dd4a7854668..9e3cdcfa8b12b 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts +++ b/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts @@ -3679,6 +3679,7 @@ export type Query = { graphOrError: GraphOrError; instance: Instance; instigationStateOrError: InstigationStateOrError; + instigationStatesOrError: InstigationStatesOrError; isPipelineConfigValid: PipelineConfigValidationResult; locationStatusesOrError: WorkspaceLocationStatusEntriesOrError; logsForRun: EventConnectionOrError; @@ -3819,7 +3820,12 @@ export type QueryGraphOrErrorArgs = { }; export type QueryInstigationStateOrErrorArgs = { - instigationSelector: InstigationSelector; + id?: InputMaybe; + instigationSelector?: InputMaybe; +}; + +export type QueryInstigationStatesOrErrorArgs = { + repositoryID: Scalars['String']['input']; }; export type QueryIsPipelineConfigValidArgs = { @@ -11848,6 +11854,12 @@ export const buildQuery = ( : relationshipsToOmit.has('InstigationState') ? ({} as InstigationState) : buildInstigationState({}, relationshipsToOmit), + instigationStatesOrError: + overrides && overrides.hasOwnProperty('instigationStatesOrError') + ? overrides.instigationStatesOrError! + : relationshipsToOmit.has('InstigationStates') + ? ({} as InstigationStates) + : buildInstigationStates({}, relationshipsToOmit), isPipelineConfigValid: overrides && overrides.hasOwnProperty('isPipelineConfigValid') ? overrides.isPipelineConfigValid! diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_instigators.py b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_instigators.py index bc7b4c435507a..be4032a82aaa4 100644 --- a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_instigators.py +++ b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_instigators.py @@ -4,6 +4,7 @@ from dagster._core.definitions.instigation_logger import get_instigation_log_records from dagster._core.definitions.selector import InstigatorSelector from dagster._core.log_manager import LOG_RECORD_METADATA_ATTR +from dagster._core.remote_representation.external import CompoundID if TYPE_CHECKING: from dagster_graphql.schema.util import ResolveInfo @@ -15,8 +16,9 @@ ) -def get_instigator_state_or_error( - graphene_info: "ResolveInfo", selector: InstigatorSelector +def get_instigator_state_by_selector( + graphene_info: "ResolveInfo", + selector: InstigatorSelector, ) -> Union["GrapheneInstigationState", "GrapheneInstigationStateNotFoundError"]: from ..schema.instigation import GrapheneInstigationState, GrapheneInstigationStateNotFoundError @@ -44,6 +46,39 @@ def get_instigator_state_or_error( return GrapheneInstigationState(current_state) +def get_instigation_state_by_id( + graphene_info: "ResolveInfo", + instigator_id: CompoundID, +): + from ..schema.instigation import GrapheneInstigationState, GrapheneInstigationStateNotFoundError + + state = graphene_info.context.instance.get_instigator_state( + origin_id=instigator_id.external_origin_id, + selector_id=instigator_id.selector_id, + ) + if state is None: + return GrapheneInstigationStateNotFoundError(instigator_id.to_string()) + + return GrapheneInstigationState(state) + + +def get_instigation_states_by_repository_id( + graphene_info: "ResolveInfo", + repository_id: CompoundID, +): + from dagster_graphql.schema.instigation import ( + GrapheneInstigationState, + GrapheneInstigationStates, + ) + + states = graphene_info.context.instance.all_instigator_state( + repository_origin_id=repository_id.external_origin_id, + repository_selector_id=repository_id.selector_id, + ) + + return GrapheneInstigationStates(results=[GrapheneInstigationState(state) for state in states]) + + def get_tick_log_events(graphene_info: "ResolveInfo", tick) -> "GrapheneInstigationEventConnection": from ..schema.instigation import GrapheneInstigationEvent, GrapheneInstigationEventConnection from ..schema.logs.log_level import GrapheneLogLevel diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/external.py b/python_modules/dagster-graphql/dagster_graphql/schema/external.py index 0e501dbe03b39..8a84fddb10ecf 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/external.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/external.py @@ -286,8 +286,8 @@ def __init__( ) super().__init__(name=repository.name) - def resolve_id(self, _graphene_info: ResolveInfo): - return self._repository.get_external_origin_id() + def resolve_id(self, _graphene_info: ResolveInfo) -> str: + return self._repository.get_compound_id().to_string() def resolve_origin(self, _graphene_info: ResolveInfo): origin = self._repository.get_external_origin() diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/instigation.py b/python_modules/dagster-graphql/dagster_graphql/schema/instigation.py index 0bd755e303cab..bf329a22c11f4 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/instigation.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/instigation.py @@ -727,10 +727,10 @@ class Meta: name = graphene.NonNull(graphene.String) - def __init__(self, name): + def __init__(self, target): super().__init__() - self.name = check.str_param(name, "name") - self.message = f"Could not find `{name}` in the currently loaded repository." + self.name = check.str_param(target, "target") + self.message = f"Could not find instigation state for `{target}`" class GrapheneInstigationStateOrError(graphene.Union): diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/roots/query.py b/python_modules/dagster-graphql/dagster_graphql/schema/roots/query.py index d3b2d6e471cad..54cfa6c2b93f2 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/roots/query.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/roots/query.py @@ -17,6 +17,7 @@ from dagster._core.errors import DagsterInvariantViolationError from dagster._core.execution.backfill import BulkActionStatus from dagster._core.nux import get_has_seen_nux +from dagster._core.remote_representation.external import CompoundID from dagster._core.scheduler.instigation import InstigatorStatus, InstigatorType from dagster._core.workspace.permissions import Permissions @@ -60,7 +61,11 @@ get_assets, ) from ...implementation.fetch_backfills import get_backfill, get_backfills -from ...implementation.fetch_instigators import get_instigator_state_or_error +from ...implementation.fetch_instigators import ( + get_instigation_state_by_id, + get_instigation_states_by_repository_id, + get_instigator_state_by_selector, +) from ...implementation.fetch_partition_sets import get_partition_set, get_partition_sets_or_error from ...implementation.fetch_pipelines import ( get_job_or_error, @@ -134,6 +139,7 @@ from ..instance import GrapheneInstance from ..instigation import ( GrapheneInstigationStateOrError, + GrapheneInstigationStatesOrError, GrapheneInstigationStatus, GrapheneInstigationTick, GrapheneInstigationTickStatus, @@ -283,13 +289,22 @@ class Meta: instigationStateOrError = graphene.Field( graphene.NonNull(GrapheneInstigationStateOrError), - instigationSelector=graphene.NonNull(GrapheneInstigationSelector), + instigationSelector=graphene.Argument(GrapheneInstigationSelector), + id=graphene.Argument(graphene.String), description=( "Retrieve the state for a schedule or sensor by its location name, repository name, and" " schedule/sensor name." ), ) + instigationStatesOrError = graphene.Field( + graphene.NonNull(GrapheneInstigationStatesOrError), + repositoryID=graphene.NonNull(graphene.String), + description=( + "Retrieve the state for a group of instigators (schedule/sensor) by their containing repository id." + ), + ) + partitionSetsOrError = graphene.Field( graphene.NonNull(GraphenePartitionSetsOrError), repositorySelector=graphene.NonNull(GrapheneRepositorySelector), @@ -708,10 +723,36 @@ def resolve_sensorsOrError( @capture_error def resolve_instigationStateOrError( - self, graphene_info: ResolveInfo, instigationSelector: GrapheneInstigationSelector + self, + graphene_info: ResolveInfo, + *, + instigationSelector: Optional[GrapheneInstigationSelector] = None, + id: Optional[str] = None, + ): + if id: + return get_instigation_state_by_id( + graphene_info, + CompoundID.from_string(id), + ) + elif instigationSelector: + return get_instigator_state_by_selector( + graphene_info, + InstigatorSelector.from_graphql_input(instigationSelector), + ) + else: + raise DagsterInvariantViolationError( + "Must pass either id or instigationSelector (but not both)." + ) + + @capture_error + def resolve_instigationStatesOrError( + self, + graphene_info: ResolveInfo, + repositoryID: str, ): - return get_instigator_state_or_error( - graphene_info, InstigatorSelector.from_graphql_input(instigationSelector) + return get_instigation_states_by_repository_id( + graphene_info, + CompoundID.from_string(repositoryID), ) @capture_error diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/schedules/schedules.py b/python_modules/dagster-graphql/dagster_graphql/schema/schedules/schedules.py index c500d27f1c933..8b97413940f52 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/schedules/schedules.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/schedules/schedules.py @@ -91,8 +91,8 @@ def __init__( description=external_schedule.description, ) - def resolve_id(self, _graphene_info: ResolveInfo): - return self._external_schedule.get_external_origin_id() + def resolve_id(self, _graphene_info: ResolveInfo) -> str: + return self._external_schedule.get_compound_id().to_string() def resolve_defaultStatus(self, _graphene_info: ResolveInfo): default_schedule_status = self._external_schedule.default_status diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/sensors.py b/python_modules/dagster-graphql/dagster_graphql/schema/sensors.py index 7dfc0530666cd..8a29578839c16 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/sensors.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/sensors.py @@ -125,8 +125,8 @@ def __init__( else None, ) - def resolve_id(self, _): - return self._external_sensor.get_external_origin_id() + def resolve_id(self, _) -> str: + return self._external_sensor.get_compound_id().to_string() def resolve_defaultStatus(self, _graphene_info: ResolveInfo): default_sensor_status = self._external_sensor.default_status diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_instigation.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_instigation.py index ff77faf8fae93..04eb72b3334cc 100644 --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_instigation.py +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_instigation.py @@ -10,8 +10,8 @@ from .graphql_context_test_suite import NonLaunchableGraphQLContextTestMatrix INSTIGATION_QUERY = """ -query JobQuery($instigationSelector: InstigationSelector!) { - instigationStateOrError(instigationSelector: $instigationSelector) { +query JobQuery($instigationSelector: InstigationSelector $id: String) { + instigationStateOrError(instigationSelector: $instigationSelector id: $id) { __typename ... on PythonError { message @@ -27,6 +27,24 @@ } """ +INSTIGATION_STATES_QUERY = """ +query InstigationStatesQuery($id: String!) { + instigationStatesOrError(repositoryID: $id) { + __typename + ... on PythonError { + message + stack + } + ... on InstigationStates { + results { + name + instigationType + } + } + } +} +""" + def _create_sensor_tick(graphql_context): logger = get_default_daemon_logger("SensorDaemon") @@ -44,7 +62,7 @@ def _create_sensor_tick(graphql_context): class TestNextTickRepository(NonLaunchableGraphQLContextTestMatrix): - def test_schedule_next_tick(self, graphql_context): + def test_schedule_next_tick(self, graphql_context) -> None: repository_selector = infer_repository_selector(graphql_context) external_repository = graphql_context.get_code_location( repository_selector["repositoryLocationName"] @@ -52,10 +70,23 @@ def test_schedule_next_tick(self, graphql_context): schedule_name = "no_config_job_hourly_schedule" external_schedule = external_repository.get_external_schedule(schedule_name) - selector = infer_instigation_selector(graphql_context, schedule_name) # need to be running in order to generate a future tick graphql_context.instance.start_schedule(external_schedule) + result = execute_dagster_graphql( + graphql_context, + INSTIGATION_QUERY, + variables={"id": external_schedule.get_compound_id().to_string()}, + ) + + assert result.data + assert result.data["instigationStateOrError"]["__typename"] == "InstigationState" + next_tick = result.data["instigationStateOrError"]["nextTick"] + assert next_tick + + # ensure legacy selector based impl works until removal + + selector = infer_instigation_selector(graphql_context, schedule_name) result = execute_dagster_graphql( graphql_context, INSTIGATION_QUERY, variables={"instigationSelector": selector} ) @@ -80,11 +111,52 @@ def test_sensor_next_tick(self, graphql_context): graphql_context.instance.start_sensor(external_sensor) _create_sensor_tick(graphql_context) + result = execute_dagster_graphql( + graphql_context, + INSTIGATION_QUERY, + variables={"id": external_sensor.get_compound_id().to_string()}, + ) + + assert result.data + assert ( + result.data["instigationStateOrError"]["__typename"] == "InstigationState" + ), result.data["instigationStateOrError"] + next_tick = result.data["instigationStateOrError"]["nextTick"] + assert next_tick + + # ensure legacy selector based lookup continues to work until removed + result = execute_dagster_graphql( graphql_context, INSTIGATION_QUERY, variables={"instigationSelector": selector} ) assert result.data - assert result.data["instigationStateOrError"]["__typename"] == "InstigationState" + assert ( + result.data["instigationStateOrError"]["__typename"] == "InstigationState" + ), result.data["instigationStateOrError"] next_tick = result.data["instigationStateOrError"]["nextTick"] assert next_tick + + def test_instigation_states(self, graphql_context) -> None: + repository_selector = infer_repository_selector(graphql_context) + external_repository = graphql_context.get_code_location( + repository_selector["repositoryLocationName"] + ).get_repository(repository_selector["repositoryName"]) + + schedule_name = "no_config_job_hourly_schedule" + external_schedule = external_repository.get_external_schedule(schedule_name) + graphql_context.instance.start_schedule(external_schedule) + + result = execute_dagster_graphql( + graphql_context, + INSTIGATION_STATES_QUERY, + variables={"id": external_repository.get_compound_id().to_string()}, + ) + + assert result.data + assert ( + result.data["instigationStatesOrError"]["__typename"] == "InstigationStates" + ), result.data["instigationStatesOrError"] + + results = result.data["instigationStatesOrError"]["results"] + assert results == [{"name": schedule_name, "instigationType": "SCHEDULE"}] diff --git a/python_modules/dagster/dagster/_core/remote_representation/external.py b/python_modules/dagster/dagster/_core/remote_representation/external.py index dca0b8b2c5288..7421e5c9728db 100644 --- a/python_modules/dagster/dagster/_core/remote_representation/external.py +++ b/python_modules/dagster/dagster/_core/remote_representation/external.py @@ -36,6 +36,7 @@ DefaultSensorStatus, SensorType, ) +from dagster._core.errors import DagsterInvariantViolationError from dagster._core.execution.plan.handle import ResolvedFromDynamicStepHandle, StepHandle from dagster._core.instance import DagsterInstance from dagster._core.origin import JobPythonOrigin, RepositoryPythonOrigin @@ -48,6 +49,7 @@ from dagster._core.snap import ExecutionPlanSnapshot from dagster._core.snap.job_snapshot import JobSnapshot from dagster._core.utils import toposort +from dagster._model.decorator import dagster_model from dagster._serdes import create_snapshot_id from dagster._utils.cached_method import cached_method from dagster._utils.schedules import schedule_execution_time_iterator @@ -80,6 +82,30 @@ from dagster._core.scheduler.instigation import InstigatorState from dagster._core.snap.execution_plan_snapshot import ExecutionStepSnap +_DELIMITER = "::" + + +@dagster_model +class CompoundID: + """Compound ID object for the two id schemes that state is recorded in the database against.""" + + external_origin_id: str + selector_id: str + + def to_string(self) -> str: + return f"{self.external_origin_id}{_DELIMITER}{self.selector_id}" + + @staticmethod + def from_string(serialized: str): + parts = serialized.split(_DELIMITER) + if len(parts) != 2: + raise DagsterInvariantViolationError(f"Invalid serialized InstigatorID: {serialized}") + + return CompoundID( + external_origin_id=parts[0], + selector_id=parts[1], + ) + class ExternalRepository: """ExternalRepository is a object that represents a loaded repository definition that @@ -333,6 +359,12 @@ def selector_id(self) -> str: RepositorySelector(self._handle.location_name, self._handle.repository_name) ) + def get_compound_id(self) -> CompoundID: + return CompoundID( + external_origin_id=self.get_external_origin_id(), + selector_id=self.selector_id, + ) + def get_external_origin(self) -> RemoteRepositoryOrigin: return self.handle.get_external_origin() @@ -784,6 +816,12 @@ def schedule_selector(self) -> ScheduleSelector: def selector_id(self) -> str: return create_snapshot_id(self.selector) + def get_compound_id(self) -> CompoundID: + return CompoundID( + external_origin_id=self.get_external_origin_id(), + selector_id=self.selector_id, + ) + @property def default_status(self) -> DefaultScheduleStatus: return self._external_schedule_data.default_status or DefaultScheduleStatus.STOPPED @@ -928,6 +966,12 @@ def sensor_selector(self) -> SensorSelector: def selector_id(self) -> str: return create_snapshot_id(self.selector) + def get_compound_id(self) -> CompoundID: + return CompoundID( + external_origin_id=self.get_external_origin_id(), + selector_id=self.selector_id, + ) + @property def sensor_type(self) -> SensorType: return self._external_sensor_data.sensor_type or SensorType.UNKNOWN diff --git a/python_modules/dagster/dagster/_core/storage/schedules/sql_schedule_storage.py b/python_modules/dagster/dagster/_core/storage/schedules/sql_schedule_storage.py index bf9fcbdb2559a..939986ce68aae 100644 --- a/python_modules/dagster/dagster/_core/storage/schedules/sql_schedule_storage.py +++ b/python_modules/dagster/dagster/_core/storage/schedules/sql_schedule_storage.py @@ -110,7 +110,6 @@ def all_instigator_state( query = query.where( JobTable.c.status.in_([status.value for status in instigator_statuses]) ) - rows = self.execute(query) return self._deserialize_rows(rows, InstigatorState)