Skip to content

Commit

Permalink
Return tree instead of recursive resolver
Browse files Browse the repository at this point in the history
  • Loading branch information
OwenKephart committed Jan 9, 2024
1 parent 9a4e747 commit 5d9cded
Show file tree
Hide file tree
Showing 4 changed files with 236 additions and 185 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
GrapheneAssetConditionEvaluationRecord,
GrapheneAssetConditionEvaluationRecords,
GrapheneAssetConditionEvaluationRecordsOrError,
GrapheneSpecificPartitionAssetConditionEvaluation,
)
from dagster_graphql.schema.auto_materialize_asset_evaluations import (
GrapheneAutoMaterializeAssetEvaluationNeedsMigrationError,
Expand Down Expand Up @@ -86,8 +85,11 @@ def fetch_asset_condition_evaluation_record_for_partition(
if asset_node and asset_node.external_asset_node.partitions_def_data
else None
)
return GrapheneSpecificPartitionAssetConditionEvaluation(
record.get_evaluation_with_run_ids(partitions_def).evaluation, partition_key
return GrapheneAssetConditionEvaluation(
record.get_evaluation_with_run_ids(partitions_def).evaluation,
partitions_def,
graphene_info.context.instance,
partition_key,
)


Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import enum
import itertools
from typing import Optional, Sequence, Union

import graphene
Expand Down Expand Up @@ -48,6 +49,7 @@ def __init__(self, value: Union[bool, PartitionsSubset]):
GraphenePartitionKeyRange(start, end)
for start, end in value.get_partition_key_ranges(value.partitions_def)
]
partition_keys = value.get_partition_keys()
else:
partition_keys = value.get_partition_keys()

Expand Down Expand Up @@ -75,7 +77,8 @@ def __init__(self, asset_subset: AssetSubset):
)


class GrapheneUnpartitionedAssetConditionEvaluation(graphene.ObjectType):
class GrapheneUnpartitionedAssetConditionEvaluationNode(graphene.ObjectType):
uniqueId = graphene.NonNull(graphene.String)
description = graphene.NonNull(graphene.String)

startTimestamp = graphene.Field(graphene.Float)
Expand All @@ -84,29 +87,30 @@ class GrapheneUnpartitionedAssetConditionEvaluation(graphene.ObjectType):
metadataEntries = non_null_list(GrapheneMetadataEntry)
status = graphene.NonNull(GrapheneAssetConditionEvaluationStatus)

childEvaluations = graphene.Field(
graphene.List(graphene.NonNull(lambda: GrapheneUnpartitionedAssetConditionEvaluation))
)
childUniqueIds = non_null_list(graphene.String)

class Meta:
name = "UnpartitionedAssetConditionEvaluation"
name = "UnpartitionedAssetConditionEvaluationNode"

def __init__(self, evaluation: AssetConditionEvaluation):
if evaluation.true_subset.bool_value:
status = AssetConditionEvaluationStatus.TRUE
elif evaluation.candidate_subset and evaluation.candidate_subset.bool_value:
elif (
isinstance(evaluation.candidate_subset, AssetSubset)
and evaluation.candidate_subset.bool_value
):
status = AssetConditionEvaluationStatus.FALSE
else:
status = AssetConditionEvaluationStatus.SKIPPED

