diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_asset_condition_evaluations.py b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_asset_condition_evaluations.py index 29d3eb286355d..1af8634b14c98 100644 --- a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_asset_condition_evaluations.py +++ b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_asset_condition_evaluations.py @@ -1,7 +1,7 @@ from typing import TYPE_CHECKING, Optional, Sequence import dagster._check as check -from dagster import AssetKey +from dagster._core.definitions.asset_key import AssetCheckKey, AssetKey, EntityKey from dagster._core.scheduler.instigation import AutoMaterializeAssetEvaluationRecord from dagster_graphql.schema.asset_condition_evaluations import ( @@ -13,7 +13,7 @@ from dagster_graphql.schema.auto_materialize_asset_evaluations import ( GrapheneAutoMaterializeAssetEvaluationNeedsMigrationError, ) -from dagster_graphql.schema.inputs import GrapheneAssetKeyInput +from dagster_graphql.schema.inputs import GrapheneAssetKeyInput, GrapheneEntityKeyInput if TYPE_CHECKING: from dagster_graphql.schema.util import ResolveInfo @@ -67,20 +67,27 @@ def fetch_asset_condition_evaluation_record_for_partition( ) +def entity_key_from_graphql_input(graphene_input: GrapheneEntityKeyInput) -> EntityKey: + if "path" in graphene_input: + return AssetKey.from_graphql_input(graphene_input) + else: + return AssetCheckKey.from_graphql_input(graphene_input) + + def fetch_true_partitions_for_evaluation_node( graphene_info: "ResolveInfo", - graphene_asset_key: GrapheneAssetKeyInput, + graphene_asset_key: GrapheneEntityKeyInput, evaluation_id: int, node_unique_id: str, ) -> Sequence[str]: - asset_key = AssetKey.from_graphql_input(graphene_asset_key) + key = entity_key_from_graphql_input(graphene_asset_key) schedule_storage = check.not_none(graphene_info.context.instance.schedule_storage) record = next( iter( schedule_storage.get_auto_materialize_asset_evaluations( # there is no method to get a specific evaluation by id, so instead get the first # evaluation before evaluation_id + 1 - key=asset_key, + key=key, cursor=evaluation_id + 1, limit=1, ) @@ -104,7 +111,7 @@ def fetch_true_partitions_for_evaluation_node( def fetch_asset_condition_evaluation_records_for_asset_key( graphene_info: "ResolveInfo", - graphene_asset_key: GrapheneAssetKeyInput, + graphene_asset_key: GrapheneEntityKeyInput, limit: int, cursor: Optional[str], ) -> GrapheneAssetConditionEvaluationRecordsOrError: @@ -113,13 +120,13 @@ def fetch_asset_condition_evaluation_records_for_asset_key( if migration_error: return migration_error - asset_key = AssetKey.from_graphql_input(graphene_asset_key) + key = entity_key_from_graphql_input(graphene_asset_key) schedule_storage = check.not_none(graphene_info.context.instance.schedule_storage) return _get_graphene_records_from_evaluations( graphene_info, schedule_storage.get_auto_materialize_asset_evaluations( - key=asset_key, + key=key, limit=limit, cursor=int(cursor) if cursor else None, ), diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/asset_checks.py b/python_modules/dagster-graphql/dagster_graphql/schema/asset_checks.py index ca864bc4fa98b..8e00e6da8a708 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/asset_checks.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/asset_checks.py @@ -9,6 +9,9 @@ ) from dagster._core.definitions.asset_check_spec import AssetCheckKey, AssetCheckSeverity from dagster._core.definitions.asset_key import AssetKey +from dagster._core.definitions.declarative_automation.serialized_objects import ( + AutomationConditionSnapshot, +) from dagster._core.definitions.remote_asset_graph import RemoteAssetCheckNode from dagster._core.events import DagsterEventType from dagster._core.storage.asset_check_execution_record import ( @@ -18,6 +21,8 @@ from dagster_graphql.implementation.events import iterate_metadata_entries from dagster_graphql.schema.asset_key import GrapheneAssetKey +from dagster_graphql.schema.auto_materialize_policy import GrapheneAutoMaterializePolicy +from dagster_graphql.schema.automation_condition import GrapheneAutomationCondition from dagster_graphql.schema.errors import GrapheneError from dagster_graphql.schema.metadata import GrapheneMetadataEntry from dagster_graphql.schema.util import ResolveInfo, non_null_list @@ -185,6 +190,22 @@ def resolve_additionalAssetKeys(self, _) -> Sequence[GrapheneAssetKey]: for asset_key in self._asset_check.additional_asset_keys ] + def resolve_automationCondition( + self, _graphene_info: ResolveInfo + ) -> Optional[GrapheneAutoMaterializePolicy]: + automation_condition = ( + self._asset_check.automation_condition_snapshot + or self._asset_check.automation_condition + ) + if automation_condition: + return GrapheneAutomationCondition( + # we only store one of automation_condition or automation_condition_snapshot + automation_condition + if isinstance(automation_condition, AutomationConditionSnapshot) + else automation_condition.get_snapshot() + ) + return None + class GrapheneAssetChecks(graphene.ObjectType): checks = non_null_list(GrapheneAssetCheck) diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/inputs.py b/python_modules/dagster-graphql/dagster_graphql/schema/inputs.py index 2578b7cb11992..85c5db09fb4a9 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/inputs.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/inputs.py @@ -29,6 +29,12 @@ class Meta: name = "AssetCheckHandleInput" +class GrapheneEntityKeyInput(graphene.Union): + class Meta: + types = (GrapheneAssetKeyInput, GrapheneAssetCheckHandleInput) + name = "EntityKeyInput" + + class GrapheneExecutionTag(graphene.InputObjectType): key = graphene.NonNull(graphene.String) value = graphene.NonNull(graphene.String) diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/roots/query.py b/python_modules/dagster-graphql/dagster_graphql/schema/roots/query.py index 48a244bee6457..abc709da083ff 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/roots/query.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/roots/query.py @@ -132,6 +132,7 @@ GrapheneAssetGroupSelector, GrapheneAssetKeyInput, GrapheneBulkActionsFilter, + GrapheneEntityKeyInput, GrapheneGraphSelector, GrapheneInstigationSelector, GraphenePipelineSelector, @@ -1269,7 +1270,7 @@ def resolve_autoMaterializeEvaluationsForEvaluationId( def resolve_assetConditionEvaluationForPartition( self, graphene_info: ResolveInfo, - assetKey: GrapheneAssetKeyInput, + assetKey: GrapheneEntityKeyInput, evaluationId: str, partition: str, ): @@ -1283,7 +1284,7 @@ def resolve_assetConditionEvaluationForPartition( def resolve_assetConditionEvaluationRecordsOrError( self, graphene_info: ResolveInfo, - assetKey: GrapheneAssetKeyInput, + assetKey: GrapheneEntityKeyInput, limit: int, cursor: Optional[str] = None, ):