
Commit

Add graphql endpoints for querying asset check automation history
OwenKephart committed Dec 18, 2024
1 parent dc690a6 commit 76f99d1
Showing 27 changed files with 172 additions and 87 deletions.

Some generated files are not rendered by default.

Large diffs are not rendered by default.

15 changes: 11 additions & 4 deletions js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql

Some generated files are not rendered by default.

27 changes: 23 additions & 4 deletions js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts

Some generated files are not rendered by default.
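For orientation, a hedged sketch (not part of this commit) of the kind of definition whose automation history the new endpoints expose. It assumes that @asset_check accepts an automation_condition argument, as implied by the automationCondition field added to GrapheneAssetCheck further down; the asset, check, and cron schedule here are illustrative only.

import dagster as dg


@dg.asset
def my_asset() -> None: ...


# Assumed: asset checks can carry an automation condition, whose evaluation
# history the new GraphQL fields surface.
@dg.asset_check(
    asset=my_asset,
    automation_condition=dg.AutomationCondition.cron_tick_passed("0 * * * *"),
)
def my_check() -> dg.AssetCheckResult:
    return dg.AssetCheckResult(passed=True)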

@@ -10,10 +10,9 @@
from dagster._core.storage.asset_check_execution_record import AssetCheckExecutionRecordStatus
from dagster._core.storage.dagster_run import RunRecord

from dagster_graphql.schema.asset_checks import GrapheneAssetCheckExecution

if TYPE_CHECKING:
from dagster_graphql.schema.pipelines.pipeline import GrapheneAssetCheckHandle
from dagster_graphql.schema.asset_checks import GrapheneAssetCheckExecution
from dagster_graphql.schema.entity_key import GrapheneAssetCheckHandle
from dagster_graphql.schema.util import ResolveInfo


@@ -32,7 +31,9 @@ def fetch_asset_check_executions(
asset_check_key: AssetCheckKey,
limit: int,
cursor: Optional[str],
) -> List[GrapheneAssetCheckExecution]:
) -> List["GrapheneAssetCheckExecution"]:
from dagster_graphql.schema.asset_checks import GrapheneAssetCheckExecution

check_records = loading_context.instance.event_log_storage.get_asset_check_execution_history(
check_key=asset_check_key,
limit=limit,
@@ -50,7 +51,7 @@ def get_asset_checks_for_run_id(
def get_asset_checks_for_run_id(
graphene_info: "ResolveInfo", run_id: str
) -> Sequence["GrapheneAssetCheckHandle"]:
from dagster_graphql.schema.pipelines.pipeline import GrapheneAssetCheckHandle
from dagster_graphql.schema.entity_key import GrapheneAssetCheckHandle

check.str_param(run_id, "run_id")

@@ -1,7 +1,7 @@
from typing import TYPE_CHECKING, Optional, Sequence
from typing import TYPE_CHECKING, Optional, Sequence, Union

import dagster._check as check
from dagster import AssetKey
from dagster._core.definitions.asset_key import AssetCheckKey, AssetKey, EntityKey
from dagster._core.scheduler.instigation import AutoMaterializeAssetEvaluationRecord

from dagster_graphql.schema.asset_condition_evaluations import (
@@ -13,7 +13,7 @@
from dagster_graphql.schema.auto_materialize_asset_evaluations import (
GrapheneAutoMaterializeAssetEvaluationNeedsMigrationError,
)
from dagster_graphql.schema.inputs import GrapheneAssetKeyInput
from dagster_graphql.schema.inputs import GrapheneAssetCheckHandleInput, GrapheneAssetKeyInput

if TYPE_CHECKING:
from dagster_graphql.schema.util import ResolveInfo
@@ -67,20 +67,29 @@ def fetch_asset_condition_evaluation_record_for_partition(
)


def entity_key_from_graphql_input(
graphene_input: Union[GrapheneAssetKeyInput, GrapheneAssetCheckHandleInput],
) -> EntityKey:
if "path" in graphene_input:
return AssetKey.from_graphql_input(graphene_input)
else:
return AssetCheckKey.from_graphql_input(graphene_input)


def fetch_true_partitions_for_evaluation_node(
graphene_info: "ResolveInfo",
graphene_asset_key: GrapheneAssetKeyInput,
graphene_entity_key: Union[GrapheneAssetKeyInput, GrapheneAssetCheckHandleInput],
evaluation_id: int,
node_unique_id: str,
) -> Sequence[str]:
asset_key = AssetKey.from_graphql_input(graphene_asset_key)
key = entity_key_from_graphql_input(graphene_entity_key)
schedule_storage = check.not_none(graphene_info.context.instance.schedule_storage)
record = next(
iter(
schedule_storage.get_auto_materialize_asset_evaluations(
# there is no method to get a specific evaluation by id, so instead get the first
# evaluation before evaluation_id + 1
key=asset_key,
key=key,
cursor=evaluation_id + 1,
limit=1,
)
@@ -104,7 +113,7 @@ def fetch_true_partitions_for_evaluation_node(

def fetch_asset_condition_evaluation_records_for_asset_key(
graphene_info: "ResolveInfo",
graphene_asset_key: GrapheneAssetKeyInput,
graphene_entity_key: Union[GrapheneAssetKeyInput, GrapheneAssetCheckHandleInput],
limit: int,
cursor: Optional[str],
) -> GrapheneAssetConditionEvaluationRecordsOrError:
@@ -113,13 +122,13 @@ def fetch_asset_condition_evaluation_records_for_asset_key(
if migration_error:
return migration_error

asset_key = AssetKey.from_graphql_input(graphene_asset_key)
key = entity_key_from_graphql_input(graphene_entity_key)

schedule_storage = check.not_none(graphene_info.context.instance.schedule_storage)
return _get_graphene_records_from_evaluations(
graphene_info,
schedule_storage.get_auto_materialize_asset_evaluations(
key=asset_key,
key=key,
limit=limit,
cursor=int(cursor) if cursor else None,
),
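As a reading aid, a minimal standalone sketch of how the new entity_key_from_graphql_input dispatch above behaves; the plain dicts stand in for GrapheneAssetKeyInput and GrapheneAssetCheckHandleInput values, and the check-handle input shape is an assumption.

from dagster._core.definitions.asset_key import AssetCheckKey, AssetKey, EntityKey


def entity_key_from_graphql_input(graphene_input) -> EntityKey:
    # Asset key inputs carry a "path"; asset check handle inputs instead carry
    # the check name plus the key of the asset it targets.
    if "path" in graphene_input:
        return AssetKey.from_graphql_input(graphene_input)
    return AssetCheckKey.from_graphql_input(graphene_input)


# Hypothetical input shapes, for illustration only.
print(entity_key_from_graphql_input({"path": ["my_asset"]}))
print(entity_key_from_graphql_input({"name": "my_check", "assetKey": {"path": ["my_asset"]}}))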
@@ -6,14 +6,14 @@


def types():
from dagster_graphql.schema.asset_key import GrapheneAssetKey
from dagster_graphql.schema.backfill import (
GrapheneLaunchBackfillResult,
GrapheneLaunchBackfillSuccess,
)
from dagster_graphql.schema.config_type_or_error import GrapheneConfigTypeOrError
from dagster_graphql.schema.config_types import types as config_types
from dagster_graphql.schema.dagster_types import types as dagster_types_types
from dagster_graphql.schema.entity_key import GrapheneAssetKey
from dagster_graphql.schema.errors import types as errors_types
from dagster_graphql.schema.execution import types as execution_types
from dagster_graphql.schema.external import types as external_types
@@ -7,8 +7,11 @@
AssetCheckEvaluation,
AssetCheckEvaluationTargetMaterializationData,
)
from dagster._core.definitions.asset_check_spec import AssetCheckKey, AssetCheckSeverity
from dagster._core.definitions.asset_check_spec import AssetCheckSeverity
from dagster._core.definitions.asset_key import AssetKey
from dagster._core.definitions.declarative_automation.serialized_objects import (
AutomationConditionSnapshot,
)
from dagster._core.definitions.remote_asset_graph import RemoteAssetCheckNode
from dagster._core.events import DagsterEventType
from dagster._core.storage.asset_check_execution_record import (
@@ -17,7 +20,9 @@
)

from dagster_graphql.implementation.events import iterate_metadata_entries
from dagster_graphql.schema.asset_key import GrapheneAssetKey
from dagster_graphql.schema.auto_materialize_policy import GrapheneAutoMaterializePolicy
from dagster_graphql.schema.automation_condition import GrapheneAutomationCondition
from dagster_graphql.schema.entity_key import GrapheneAssetKey
from dagster_graphql.schema.errors import GrapheneError
from dagster_graphql.schema.metadata import GrapheneMetadataEntry
from dagster_graphql.schema.util import ResolveInfo, non_null_list
@@ -134,6 +139,7 @@ class GrapheneAssetCheck(graphene.ObjectType):
canExecuteIndividually = graphene.NonNull(GrapheneAssetCheckCanExecuteIndividually)
blocking = graphene.NonNull(graphene.Boolean)
additionalAssetKeys = non_null_list(GrapheneAssetKey)
automationCondition = graphene.Field(GrapheneAutomationCondition)

class Meta:
name = "AssetCheck"
@@ -185,6 +191,22 @@ def resolve_additionalAssetKeys(self, _) -> Sequence[GrapheneAssetKey]:
for asset_key in self._asset_check.additional_asset_keys
]

def resolve_automationCondition(
self, _graphene_info: ResolveInfo
) -> Optional[GrapheneAutoMaterializePolicy]:
automation_condition = (
self._asset_check.automation_condition_snapshot
or self._asset_check.automation_condition
)
if automation_condition:
return GrapheneAutomationCondition(
# we only store one of automation_condition or automation_condition_snapshot
automation_condition
if isinstance(automation_condition, AutomationConditionSnapshot)
else automation_condition.get_snapshot()
)
return None


class GrapheneAssetChecks(graphene.ObjectType):
checks = non_null_list(GrapheneAssetCheck)
@@ -234,14 +256,3 @@ class Meta:
GrapheneAssetCheckNeedsAgentUpgradeError,
)
name = "AssetChecksOrError"


class GrapheneAssetCheckHandle(graphene.ObjectType):
name = graphene.NonNull(graphene.String)
assetKey = graphene.NonNull(GrapheneAssetKey)

class Meta:
name = "AssetCheckhandle"

def __init__(self, handle: AssetCheckKey):
super().__init__(name=handle.name, assetKey=GrapheneAssetKey(path=handle.asset_key.path))
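A minimal standalone sketch of the normalization performed by the resolve_automationCondition resolver above: whichever of automation_condition or automation_condition_snapshot is stored, the field resolves to a snapshot. AutomationCondition.eager() is used here only as an example condition.

from dagster import AutomationCondition
from dagster._core.definitions.declarative_automation.serialized_objects import (
    AutomationConditionSnapshot,
)


def to_snapshot(value) -> AutomationConditionSnapshot:
    # Snapshots pass through unchanged; live conditions are snapshotted first.
    if isinstance(value, AutomationConditionSnapshot):
        return value
    return value.get_snapshot()


print(type(to_snapshot(AutomationCondition.eager())).__name__)  # AutomationConditionSnapshot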
@@ -4,17 +4,18 @@

import graphene
from dagster._core.asset_graph_view.serializable_entity_subset import SerializableEntitySubset
from dagster._core.definitions.asset_key import AssetKey
from dagster._core.definitions.declarative_automation.serialized_objects import (
AutomationConditionEvaluation,
AutomationConditionSnapshot,
)
from dagster._core.scheduler.instigation import AutoMaterializeAssetEvaluationRecord

from dagster_graphql.implementation.events import iterate_metadata_entries
from dagster_graphql.schema.asset_key import GrapheneAssetKey
from dagster_graphql.schema.auto_materialize_asset_evaluations import (
GrapheneAutoMaterializeAssetEvaluationNeedsMigrationError,
)
from dagster_graphql.schema.entity_key import GrapheneAssetKey, GrapheneEntityKey
from dagster_graphql.schema.metadata import GrapheneMetadataEntry
from dagster_graphql.schema.util import ResolveInfo, non_null_list

@@ -251,7 +252,8 @@ class GrapheneAssetConditionEvaluationRecord(graphene.ObjectType):
runIds = non_null_list(graphene.String)
timestamp = graphene.NonNull(graphene.Float)

assetKey = graphene.NonNull(GrapheneAssetKey)
assetKey = graphene.Field(GrapheneAssetKey)
entityKey = graphene.NonNull(GrapheneEntityKey)
numRequested = graphene.NonNull(graphene.Int)

startTimestamp = graphene.Field(graphene.Float)
@@ -282,7 +284,10 @@ def __init__(
evaluationId=record.evaluation_id,
timestamp=record.timestamp,
runIds=evaluation_with_run_ids.run_ids,
assetKey=GrapheneAssetKey(path=record.key.path),
assetKey=GrapheneEntityKey.from_entity_key(record.key)
if isinstance(record.key, AssetKey)
else None,
entityKey=GrapheneEntityKey.from_entity_key(record.key),
numRequested=root_evaluation.true_subset.size,
startTimestamp=root_evaluation.start_timestamp,
endTimestamp=root_evaluation.end_timestamp,