Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add graphql endpoints for querying asset check automation history #26170

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Large diffs are not rendered by default.

15 changes: 11 additions & 4 deletions js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 23 additions & 4 deletions js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,9 @@
from dagster._core.storage.asset_check_execution_record import AssetCheckExecutionRecordStatus
from dagster._core.storage.dagster_run import RunRecord

from dagster_graphql.schema.asset_checks import GrapheneAssetCheckExecution

if TYPE_CHECKING:
from dagster_graphql.schema.pipelines.pipeline import GrapheneAssetCheckHandle
from dagster_graphql.schema.asset_checks import GrapheneAssetCheckExecution
from dagster_graphql.schema.entity_key import GrapheneAssetCheckHandle
from dagster_graphql.schema.util import ResolveInfo


Expand All @@ -32,7 +31,9 @@ def fetch_asset_check_executions(
asset_check_key: AssetCheckKey,
limit: int,
cursor: Optional[str],
) -> List[GrapheneAssetCheckExecution]:
) -> List["GrapheneAssetCheckExecution"]:
from dagster_graphql.schema.asset_checks import GrapheneAssetCheckExecution

check_records = loading_context.instance.event_log_storage.get_asset_check_execution_history(
check_key=asset_check_key,
limit=limit,
Expand All @@ -50,7 +51,7 @@ def fetch_asset_check_executions(
def get_asset_checks_for_run_id(
graphene_info: "ResolveInfo", run_id: str
) -> Sequence["GrapheneAssetCheckHandle"]:
from dagster_graphql.schema.pipelines.pipeline import GrapheneAssetCheckHandle
from dagster_graphql.schema.entity_key import GrapheneAssetCheckHandle

check.str_param(run_id, "run_id")

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from typing import TYPE_CHECKING, Optional, Sequence
from typing import TYPE_CHECKING, Optional, Sequence, Union

import dagster._check as check
from dagster import AssetKey
from dagster._core.definitions.asset_key import AssetCheckKey, AssetKey, EntityKey
from dagster._core.scheduler.instigation import AutoMaterializeAssetEvaluationRecord

from dagster_graphql.schema.asset_condition_evaluations import (
Expand All @@ -13,7 +13,7 @@
from dagster_graphql.schema.auto_materialize_asset_evaluations import (
GrapheneAutoMaterializeAssetEvaluationNeedsMigrationError,
)
from dagster_graphql.schema.inputs import GrapheneAssetKeyInput
from dagster_graphql.schema.inputs import GrapheneAssetCheckHandleInput, GrapheneAssetKeyInput

if TYPE_CHECKING:
from dagster_graphql.schema.util import ResolveInfo
Expand Down Expand Up @@ -67,20 +67,29 @@ def fetch_asset_condition_evaluation_record_for_partition(
)


def entity_key_from_graphql_input(
graphene_input: Union[GrapheneAssetKeyInput, GrapheneAssetCheckHandleInput],
) -> EntityKey:
if "path" in graphene_input:
return AssetKey.from_graphql_input(graphene_input)
else:
return AssetCheckKey.from_graphql_input(graphene_input)


def fetch_true_partitions_for_evaluation_node(
graphene_info: "ResolveInfo",
graphene_asset_key: GrapheneAssetKeyInput,
graphene_entity_key: Union[GrapheneAssetKeyInput, GrapheneAssetCheckHandleInput],
evaluation_id: int,
node_unique_id: str,
) -> Sequence[str]:
asset_key = AssetKey.from_graphql_input(graphene_asset_key)
key = entity_key_from_graphql_input(graphene_entity_key)
schedule_storage = check.not_none(graphene_info.context.instance.schedule_storage)
record = next(
iter(
schedule_storage.get_auto_materialize_asset_evaluations(
# there is no method to get a specific evaluation by id, so instead get the first
# evaluation before evaluation_id + 1
key=asset_key,
key=key,
cursor=evaluation_id + 1,
limit=1,
)
Expand All @@ -104,7 +113,7 @@ def fetch_true_partitions_for_evaluation_node(

def fetch_asset_condition_evaluation_records_for_asset_key(
graphene_info: "ResolveInfo",
graphene_asset_key: GrapheneAssetKeyInput,
graphene_entity_key: Union[GrapheneAssetKeyInput, GrapheneAssetCheckHandleInput],
limit: int,
cursor: Optional[str],
) -> GrapheneAssetConditionEvaluationRecordsOrError:
Expand All @@ -113,13 +122,13 @@ def fetch_asset_condition_evaluation_records_for_asset_key(
if migration_error:
return migration_error

asset_key = AssetKey.from_graphql_input(graphene_asset_key)
key = entity_key_from_graphql_input(graphene_entity_key)

schedule_storage = check.not_none(graphene_info.context.instance.schedule_storage)
return _get_graphene_records_from_evaluations(
graphene_info,
schedule_storage.get_auto_materialize_asset_evaluations(
key=asset_key,
key=key,
limit=limit,
cursor=int(cursor) if cursor else None,
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@


def types():
from dagster_graphql.schema.asset_key import GrapheneAssetKey
from dagster_graphql.schema.backfill import (
GrapheneLaunchBackfillResult,
GrapheneLaunchBackfillSuccess,
)
from dagster_graphql.schema.config_type_or_error import GrapheneConfigTypeOrError
from dagster_graphql.schema.config_types import types as config_types
from dagster_graphql.schema.dagster_types import types as dagster_types_types
from dagster_graphql.schema.entity_key import GrapheneAssetKey
from dagster_graphql.schema.errors import types as errors_types
from dagster_graphql.schema.execution import types as execution_types
from dagster_graphql.schema.external import types as external_types
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@
AssetCheckEvaluation,
AssetCheckEvaluationTargetMaterializationData,
)
from dagster._core.definitions.asset_check_spec import AssetCheckKey, AssetCheckSeverity
from dagster._core.definitions.asset_check_spec import AssetCheckSeverity
from dagster._core.definitions.asset_key import AssetKey
from dagster._core.definitions.declarative_automation.serialized_objects import (
AutomationConditionSnapshot,
)
from dagster._core.definitions.remote_asset_graph import RemoteAssetCheckNode
from dagster._core.events import DagsterEventType
from dagster._core.storage.asset_check_execution_record import (
Expand All @@ -17,7 +20,9 @@
)

from dagster_graphql.implementation.events import iterate_metadata_entries
from dagster_graphql.schema.asset_key import GrapheneAssetKey
from dagster_graphql.schema.auto_materialize_policy import GrapheneAutoMaterializePolicy
from dagster_graphql.schema.automation_condition import GrapheneAutomationCondition
from dagster_graphql.schema.entity_key import GrapheneAssetKey
from dagster_graphql.schema.errors import GrapheneError
from dagster_graphql.schema.metadata import GrapheneMetadataEntry
from dagster_graphql.schema.util import ResolveInfo, non_null_list
Expand Down Expand Up @@ -134,6 +139,7 @@ class GrapheneAssetCheck(graphene.ObjectType):
canExecuteIndividually = graphene.NonNull(GrapheneAssetCheckCanExecuteIndividually)
blocking = graphene.NonNull(graphene.Boolean)
additionalAssetKeys = non_null_list(GrapheneAssetKey)
automationCondition = graphene.Field(GrapheneAutomationCondition)

class Meta:
name = "AssetCheck"
Expand Down Expand Up @@ -185,6 +191,22 @@ def resolve_additionalAssetKeys(self, _) -> Sequence[GrapheneAssetKey]:
for asset_key in self._asset_check.additional_asset_keys
]

def resolve_automationCondition(
self, _graphene_info: ResolveInfo
) -> Optional[GrapheneAutoMaterializePolicy]:
automation_condition = (
Comment on lines +194 to +197
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The return type annotation Optional[GrapheneAutoMaterializePolicy] doesn't match the actual return type of this method, which is Optional[GrapheneAutomationCondition]. This mismatch could cause type checking errors and make the code harder to maintain. Please update the type annotation to match the implementation.

Spotted by Graphite Reviewer

Is this helpful? React 👍 or 👎 to let us know.

self._asset_check.automation_condition_snapshot
or self._asset_check.automation_condition
)
if automation_condition:
return GrapheneAutomationCondition(
# we only store one of automation_condition or automation_condition_snapshot
automation_condition
if isinstance(automation_condition, AutomationConditionSnapshot)
else automation_condition.get_snapshot()
)
return None


class GrapheneAssetChecks(graphene.ObjectType):
checks = non_null_list(GrapheneAssetCheck)
Expand Down Expand Up @@ -234,14 +256,3 @@ class Meta:
GrapheneAssetCheckNeedsAgentUpgradeError,
)
name = "AssetChecksOrError"


class GrapheneAssetCheckHandle(graphene.ObjectType):
name = graphene.NonNull(graphene.String)
assetKey = graphene.NonNull(GrapheneAssetKey)

class Meta:
name = "AssetCheckhandle"

def __init__(self, handle: AssetCheckKey):
super().__init__(name=handle.name, assetKey=GrapheneAssetKey(path=handle.asset_key.path))
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,18 @@

import graphene
from dagster._core.asset_graph_view.serializable_entity_subset import SerializableEntitySubset
from dagster._core.definitions.asset_key import AssetKey
from dagster._core.definitions.declarative_automation.serialized_objects import (
AutomationConditionEvaluation,
AutomationConditionSnapshot,
)
from dagster._core.scheduler.instigation import AutoMaterializeAssetEvaluationRecord

from dagster_graphql.implementation.events import iterate_metadata_entries
from dagster_graphql.schema.asset_key import GrapheneAssetKey
from dagster_graphql.schema.auto_materialize_asset_evaluations import (
GrapheneAutoMaterializeAssetEvaluationNeedsMigrationError,
)
from dagster_graphql.schema.entity_key import GrapheneAssetKey, GrapheneEntityKey
from dagster_graphql.schema.metadata import GrapheneMetadataEntry
from dagster_graphql.schema.util import ResolveInfo, non_null_list

Expand Down Expand Up @@ -251,7 +252,8 @@ class GrapheneAssetConditionEvaluationRecord(graphene.ObjectType):
runIds = non_null_list(graphene.String)
timestamp = graphene.NonNull(graphene.Float)

assetKey = graphene.NonNull(GrapheneAssetKey)
assetKey = graphene.Field(GrapheneAssetKey)
entityKey = graphene.NonNull(GrapheneEntityKey)
numRequested = graphene.NonNull(graphene.Int)

startTimestamp = graphene.Field(graphene.Float)
Expand Down Expand Up @@ -282,7 +284,10 @@ def __init__(
evaluationId=record.evaluation_id,
timestamp=record.timestamp,
runIds=evaluation_with_run_ids.run_ids,
assetKey=GrapheneAssetKey(path=record.key.path),
assetKey=GrapheneEntityKey.from_entity_key(record.key)
if isinstance(record.key, AssetKey)
else None,
entityKey=GrapheneEntityKey.from_entity_key(record.key),
numRequested=root_evaluation.true_subset.size,
startTimestamp=root_evaluation.start_timestamp,
endTimestamp=root_evaluation.end_timestamp,
Expand Down
Loading
Loading