Skip to content

Commit

Permalink
rebase
Browse files Browse the repository at this point in the history
  • Loading branch information
schrockn committed Mar 8, 2024
1 parent 78c32a9 commit 99cd2dc
Showing 1 changed file with 12 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from dagster._utils.cached_method import cached_method

if TYPE_CHECKING:
from dagster._core.definitions.base_asset_graph import BaseAssetGraph
from dagster._core.definitions.base_asset_graph import BaseAssetGraph, BaseAssetNode
from dagster._core.definitions.data_version import CachingStaleStatusResolver
from dagster._core.definitions.definitions_class import Definitions
from dagster._core.definitions.partition import PartitionsDefinition
Expand Down Expand Up @@ -50,7 +50,7 @@ class _AssetSliceCompatibleSubset(ValidAssetSubset): ...

def _slice_from_subset(asset_graph_view: "AssetGraphView", subset: AssetSubset) -> "AssetSlice":
valid_subset = subset.as_valid(
asset_graph_view.asset_graph.get_partitions_def(subset.asset_key)
asset_graph_view.asset_graph.get(subset.asset_key).partitions_def
)
return AssetSlice(asset_graph_view, _AssetSliceCompatibleSubset(*valid_subset))

Expand Down Expand Up @@ -106,11 +106,15 @@ def asset_key(self) -> AssetKey:

@property
def parent_keys(self) -> AbstractSet[AssetKey]:
return self._asset_graph_view.asset_graph.get_parents(self.asset_key)
return self._asset_graph_view.asset_graph.get(self.asset_key).parent_keys

@property
def child_keys(self) -> AbstractSet[AssetKey]:
return self._asset_graph_view.asset_graph.get(self.asset_key).child_keys

@property
def _partitions_def(self) -> Optional["PartitionsDefinition"]:
return self._asset_graph_view.asset_graph.get_partitions_def(self.asset_key)
return self._asset_graph_view.asset_graph.get(self.asset_key).partitions_def

@cached_method
def compute_parent_slice(self, parent_asset_key: AssetKey) -> "AssetSlice":
Expand All @@ -122,17 +126,11 @@ def compute_child_slice(self, child_asset_key: AssetKey) -> "AssetSlice":

@cached_method
def compute_parent_slices(self) -> Mapping[AssetKey, "AssetSlice"]:
return {
parent_asset_key: self.compute_parent_slice(parent_asset_key)
for parent_asset_key in self._asset_graph_view.asset_graph.get_parents(self.asset_key)
}
return {ak: self.compute_parent_slice(ak) for ak in self.parent_keys}

@cached_method
def compute_child_slices(self) -> Mapping[AssetKey, "AssetSlice"]:
return {
child_asset_key: self.compute_child_slice(child_asset_key)
for child_asset_key in self._asset_graph_view.asset_graph.get_children(self.asset_key)
}
return {ak: self.compute_child_slice(ak) for ak in self.child_keys}

def only_partition_keys(self, partition_keys: AbstractSet[PartitionKey]) -> "AssetSlice":
"""Return a new AssetSlice with only the given partition keys if they are in the slice."""
Expand Down Expand Up @@ -226,15 +224,15 @@ def last_event_id(self) -> Optional[int]:
return self._temporal_context.last_event_id

@property
def asset_graph(self) -> "BaseAssetGraph":
def asset_graph(self) -> "BaseAssetGraph[BaseAssetNode]":
return self._stale_resolver.asset_graph

@property
def _queryer(self) -> "CachingInstanceQueryer":
return self._stale_resolver.instance_queryer

def _get_partitions_def(self, asset_key: "AssetKey") -> Optional["PartitionsDefinition"]:
return self.asset_graph.get_partitions_def(asset_key)
return self.asset_graph.get(asset_key).partitions_def

def get_asset_slice(self, asset_key: "AssetKey") -> "AssetSlice":
# not compute_asset_slice because dynamic partitions store
Expand Down

0 comments on commit 99cd2dc

Please sign in to comment.