super().__init__(
uniqueId=evaluation.condition_snapshot.unique_id,
description=evaluation.condition_snapshot.description,
startTimestamp=evaluation.start_timestamp,
endTimestamp=evaluation.end_timestamp,
status=status,
childEvaluations=[
GrapheneUnpartitionedAssetConditionEvaluation(child)
for child in evaluation.child_evaluations
childUniqueIds=[
child.condition_snapshot.unique_id for child in evaluation.child_evaluations
],
)

Expand All @@ -120,7 +124,8 @@ def resolve_metadataEntries(
return [GrapheneMetadataEntry(key=key, value=value) for key, value in metadata.items()]


class GraphenePartitionedAssetConditionEvaluation(graphene.ObjectType):
class GraphenePartitionedAssetConditionEvaluationNode(graphene.ObjectType):
uniqueId = graphene.NonNull(graphene.String)
description = graphene.NonNull(graphene.String)

startTimestamp = graphene.Field(graphene.Float)
Expand All @@ -134,12 +139,10 @@ class GraphenePartitionedAssetConditionEvaluation(graphene.ObjectType):
numFalse = graphene.NonNull(graphene.Int)
numSkipped = graphene.NonNull(graphene.Int)

childEvaluations = graphene.Field(
graphene.List(graphene.NonNull(lambda: GraphenePartitionedAssetConditionEvaluation))
)
childUniqueIds = non_null_list(graphene.String)

class Meta:
name = "PartitionedAssetConditionEvaluation"
name = "PartitionedAssetConditionEvaluationNode"

def __init__(
self,
Expand All @@ -154,20 +157,22 @@ def __init__(
evaluation.asset_key, partitions_def, dynamic_partitions_store, pendulum.now("UTC")
)

# if the candidate_subset is unset, then we evaluated all partitions
self._candidate_subset = evaluation.candidate_subset or self._all_subset
# if the candidate_subset is a HistoricalAssetSubset, then we evaluated all partitions
self._candidate_subset = (
evaluation.candidate_subset
if isinstance(evaluation.candidate_subset, AssetSubset)
else self._all_subset
)

super().__init__(
uniqueId=evaluation.condition_snapshot.unique_id,
description=evaluation.condition_snapshot.description,
startTimestamp=evaluation.start_timestamp,
endTimestamp=evaluation.end_timestamp,
trueSubset=GrapheneAssetSubset(evaluation.true_subset),
candidateSubset=GrapheneAssetSubset(self._candidate_subset),
childEvaluations=[
GraphenePartitionedAssetConditionEvaluation(
child, partitions_def, dynamic_partitions_store
)
for child in evaluation.child_evaluations
childUniqueIds=[
child.condition_snapshot.unique_id for child in evaluation.child_evaluations
],
)

Expand All @@ -184,18 +189,17 @@ def resolve_numSkipped(self, graphene_info: ResolveInfo) -> int:
return self._all_subset.size - self._candidate_subset.size


class GrapheneSpecificPartitionAssetConditionEvaluation(graphene.ObjectType):
class GrapheneSpecificPartitionAssetConditionEvaluationNode(graphene.ObjectType):
uniqueId = graphene.NonNull(graphene.String)
description = graphene.NonNull(graphene.String)

metadataEntries = non_null_list(GrapheneMetadataEntry)
status = graphene.NonNull(GrapheneAssetConditionEvaluationStatus)

childEvaluations = graphene.Field(
graphene.List(graphene.NonNull(lambda: GrapheneSpecificPartitionAssetConditionEvaluation))
)
childUniqueIds = non_null_list(graphene.String)

class Meta:
name = "SpecificPartitionAssetConditionEvaluation"
name = "SpecificPartitionAssetConditionEvaluationNode"

def __init__(self, evaluation: AssetConditionEvaluation, partition_key: str):
self._evaluation = evaluation
Expand All @@ -204,19 +208,19 @@ def __init__(self, evaluation: AssetConditionEvaluation, partition_key: str):
if partition_key in evaluation.true_subset.subset_value:
status = AssetConditionEvaluationStatus.TRUE
elif (
evaluation.candidate_subset is None
not isinstance(evaluation.candidate_subset, AssetSubset)
or partition_key in evaluation.candidate_subset.subset_value
):
status = AssetConditionEvaluationStatus.FALSE
else:
status = AssetConditionEvaluationStatus.SKIPPED

super().__init__(
uniqueId=evaluation.condition_snapshot.unique_id,
description=evaluation.condition_snapshot.description,
status=status,
childEvaluations=[
GrapheneSpecificPartitionAssetConditionEvaluation(child, partition_key)
for child in evaluation.child_evaluations
childUniqueIds=[
child.condition_snapshot.unique_id for child in evaluation.child_evaluations
],
)

Expand All @@ -235,15 +239,60 @@ def resolve_metadataEntries(
return [GrapheneMetadataEntry(key=key, value=value) for key, value in metadata.items()]


class GrapheneAssetConditionEvaluation(graphene.Union):
class GrapheneAssetConditionEvaluationNode(graphene.Union):
class Meta:
types = (
GrapheneUnpartitionedAssetConditionEvaluation,
GraphenePartitionedAssetConditionEvaluation,
GrapheneSpecificPartitionAssetConditionEvaluation,
GrapheneUnpartitionedAssetConditionEvaluationNode,
GraphenePartitionedAssetConditionEvaluationNode,
GrapheneSpecificPartitionAssetConditionEvaluationNode,
)
name = "AssetConditionEvaluationNode"


class GrapheneAssetConditionEvaluation(graphene.ObjectType):
rootUniqueId = graphene.NonNull(graphene.String)
evaluationNodes = non_null_list(GrapheneAssetConditionEvaluationNode)

class Meta:
name = "AssetConditionEvaluation"

def __init__(
self,
evaluation: AssetConditionEvaluation,
partitions_def: Optional[PartitionsDefinition],
dynamic_partitions_store: DynamicPartitionsStore,
partition_key: Optional[str] = None,
):
# flatten the evaluation tree into a list of nodes
def _flatten(e: AssetConditionEvaluation) -> Sequence[AssetConditionEvaluation]:
return list(itertools.chain([e], *(_flatten(ce) for ce in e.child_evaluations)))

all_nodes = _flatten(evaluation)

if evaluation.true_subset.is_partitioned:
if partition_key is None:
evaluationNodes = [
GraphenePartitionedAssetConditionEvaluationNode(
evaluation, partitions_def, dynamic_partitions_store
)
for evaluation in all_nodes
]
else:
evaluationNodes = [
GrapheneSpecificPartitionAssetConditionEvaluationNode(evaluation, partition_key)
for evaluation in all_nodes
]
else:
evaluationNodes = [
GrapheneUnpartitionedAssetConditionEvaluationNode(evaluation)
for evaluation in all_nodes
]

super().__init__(
rootUniqueId=evaluation.condition_snapshot.unique_id,
evaluationNodes=evaluationNodes,
)


class GrapheneAssetConditionEvaluationRecord(graphene.ObjectType):
id = graphene.NonNull(graphene.ID)
Expand All @@ -254,6 +303,9 @@ class GrapheneAssetConditionEvaluationRecord(graphene.ObjectType):
assetKey = graphene.NonNull(GrapheneAssetKey)
numRequested = graphene.NonNull(graphene.Int)

startTimestamp = graphene.Field(graphene.Float)
endTimestamp = graphene.Field(graphene.Float)

evaluation = graphene.NonNull(GrapheneAssetConditionEvaluation)

class Meta:
Expand All @@ -264,22 +316,8 @@ def __init__(
record: AutoMaterializeAssetEvaluationRecord,
partitions_def: Optional[PartitionsDefinition],
dynamic_partitions_store: DynamicPartitionsStore,
partition_key: Optional[str] = None,
):
evaluation_with_run_ids = record.get_evaluation_with_run_ids(partitions_def)
if evaluation_with_run_ids.evaluation.true_subset.is_partitioned:
if partition_key is None:
evaluation = GraphenePartitionedAssetConditionEvaluation(
evaluation_with_run_ids.evaluation, partitions_def, dynamic_partitions_store
)
else:
evaluation = GrapheneSpecificPartitionAssetConditionEvaluation(
evaluation_with_run_ids.evaluation, partition_key
)
else:
evaluation = GrapheneUnpartitionedAssetConditionEvaluation(
evaluation_with_run_ids.evaluation
)

super().__init__(
id=record.id,
Expand All @@ -288,7 +326,11 @@ def __init__(
runIds=evaluation_with_run_ids.run_ids,
assetKey=GrapheneAssetKey(path=record.asset_key.path),
numRequested=evaluation_with_run_ids.evaluation.true_subset.size,
evaluation=evaluation,
startTimestamp=evaluation_with_run_ids.evaluation.start_timestamp,
endTimestamp=evaluation_with_run_ids.evaluation.end_timestamp,
evaluation=GrapheneAssetConditionEvaluation(
evaluation_with_run_ids.evaluation, partitions_def, dynamic_partitions_store
),
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@
from dagster_graphql.implementation.fetch_logs import get_captured_log_metadata
from dagster_graphql.implementation.fetch_runs import get_assets_latest_info
from dagster_graphql.schema.asset_condition_evaluations import (
GrapheneAssetConditionEvaluation,
GrapheneAssetConditionEvaluationRecordsOrError,
GrapheneSpecificPartitionAssetConditionEvaluation,
)
from dagster_graphql.schema.auto_materialize_asset_evaluations import (
GrapheneAutoMaterializeAssetEvaluationRecordsOrError,
Expand Down Expand Up @@ -523,7 +523,7 @@ class Meta:
)

assetConditionEvaluationForPartition = graphene.Field(
GrapheneSpecificPartitionAssetConditionEvaluation,
GrapheneAssetConditionEvaluation,
assetKey=graphene.Argument(graphene.NonNull(GrapheneAssetKeyInput)),
evaluationId=graphene.Argument(graphene.NonNull(graphene.Int)),
partition=graphene.Argument(graphene.NonNull(graphene.String)),
Expand Down
Loading

0 comments on commit 5d9cded

Please sign in to comment.