From f9767411463763fc2b1c28dcf666dd7f6f96d65a Mon Sep 17 00:00:00 2001 From: Nick Schrock Date: Wed, 6 Mar 2024 09:25:43 -0500 Subject: [PATCH] AssetGraphView, AssetSlice, and TemporalContext --- .../_core/asset_graph_view/__init__.py | 0 .../asset_graph_view/asset_graph_view.py | 276 ++++++++++++++++++ .../asset_graph_view_tests/__init__.py | 0 .../test_basic_asset_graph_view.py | 107 +++++++ 4 files changed, 383 insertions(+) create mode 100644 python_modules/dagster/dagster/_core/asset_graph_view/__init__.py create mode 100644 python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py create mode 100644 python_modules/dagster/dagster_tests/asset_defs_tests/asset_graph_view_tests/__init__.py create mode 100644 python_modules/dagster/dagster_tests/asset_defs_tests/asset_graph_view_tests/test_basic_asset_graph_view.py diff --git a/python_modules/dagster/dagster/_core/asset_graph_view/__init__.py b/python_modules/dagster/dagster/_core/asset_graph_view/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py b/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py new file mode 100644 index 0000000000000..8278d58cad1e8 --- /dev/null +++ b/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py @@ -0,0 +1,276 @@ +from datetime import datetime +from typing import TYPE_CHECKING, AbstractSet, Mapping, NamedTuple, Optional + +from typing_extensions import TypeAlias + +from dagster import _check as check +from dagster._core.definitions.asset_subset import AssetSubset, ValidAssetSubset +from dagster._core.definitions.events import AssetKey, AssetKeyPartitionKey +from dagster._utils.cached_method import cached_method + +if TYPE_CHECKING: + from dagster._core.definitions.base_asset_graph import BaseAssetGraph + from dagster._core.definitions.data_version import CachingStaleStatusResolver + from 
dagster._core.definitions.definitions_class import Definitions + from dagster._core.definitions.partition import PartitionsDefinition + from dagster._core.instance import DagsterInstance + from dagster._utils.caching_instance_queryer import CachingInstanceQueryer + + +class TemporalContext(NamedTuple): + """TemporalContext represents an effective time, used for business logic, and last_event_id + which is used to identify the state of the event log at some point in time. Put another way, + the value of a TemporalContext represents a point in time and a snapshot of the event log. + + Effective time: This is the effective time of the computation in terms of business logic, + and it impacts the behavior of partitioning and partition mapping. For example, + if you get the "last" partition window of a given partitions definition, it is with + respect to the effective time. + + Last event id: Our event log has a monotonically increasing event id. This is used to + cursor the event log. This event_id is also propagated to derived tables. + This allows us to query the state of the event log at a given point in time. + + Note that the insertion time of the last_event_id is not the same as the effective time. + + A last_event_id of None indicates that the reads will be volatile and will immediately + reflect any subsequent writes. + """ + + effective_dt: datetime + last_event_id: Optional[int] + + +class _AssetSliceCompatibleSubset(ValidAssetSubset): ... + + +PartitionKey: TypeAlias = Optional[str] +AssetPartition: TypeAlias = AssetKeyPartitionKey + + +def _slice_from_subset(asset_graph_view: "AssetGraphView", subset: AssetSubset) -> "AssetSlice": + valid_subset = subset.as_valid( + asset_graph_view.asset_graph.get_partitions_def(subset.asset_key) + ) + return AssetSlice(asset_graph_view, _AssetSliceCompatibleSubset(*valid_subset)) + + +class AssetSlice: + """An asset slice represents a set of partitions for a given asset key. 
It is + tied to a particular instance of an AssetGraphView, and is read-only. + + With an asset slice you are able to traverse the set of partitions resident + in an asset graph at any point in time. + + ```python + asset_graph_view_t0 = AssetGraphView.for_test(defs, effective_dt=some_date()) + + some_asset_slice = asset_graph_view_t0.get_asset_slice(some_asset.key) + + for parent_slice in some_asset_slice.compute_parent_slices().values(): + # do something with the parent slice + ``` + + AssetSlice is read-only and tied to a specific AssetGraphView. Therefore + we can aggressively cache methods and properties. However, different methods + have different performance characteristics, so we have the following conventions: + + Naming conventions + * Properties are guaranteed to be fast. + * Methods prefixed with `get_` do some work in-memory but are not hugely expensive. + * Methods prefixed with `compute_` do potentially expensive work, like computing + partition mappings and querying the instance. + * Methods using "materialize" indicate that they fully materialize partition sets. + These can potentially be very expensive if the underlying partition set has + an in-memory representation that involves large time windows, i.e. if + the underlying PartitionsSubset in the ValidAssetSubset is a TimeWindowPartitionsSubset. + Usage of these methods should be avoided if possible if you are potentially + dealing with slices with large time-based partition windows. 
+ """ + + def __init__( + self, asset_graph_view: "AssetGraphView", compatible_subset: _AssetSliceCompatibleSubset + ): + self._asset_graph_view = asset_graph_view + self._compatible_subset = compatible_subset + + def convert_to_valid_asset_subset(self) -> ValidAssetSubset: + return self._compatible_subset + + def compute_partition_keys(self) -> AbstractSet[PartitionKey]: + return {ap.partition_key for ap in self._compatible_subset.asset_partitions} + + @property + def asset_key(self) -> AssetKey: + return self._compatible_subset.asset_key + + @property + def parent_keys(self) -> AbstractSet[AssetKey]: + return self._asset_graph_view.asset_graph.get_parents(self.asset_key) + + @property + def _partitions_def(self) -> Optional["PartitionsDefinition"]: + return self._asset_graph_view.asset_graph.get_partitions_def(self.asset_key) + + @cached_method + def compute_parent_slice(self, parent_asset_key: AssetKey) -> "AssetSlice": + return self._asset_graph_view.compute_parent_asset_slice(parent_asset_key, self) + + @cached_method + def compute_child_slice(self, child_asset_key: AssetKey) -> "AssetSlice": + return self._asset_graph_view.compute_child_asset_slice(child_asset_key, self) + + @cached_method + def compute_parent_slices(self) -> Mapping[AssetKey, "AssetSlice"]: + return { + parent_asset_key: self.compute_parent_slice(parent_asset_key) + for parent_asset_key in self._asset_graph_view.asset_graph.get_parents(self.asset_key) + } + + @cached_method + def compute_child_slices(self) -> Mapping[AssetKey, "AssetSlice"]: + return { + child_asset_key: self.compute_child_slice(child_asset_key) + for child_asset_key in self._asset_graph_view.asset_graph.get_children(self.asset_key) + } + + def only_partition_keys(self, partition_keys: AbstractSet[PartitionKey]) -> "AssetSlice": + """Return a new AssetSlice with only the given partition keys if they are in the slice.""" + return _slice_from_subset( + self._asset_graph_view, + AssetSubset.from_asset_partitions_set( + 
+ self.asset_key, + self._partitions_def, + {AssetPartition(self.asset_key, partition_key) for partition_key in partition_keys}, + ) + & self._compatible_subset, + ) + + +class AssetGraphView: + """The Asset Graph View. It is a view of the asset graph from the perspective of a specific + temporal context. + + If the user wants to get a new view of the asset graph with a new effective date or last event + id, they should create a new instance of an AssetGraphView. If they do not, they will get + incorrect results because the AssetGraphView and its associated classes (like AssetSlice) + cache results based on the effective date and last event id. + + ```python + # in a test case + asset_graph_view_t0 = AssetGraphView.for_test(defs, effective_dt=some_date()) + + # + # call materialize on an asset in defs + # + # must create a new AssetGraphView to get the correct results, + # asset_graph_view_t0 will not reflect the new materialization + asset_graph_view_t1 = AssetGraphView.for_test(defs, effective_dt=some_date()) + ``` + + """ + + @staticmethod + def for_test( + defs: "Definitions", + instance: Optional["DagsterInstance"] = None, + effective_dt: Optional[datetime] = None, + last_event_id: Optional[int] = None, + ): + import pendulum + + from dagster._core.definitions.data_version import CachingStaleStatusResolver + from dagster._core.instance import DagsterInstance + + stale_resolver = CachingStaleStatusResolver( + instance=instance or DagsterInstance.ephemeral(), + asset_graph=defs.get_asset_graph(), + ) + check.invariant(stale_resolver.instance_queryer, "Ensure instance queryer is constructed") + return AssetGraphView( + stale_resolver=stale_resolver, + temporal_context=TemporalContext( + effective_dt=effective_dt or pendulum.now(), + last_event_id=last_event_id, + ), + ) + + def __init__( + self, + *, + temporal_context: TemporalContext, + stale_resolver: "CachingStaleStatusResolver", + ): + # Currently these properties have the ability to be lazily constructed. 
+ # We instead are going to try to retain the invariant that they are + # constructed upfront so that initialization time is well-understood + # and deterministic. If there are cheap operations that do not + # require these instances, we should move them to a different abstraction. + + # ensure it is already constructed rather than created on demand + check.invariant(stale_resolver._instance_queryer) # noqa: SLF001 + # ensure it is already constructed rather than created on demand + check.invariant(stale_resolver._asset_graph) # noqa: SLF001 + + self._stale_resolver = stale_resolver + # stale resolver has a CachingInstanceQueryer which has a DagsterInstance + # so just passing the CachingStaleStatusResolver is enough + self._temporal_context = temporal_context + + @property + def effective_dt(self) -> datetime: + return self._temporal_context.effective_dt + + @property + def last_event_id(self) -> Optional[int]: + return self._temporal_context.last_event_id + + @property + def asset_graph(self) -> "BaseAssetGraph": + return self._stale_resolver.asset_graph + + @property + def _queryer(self) -> "CachingInstanceQueryer": + return self._stale_resolver.instance_queryer + + def _get_partitions_def(self, asset_key: "AssetKey") -> Optional["PartitionsDefinition"]: + return self.asset_graph.get_partitions_def(asset_key) + + def get_asset_slice(self, asset_key: "AssetKey") -> "AssetSlice": + # not compute_asset_slice because dynamic partitions store + # is just passed to AssetSubset.all, not invoked + return _slice_from_subset( + self, + AssetSubset.all( + asset_key=asset_key, + partitions_def=self._get_partitions_def(asset_key), + dynamic_partitions_store=self._queryer, + current_time=self.effective_dt, + ), + ) + + def compute_parent_asset_slice( + self, parent_asset_key: AssetKey, asset_slice: AssetSlice + ) -> AssetSlice: + return _slice_from_subset( + self, + self.asset_graph.get_parent_asset_subset( + dynamic_partitions_store=self._queryer, + 
parent_asset_key=parent_asset_key, + child_asset_subset=asset_slice.convert_to_valid_asset_subset(), + current_time=self.effective_dt, + ), + ) + + def compute_child_asset_slice( + self, child_asset_key: "AssetKey", asset_slice: AssetSlice + ) -> "AssetSlice": + return _slice_from_subset( + self, + self.asset_graph.get_child_asset_subset( + dynamic_partitions_store=self._queryer, + child_asset_key=child_asset_key, + current_time=self.effective_dt, + parent_asset_subset=asset_slice.convert_to_valid_asset_subset(), + ), + ) diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/asset_graph_view_tests/__init__.py b/python_modules/dagster/dagster_tests/asset_defs_tests/asset_graph_view_tests/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/asset_graph_view_tests/test_basic_asset_graph_view.py b/python_modules/dagster/dagster_tests/asset_defs_tests/asset_graph_view_tests/test_basic_asset_graph_view.py new file mode 100644 index 0000000000000..5e2e6dd0e71b2 --- /dev/null +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/asset_graph_view_tests/test_basic_asset_graph_view.py @@ -0,0 +1,107 @@ +import pendulum +from dagster import AssetDep, Definitions, asset +from dagster._core.asset_graph_view.asset_graph_view import AssetGraphView +from dagster._core.definitions.partition import StaticPartitionsDefinition +from dagster._core.definitions.partition_mapping import StaticPartitionMapping +from dagster._core.instance import DagsterInstance + + +def test_basic_construction_and_identity() -> None: + @asset + def an_asset() -> None: ... 
+ + defs = Definitions([an_asset]) + instance = DagsterInstance.ephemeral() + effective_dt = pendulum.datetime(2020, 1, 1) + last_event_id = 928348343 + asset_graph_view_t0 = AssetGraphView.for_test(defs, instance, effective_dt, last_event_id) + + assert asset_graph_view_t0 + assert asset_graph_view_t0.effective_dt == effective_dt + assert asset_graph_view_t0.last_event_id == last_event_id + + # hiding stale resolver deliberately but want to test instance object identity + assert asset_graph_view_t0._stale_resolver.instance_queryer.instance is instance # noqa: SLF001 + + assert asset_graph_view_t0.asset_graph.all_asset_keys == {an_asset.key} + + +def test_slice_traversal_static_partitions() -> None: + number_keys = {"1", "2", "3"} + letter_keys = {"a", "b", "c"} + number_static_partitions_def = StaticPartitionsDefinition(list(number_keys)) + letter_static_partitions_def = StaticPartitionsDefinition(list(letter_keys)) + mapping = StaticPartitionMapping({"1": "a", "2": "b", "3": "c"}) + + @asset(partitions_def=number_static_partitions_def) + def up_numbers() -> None: ... + + @asset( + deps=[AssetDep(up_numbers, partition_mapping=mapping)], + partitions_def=letter_static_partitions_def, + ) + def down_letters() -> None: ... 
+ + defs = Definitions([up_numbers, down_letters]) + instance = DagsterInstance.ephemeral() + + asset_graph_view_t0 = AssetGraphView.for_test(defs, instance) + assert ( + asset_graph_view_t0.get_asset_slice(up_numbers.key).compute_partition_keys() == number_keys + ) + assert ( + asset_graph_view_t0.get_asset_slice(down_letters.key).compute_partition_keys() + == letter_keys + ) + + # from full up to down + up_slice = asset_graph_view_t0.get_asset_slice(up_numbers.key) + assert up_slice.compute_partition_keys() == {"1", "2", "3"} + assert up_slice.compute_child_slice(down_letters.key).compute_partition_keys() == { + "a", + "b", + "c", + } + + # from full down to up + down_slice = asset_graph_view_t0.get_asset_slice(down_letters.key) + assert down_slice.compute_partition_keys() == {"a", "b", "c"} + assert down_slice.compute_parent_slice(up_numbers.key).compute_partition_keys() == { + "1", + "2", + "3", + } + + # subset of up to subset of down + assert up_slice.only_partition_keys({"2"}).compute_child_slice( + down_letters.key + ).compute_partition_keys() == {"b"} + + # subset of down to subset of up + assert down_slice.only_partition_keys({"b"}).compute_parent_slice( + up_numbers.key + ).compute_partition_keys() == {"2"} + + +def test_only_partition_keys() -> None: + number_keys = {"1", "2", "3"} + number_static_partitions_def = StaticPartitionsDefinition(list(number_keys)) + + @asset(partitions_def=number_static_partitions_def) + def up_numbers() -> None: ... + + defs = Definitions([up_numbers]) + instance = DagsterInstance.ephemeral() + + asset_graph_view_t0 = AssetGraphView.for_test(defs, instance) + + assert asset_graph_view_t0.get_asset_slice(up_numbers.key).only_partition_keys( + {"1", "2"} + ).compute_partition_keys() == {"1", "2"} + + assert ( + asset_graph_view_t0.get_asset_slice(up_numbers.key) + .only_partition_keys({"4"}) + .compute_partition_keys() + == set() + )