diff --git a/python_modules/dagster/dagster/_core/asset_graph_view/__init__.py b/python_modules/dagster/dagster/_core/asset_graph_view/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py b/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py new file mode 100644 index 0000000000000..cba83faadd225 --- /dev/null +++ b/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py @@ -0,0 +1,297 @@ +from datetime import datetime +from typing import TYPE_CHECKING, AbstractSet, Mapping, NamedTuple, NewType, Optional + +from dagster import _check as check +from dagster._core.definitions.asset_subset import AssetSubset, ValidAssetSubset +from dagster._core.definitions.events import AssetKey +from dagster._utils.cached_method import cached_method + +if TYPE_CHECKING: + from dagster._core.definitions.base_asset_graph import BaseAssetGraph, BaseAssetNode + from dagster._core.definitions.data_version import CachingStaleStatusResolver + from dagster._core.definitions.definitions_class import Definitions + from dagster._core.definitions.partition import PartitionsDefinition + from dagster._core.instance import DagsterInstance + from dagster._utils.caching_instance_queryer import CachingInstanceQueryer + + +class TemporalContext(NamedTuple): + """TemporalContext represents an effective time, used for business logic, and last_event_id + which is used to identify that state of the event log at some point in time. Put another way, + the value of a TemporalContext represents a point in time and a snapshot of the event log. + + Effective time: This is the effective time of the computation in terms of business logic, + and it impacts the behavior of partitioning and partition mapping. For example, + the "last" partition window of a given partitions definition, it is with + respect to the effective time. + + Last event id: Our event log has a monotonically increasing event id. This is used to + cursor the event log. This event_id is also propogated to derived tables to indicate + when that record is valid. This allows us to query the state of the event log + at a given point in time. + + Note that insertion time of the last_event_id is not the same as the effective time. + + A last_event_id of None indicates that the reads will be volatile will immediately + reflect any subsequent writes. + """ + + effective_dt: datetime + last_event_id: Optional[int] + + +# We reserve the right to constraints on the AssetSubset that we are going to use +# in AssetSlice internals. Adding a NewType enforces that we do that conversion +# in one spot (see _slice_from_subset) +_AssetSliceCompatibleSubset = NewType("_AssetSliceCompatibleSubset", ValidAssetSubset) + + +def _slice_from_subset(asset_graph_view: "AssetGraphView", subset: AssetSubset) -> "AssetSlice": + valid_subset = subset.as_valid( + asset_graph_view.asset_graph.get(subset.asset_key).partitions_def + ) + return AssetSlice(asset_graph_view, _AssetSliceCompatibleSubset(valid_subset)) + + +class AssetSlice: + """An asset slice represents a set of partitions for a given asset key. It is + tied to a particular instance of an AssetGraphView, and is read-only. + + With an asset slice you are able to traverse the set of partitions resident + in an asset graph at any point in time. + + ```python + asset_graph_view_t0 = AssetGraphView.for_test(defs, effective_dt=some_date()) + + some_asset_slice = asset_graph_view_to.get_asset_slice(some_asset.key) + + for parent_slice in some_asset_slice.compute_parent_slices().values(): + # do something with the parent slice + ``` + + AssetSlice is read-only and tied to a specific AssetGraphView. Therefore + we can safely use cached methods and properties at will. However different methods + have different performance characterics so we have the following conventions: + + Naming conventions + * Properties are guaranteed to be fast. + * Methods prefixed with `get_` do some work in-memory but are not hugely expensive. + * Methods prefixed with `compute_` do potentially expensive work, like compute + partition mappings and query the instance. + + We also use this prefix to indicate that they fully materialize partition sets + These can potentially be very expensive if the underlying partition set has + an in-memory representation that involves large time windows. I.e. if the + underlying PartitionsSubset in the ValidAssetSubset is a + TimeWindowPartitionsSubset. Usage of these methods should be avoided if + possible if you are potentially dealing with slices with large time-based + partition windows. + """ + + def __init__( + self, asset_graph_view: "AssetGraphView", compatible_subset: _AssetSliceCompatibleSubset + ): + self._asset_graph_view = asset_graph_view + self._compatible_subset = compatible_subset + + def convert_to_valid_asset_subset(self) -> ValidAssetSubset: + return self._compatible_subset + + # only works for partitioned assets for now + def compute_partition_keys(self) -> AbstractSet[str]: + return { + check.not_none(akpk.partition_key, "No None partition keys") + for akpk in self._compatible_subset.asset_partitions + } + + @property + def asset_key(self) -> AssetKey: + return self._compatible_subset.asset_key + + @property + def parent_keys(self) -> AbstractSet[AssetKey]: + return self._asset_graph_view.asset_graph.get(self.asset_key).parent_keys + + @property + def child_keys(self) -> AbstractSet[AssetKey]: + return self._asset_graph_view.asset_graph.get(self.asset_key).child_keys + + @property + def _partitions_def(self) -> Optional["PartitionsDefinition"]: + return self._asset_graph_view.asset_graph.get(self.asset_key).partitions_def + + @cached_method + def compute_parent_slice(self, parent_asset_key: AssetKey) -> "AssetSlice": + return self._asset_graph_view.compute_parent_asset_slice(parent_asset_key, self) + + @cached_method + def compute_child_slice(self, child_asset_key: AssetKey) -> "AssetSlice": + return self._asset_graph_view.compute_child_asset_slice(child_asset_key, self) + + @cached_method + def compute_parent_slices(self) -> Mapping[AssetKey, "AssetSlice"]: + return {ak: self.compute_parent_slice(ak) for ak in self.parent_keys} + + @cached_method + def compute_child_slices(self) -> Mapping[AssetKey, "AssetSlice"]: + return {ak: self.compute_child_slice(ak) for ak in self.child_keys} + + def compute_intersection_with_partition_keys( + self, partition_keys: AbstractSet[str] + ) -> "AssetSlice": + """Return a new AssetSlice with only the given partition keys if they are in the slice.""" + return self._asset_graph_view.compute_intersection_with_partition_keys(partition_keys, self) + + +class AssetGraphView: + """The Asset Graph View. It is a view of the asset graph from the perspective of a specific + temporal context. + + If the user wants to get a new view of the asset graph with a new effective date or last event + id, they should create a new instance of an AssetGraphView. If they do not they will get + incorrect results because the AssetGraphView and its associated classes (like AssetSlice) + cache results based on the effective date and last event id. + + ```python + # in a test case + asset_graph_view_t0 = AssetGraphView.for_test(defs, effective_dt=some_date()) + + # + # call materialize on an asset in defs + # + # must create a new AssetGraphView to get the correct results, + # asset_graph_view_t1 will not reflect the new materialization + asset_graph_view_t1 = AssetGraphView.for_test(defs, effective_dt=some_date()) + ``` + + """ + + @staticmethod + def for_test( + defs: "Definitions", + instance: Optional["DagsterInstance"] = None, + effective_dt: Optional[datetime] = None, + last_event_id: Optional[int] = None, + ): + import pendulum + + from dagster._core.definitions.data_version import CachingStaleStatusResolver + from dagster._core.instance import DagsterInstance + + stale_resolver = CachingStaleStatusResolver( + instance=instance or DagsterInstance.ephemeral(), + asset_graph=defs.get_asset_graph(), + ) + check.invariant(stale_resolver.instance_queryer, "Ensure instance queryer is constructed") + return AssetGraphView( + stale_resolver=stale_resolver, + temporal_context=TemporalContext( + effective_dt=effective_dt or pendulum.now(), + last_event_id=last_event_id, + ), + ) + + def __init__( + self, + *, + temporal_context: TemporalContext, + stale_resolver: "CachingStaleStatusResolver", + ): + # Current these properties have the ability to be lazily constructed. + # We instead are going to try to retain the invariant that they are + # constructed upfront so that initialization time is well-understood + # and deterministic. If there are cheap operations that do not + # require these instances, we should move them to a different abstraction. + + # ensure it is already constructed rather than created on demand + check.invariant(stale_resolver._instance_queryer) # noqa: SLF001 + # ensure it is already constructed rather than created on demand + check.invariant(stale_resolver._asset_graph) # noqa: SLF001 + + # stale resolver has a CachingInstanceQueryer which has a DagsterInstance + # so just passing the CachingStaleStatusResolver is enough + self._stale_resolver = stale_resolver + self._temporal_context = temporal_context + + @property + def effective_dt(self) -> datetime: + return self._temporal_context.effective_dt + + @property + def last_event_id(self) -> Optional[int]: + return self._temporal_context.last_event_id + + @property + def asset_graph(self) -> "BaseAssetGraph[BaseAssetNode]": + return self._stale_resolver.asset_graph + + @property + def _queryer(self) -> "CachingInstanceQueryer": + return self._stale_resolver.instance_queryer + + def _get_partitions_def(self, asset_key: "AssetKey") -> Optional["PartitionsDefinition"]: + return self.asset_graph.get(asset_key).partitions_def + + def get_asset_slice(self, asset_key: "AssetKey") -> "AssetSlice": + # not compute_asset_slice because dynamic partitions store + # is just passed to AssetSubset.all, not invoked + return _slice_from_subset( + self, + AssetSubset.all( + asset_key=asset_key, + partitions_def=self._get_partitions_def(asset_key), + dynamic_partitions_store=self._queryer, + current_time=self.effective_dt, + ), + ) + + def compute_parent_asset_slice( + self, parent_asset_key: AssetKey, asset_slice: AssetSlice + ) -> AssetSlice: + return _slice_from_subset( + self, + self.asset_graph.get_parent_asset_subset( + dynamic_partitions_store=self._queryer, + parent_asset_key=parent_asset_key, + child_asset_subset=asset_slice.convert_to_valid_asset_subset(), + current_time=self.effective_dt, + ), + ) + + def compute_child_asset_slice( + self, child_asset_key: "AssetKey", asset_slice: AssetSlice + ) -> "AssetSlice": + return _slice_from_subset( + self, + self.asset_graph.get_child_asset_subset( + dynamic_partitions_store=self._queryer, + child_asset_key=child_asset_key, + current_time=self.effective_dt, + parent_asset_subset=asset_slice.convert_to_valid_asset_subset(), + ), + ) + + def compute_intersection_with_partition_keys( + self, partition_keys: AbstractSet[str], asset_slice: AssetSlice + ) -> "AssetSlice": + """Return a new AssetSlice with only the given partition keys if they are in the slice.""" + partitions_def = check.not_none( + self._get_partitions_def(asset_slice.asset_key), "Must have partitions def" + ) + for partition_key in partition_keys: + if not partitions_def.has_partition_key( + partition_key, + current_time=self.effective_dt, + dynamic_partitions_store=self._queryer, + ): + check.failed( + f"Partition key {partition_key} not in partitions def {partitions_def}" + ) + + return _slice_from_subset( + self, + asset_slice.convert_to_valid_asset_subset() + & AssetSubset.from_partition_keys( + asset_slice.asset_key, partitions_def, partition_keys + ), + ) diff --git a/python_modules/dagster/dagster/_core/definitions/asset_subset.py b/python_modules/dagster/dagster/_core/definitions/asset_subset.py index 6f9b2e919dff7..1a8f0efe54f8d 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_subset.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_subset.py @@ -147,19 +147,27 @@ def from_asset_partitions_set( partitions_def: Optional[PartitionsDefinition], asset_partitions_set: AbstractSet[AssetKeyPartitionKey], ) -> "ValidAssetSubset": - if partitions_def is None: - return ValidAssetSubset(asset_key=asset_key, value=bool(asset_partitions_set)) - else: - return ValidAssetSubset( + return ( + ValidAssetSubset.from_partition_keys( asset_key=asset_key, - value=partitions_def.subset_with_partition_keys( - { - ap.partition_key - for ap in asset_partitions_set - if ap.partition_key is not None - } - ), + partitions_def=partitions_def, + partition_keys={ + ap.partition_key for ap in asset_partitions_set if ap.partition_key is not None + }, ) + if partitions_def + else ValidAssetSubset(asset_key=asset_key, value=bool(asset_partitions_set)) + ) + + @staticmethod + def from_partition_keys( + asset_key: AssetKey, + partitions_def: PartitionsDefinition, + partition_keys: AbstractSet[str], + ) -> "ValidAssetSubset": + return ValidAssetSubset( + asset_key=asset_key, value=partitions_def.subset_with_partition_keys(partition_keys) + ) def __contains__(self, item: AssetKeyPartitionKey) -> bool: if not self.is_partitioned: diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/asset_graph_view_tests/__init__.py b/python_modules/dagster/dagster_tests/asset_defs_tests/asset_graph_view_tests/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/asset_graph_view_tests/test_basic_asset_graph_view.py b/python_modules/dagster/dagster_tests/asset_defs_tests/asset_graph_view_tests/test_basic_asset_graph_view.py new file mode 100644 index 0000000000000..d91cf77eb532d --- /dev/null +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/asset_graph_view_tests/test_basic_asset_graph_view.py @@ -0,0 +1,104 @@ +import pendulum +from dagster import AssetDep, Definitions, asset +from dagster._core.asset_graph_view.asset_graph_view import AssetGraphView +from dagster._core.definitions.partition import StaticPartitionsDefinition +from dagster._core.definitions.partition_mapping import StaticPartitionMapping +from dagster._core.instance import DagsterInstance + + +def test_basic_construction_and_identity() -> None: + @asset + def an_asset() -> None: ... + + defs = Definitions([an_asset]) + instance = DagsterInstance.ephemeral() + effective_dt = pendulum.datetime(2020, 1, 1) + last_event_id = 928348343 + asset_graph_view_t0 = AssetGraphView.for_test(defs, instance, effective_dt, last_event_id) + + assert asset_graph_view_t0 + assert asset_graph_view_t0.effective_dt == effective_dt + assert asset_graph_view_t0.last_event_id == last_event_id + + # hiding stale resolver deliberately but want to test instance object identity + assert asset_graph_view_t0._stale_resolver.instance_queryer.instance is instance # noqa: SLF001 + + assert asset_graph_view_t0.asset_graph.all_asset_keys == {an_asset.key} + + +def test_slice_traversal_static_partitions() -> None: + number_keys = {"1", "2", "3"} + letter_keys = {"a", "b", "c"} + number_static_partitions_def = StaticPartitionsDefinition(list(number_keys)) + letter_static_partitions_def = StaticPartitionsDefinition(list(letter_keys)) + mapping = StaticPartitionMapping({"1": "a", "2": "b", "3": "c"}) + + @asset(partitions_def=number_static_partitions_def) + def up_numbers() -> None: ... + + @asset( + deps=[AssetDep(up_numbers, partition_mapping=mapping)], + partitions_def=letter_static_partitions_def, + ) + def down_letters() -> None: ... + + defs = Definitions([up_numbers, down_letters]) + instance = DagsterInstance.ephemeral() + + asset_graph_view_t0 = AssetGraphView.for_test(defs, instance) + assert ( + asset_graph_view_t0.get_asset_slice(up_numbers.key).compute_partition_keys() == number_keys + ) + assert ( + asset_graph_view_t0.get_asset_slice(down_letters.key).compute_partition_keys() + == letter_keys + ) + + # from full up to down + up_slice = asset_graph_view_t0.get_asset_slice(up_numbers.key) + assert up_slice.compute_partition_keys() == {"1", "2", "3"} + assert up_slice.compute_child_slice(down_letters.key).compute_partition_keys() == { + "a", + "b", + "c", + } + + # from full up to down + down_slice = asset_graph_view_t0.get_asset_slice(down_letters.key) + assert down_slice.compute_partition_keys() == {"a", "b", "c"} + assert down_slice.compute_parent_slice(up_numbers.key).compute_partition_keys() == { + "1", + "2", + "3", + } + + # subset of up to subset of down + assert up_slice.compute_intersection_with_partition_keys({"2"}).compute_child_slice( + down_letters.key + ).compute_partition_keys() == {"b"} + + # subset of down to subset of up + assert down_slice.compute_intersection_with_partition_keys({"b"}).compute_parent_slice( + up_numbers.key + ).compute_partition_keys() == {"2"} + + +def test_only_partition_keys() -> None: + number_keys = {"1", "2", "3"} + number_static_partitions_def = StaticPartitionsDefinition(list(number_keys)) + + @asset(partitions_def=number_static_partitions_def) + def up_numbers() -> None: ... + + defs = Definitions([up_numbers]) + instance = DagsterInstance.ephemeral() + + asset_graph_view_t0 = AssetGraphView.for_test(defs, instance) + + assert asset_graph_view_t0.get_asset_slice( + up_numbers.key + ).compute_intersection_with_partition_keys({"1", "2"}).compute_partition_keys() == {"1", "2"} + + assert asset_graph_view_t0.get_asset_slice( + up_numbers.key + ).compute_intersection_with_partition_keys({"3"}).compute_partition_keys() == set(["3"])