From 99cd2dc95e4e3196443507b19316710c8d0d0fca Mon Sep 17 00:00:00 2001 From: Nick Schrock Date: Fri, 8 Mar 2024 09:22:46 -0500 Subject: [PATCH] rebase --- .../asset_graph_view/asset_graph_view.py | 26 +++++++++---------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py b/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py index e8f2faa24ca69..79d2d78401ba4 100644 --- a/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py +++ b/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py @@ -9,7 +9,7 @@ from dagster._utils.cached_method import cached_method if TYPE_CHECKING: - from dagster._core.definitions.base_asset_graph import BaseAssetGraph + from dagster._core.definitions.base_asset_graph import BaseAssetGraph, BaseAssetNode from dagster._core.definitions.data_version import CachingStaleStatusResolver from dagster._core.definitions.definitions_class import Definitions from dagster._core.definitions.partition import PartitionsDefinition @@ -50,7 +50,7 @@ class _AssetSliceCompatibleSubset(ValidAssetSubset): ... def _slice_from_subset(asset_graph_view: "AssetGraphView", subset: AssetSubset) -> "AssetSlice": valid_subset = subset.as_valid( - asset_graph_view.asset_graph.get_partitions_def(subset.asset_key) + asset_graph_view.asset_graph.get(subset.asset_key).partitions_def ) return AssetSlice(asset_graph_view, _AssetSliceCompatibleSubset(*valid_subset)) @@ -106,11 +106,15 @@ def asset_key(self) -> AssetKey: @property def parent_keys(self) -> AbstractSet[AssetKey]: - return self._asset_graph_view.asset_graph.get_parents(self.asset_key) + return self._asset_graph_view.asset_graph.get(self.asset_key).parent_keys + + @property + def child_keys(self) -> AbstractSet[AssetKey]: + return self._asset_graph_view.asset_graph.get(self.asset_key).child_keys @property def _partitions_def(self) -> Optional["PartitionsDefinition"]: - return self._asset_graph_view.asset_graph.get_partitions_def(self.asset_key) + return self._asset_graph_view.asset_graph.get(self.asset_key).partitions_def @cached_method def compute_parent_slice(self, parent_asset_key: AssetKey) -> "AssetSlice": @@ -122,17 +126,11 @@ def compute_child_slice(self, child_asset_key: AssetKey) -> "AssetSlice": @cached_method def compute_parent_slices(self) -> Mapping[AssetKey, "AssetSlice"]: - return { - parent_asset_key: self.compute_parent_slice(parent_asset_key) - for parent_asset_key in self._asset_graph_view.asset_graph.get_parents(self.asset_key) - } + return {ak: self.compute_parent_slice(ak) for ak in self.parent_keys} @cached_method def compute_child_slices(self) -> Mapping[AssetKey, "AssetSlice"]: - return { - child_asset_key: self.compute_child_slice(child_asset_key) - for child_asset_key in self._asset_graph_view.asset_graph.get_children(self.asset_key) - } + return {ak: self.compute_child_slice(ak) for ak in self.child_keys} def only_partition_keys(self, partition_keys: AbstractSet[PartitionKey]) -> "AssetSlice": """Return a new AssetSlice with only the given partition keys if they are in the slice.""" @@ -226,7 +224,7 @@ def last_event_id(self) -> Optional[int]: return self._temporal_context.last_event_id @property - def asset_graph(self) -> "BaseAssetGraph": + def asset_graph(self) -> "BaseAssetGraph[BaseAssetNode]": return self._stale_resolver.asset_graph @property @@ -234,7 +232,7 @@ def _queryer(self) -> "CachingInstanceQueryer": return self._stale_resolver.instance_queryer def _get_partitions_def(self, asset_key: "AssetKey") -> Optional["PartitionsDefinition"]: - return self.asset_graph.get_partitions_def(asset_key) + return self.asset_graph.get(asset_key).partitions_def def get_asset_slice(self, asset_key: "AssetKey") -> "AssetSlice": # not compute_asset_slice because dynamic partitions store