Skip to content

Commit

Permalink
Add graphql endpoints for querying asset check automation history
Browse files Browse the repository at this point in the history
  • Loading branch information
OwenKephart committed Nov 27, 2024
1 parent 1dbb7bf commit d47d9b3
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 10 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from typing import TYPE_CHECKING, Optional, Sequence

import dagster._check as check
from dagster import AssetKey
from dagster._core.definitions.asset_key import AssetCheckKey, AssetKey, EntityKey
from dagster._core.scheduler.instigation import AutoMaterializeAssetEvaluationRecord

from dagster_graphql.schema.asset_condition_evaluations import (
Expand All @@ -13,7 +13,7 @@
from dagster_graphql.schema.auto_materialize_asset_evaluations import (
GrapheneAutoMaterializeAssetEvaluationNeedsMigrationError,
)
from dagster_graphql.schema.inputs import GrapheneAssetKeyInput
from dagster_graphql.schema.inputs import GrapheneAssetKeyInput, GrapheneEntityKeyInput

if TYPE_CHECKING:
from dagster_graphql.schema.util import ResolveInfo
Expand Down Expand Up @@ -67,20 +67,27 @@ def fetch_asset_condition_evaluation_record_for_partition(
)


def entity_key_from_graphql_input(graphene_input: GrapheneEntityKeyInput) -> EntityKey:
if "path" in graphene_input:
return AssetKey.from_graphql_input(graphene_input)
else:
return AssetCheckKey.from_graphql_input(graphene_input)


def fetch_true_partitions_for_evaluation_node(
graphene_info: "ResolveInfo",
graphene_asset_key: GrapheneAssetKeyInput,
graphene_asset_key: GrapheneEntityKeyInput,
evaluation_id: int,
node_unique_id: str,
) -> Sequence[str]:
asset_key = AssetKey.from_graphql_input(graphene_asset_key)
key = entity_key_from_graphql_input(graphene_asset_key)
schedule_storage = check.not_none(graphene_info.context.instance.schedule_storage)
record = next(
iter(
schedule_storage.get_auto_materialize_asset_evaluations(
# there is no method to get a specific evaluation by id, so instead get the first
# evaluation before evaluation_id + 1
key=asset_key,
key=key,
cursor=evaluation_id + 1,
limit=1,
)
Expand All @@ -104,7 +111,7 @@ def fetch_true_partitions_for_evaluation_node(

def fetch_asset_condition_evaluation_records_for_asset_key(
graphene_info: "ResolveInfo",
graphene_asset_key: GrapheneAssetKeyInput,
graphene_asset_key: GrapheneEntityKeyInput,
limit: int,
cursor: Optional[str],
) -> GrapheneAssetConditionEvaluationRecordsOrError:
Expand All @@ -113,13 +120,13 @@ def fetch_asset_condition_evaluation_records_for_asset_key(
if migration_error:
return migration_error

asset_key = AssetKey.from_graphql_input(graphene_asset_key)
key = entity_key_from_graphql_input(graphene_asset_key)

schedule_storage = check.not_none(graphene_info.context.instance.schedule_storage)
return _get_graphene_records_from_evaluations(
graphene_info,
schedule_storage.get_auto_materialize_asset_evaluations(
key=asset_key,
key=key,
limit=limit,
cursor=int(cursor) if cursor else None,
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
)
from dagster._core.definitions.asset_check_spec import AssetCheckKey, AssetCheckSeverity
from dagster._core.definitions.asset_key import AssetKey
from dagster._core.definitions.declarative_automation.serialized_objects import (
AutomationConditionSnapshot,
)
from dagster._core.definitions.remote_asset_graph import RemoteAssetCheckNode
from dagster._core.events import DagsterEventType
from dagster._core.storage.asset_check_execution_record import (
Expand All @@ -18,6 +21,8 @@

from dagster_graphql.implementation.events import iterate_metadata_entries
from dagster_graphql.schema.asset_key import GrapheneAssetKey
from dagster_graphql.schema.auto_materialize_policy import GrapheneAutoMaterializePolicy
from dagster_graphql.schema.automation_condition import GrapheneAutomationCondition
from dagster_graphql.schema.errors import GrapheneError
from dagster_graphql.schema.metadata import GrapheneMetadataEntry
from dagster_graphql.schema.util import ResolveInfo, non_null_list
Expand Down Expand Up @@ -185,6 +190,22 @@ def resolve_additionalAssetKeys(self, _) -> Sequence[GrapheneAssetKey]:
for asset_key in self._asset_check.additional_asset_keys
]

def resolve_automationCondition(
self, _graphene_info: ResolveInfo
) -> Optional[GrapheneAutoMaterializePolicy]:
automation_condition = (
self._asset_check.automation_condition_snapshot
or self._asset_check.automation_condition
)
if automation_condition:
return GrapheneAutomationCondition(
# we only store one of automation_condition or automation_condition_snapshot
automation_condition
if isinstance(automation_condition, AutomationConditionSnapshot)
else automation_condition.get_snapshot()
)
return None


class GrapheneAssetChecks(graphene.ObjectType):
checks = non_null_list(GrapheneAssetCheck)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ class Meta:
name = "AssetCheckHandleInput"


class GrapheneEntityKeyInput(graphene.Union):
class Meta:
types = (GrapheneAssetKeyInput, GrapheneAssetCheckHandleInput)
name = "EntityKeyInput"


class GrapheneExecutionTag(graphene.InputObjectType):
key = graphene.NonNull(graphene.String)
value = graphene.NonNull(graphene.String)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@
GrapheneAssetGroupSelector,
GrapheneAssetKeyInput,
GrapheneBulkActionsFilter,
GrapheneEntityKeyInput,
GrapheneGraphSelector,
GrapheneInstigationSelector,
GraphenePipelineSelector,
Expand Down Expand Up @@ -1269,7 +1270,7 @@ def resolve_autoMaterializeEvaluationsForEvaluationId(
def resolve_assetConditionEvaluationForPartition(
self,
graphene_info: ResolveInfo,
assetKey: GrapheneAssetKeyInput,
assetKey: GrapheneEntityKeyInput,
evaluationId: str,
partition: str,
):
Expand All @@ -1283,7 +1284,7 @@ def resolve_assetConditionEvaluationForPartition(
def resolve_assetConditionEvaluationRecordsOrError(
self,
graphene_info: ResolveInfo,
assetKey: GrapheneAssetKeyInput,
assetKey: GrapheneEntityKeyInput,
limit: int,
cursor: Optional[str] = None,
):
Expand Down

0 comments on commit d47d9b3

Please sign in to comment.