Skip to content

Commit

Permalink
AssetGraphView, AssetSlice, and TemporalContext
Browse files Browse the repository at this point in the history
  • Loading branch information
schrockn committed Mar 8, 2024
1 parent 0e7f112 commit 7ffcdd1
Show file tree
Hide file tree
Showing 4 changed files with 383 additions and 0 deletions.
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,276 @@
from datetime import datetime
from typing import TYPE_CHECKING, AbstractSet, Mapping, NamedTuple, Optional

from typing_extensions import TypeAlias

from dagster import _check as check
from dagster._core.definitions.asset_subset import AssetSubset, ValidAssetSubset
from dagster._core.definitions.events import AssetKey, AssetKeyPartitionKey
from dagster._utils.cached_method import cached_method

if TYPE_CHECKING:
from dagster._core.definitions.base_asset_graph import BaseAssetGraph
from dagster._core.definitions.data_version import CachingStaleStatusResolver
from dagster._core.definitions.definitions_class import Definitions
from dagster._core.definitions.partition import PartitionsDefinition
from dagster._core.instance import DagsterInstance
from dagster._utils.caching_instance_queryer import CachingInstanceQueryer


class TemporalContext(NamedTuple):
"""TemporalContext represents an effective time, used for business logic, and last_event_id
which is used to identify that state of the event log at some point in time. Put another way,
the value of a TemporalContext represents a point in time and a snapshot of the event log.
Effective time: This is the effective time of the computation in terms of business logic,
and it impacts the behavior of partioning and partition mapping. For example, if an
if you get the "last" partition window of a given partitions definition, it is with
respect to the effective time.
Last event id: Our event log has a monotonic increasing event id. This is used to
cursor the event log. This event_id is also propogated to derived tables to indicate.
This allows us to query the state of the event log at a given point in time.
Note that insertion time of the last_event_id is not the same as the effective time.
A last_event_id of None indicates that the reads will be volatile will immediately
reflect any subsequent writes.
"""

effective_dt: datetime
last_event_id: Optional[int]


class _AssetSliceCompatibleSubset(ValidAssetSubset): ...


PartitionKey: TypeAlias = Optional[str]
AssetPartition: TypeAlias = AssetKeyPartitionKey


def _slice_from_subset(asset_graph_view: "AssetGraphView", subset: AssetSubset) -> "AssetSlice":
valid_subset = subset.as_valid(
asset_graph_view.asset_graph.get_partitions_def(subset.asset_key)
)
return AssetSlice(asset_graph_view, _AssetSliceCompatibleSubset(*valid_subset))


class AssetSlice:
"""An asset slice represents a set of partitions for a given asset key. It is
tied to a particular instance of an AssetGraphView, and is read-only.
With an asset slice you are able to traverse the set of partitions resident
in an asset graph at any point in time.
```python
asset_graph_view_t0 = AssetGraphView.for_test(defs, effective_dt=some_date())
some_asset_slice = asset_graph_view_to.get_asset_slice(some_asset.key)
for parent_slice in some_asset_slice.get_parent_slices().values():
# do something with the parent slice
```
AssetSlice is read-only and tied to a specific AssetGraphView. Therefore
we can aggressively cached methods and properties. However different methods
have different performance characterics so we have the following conventions:
Naming conventions
* Properties guaranteed to be fast.
* Methods prefixed with `get_` do some work in-memory but not hugely expensive.
* Methods prefixed with `compute_` do potentially expensive work, like compute
* partition mappings and query the instance.
* Methods using "materialize" indicate that they fully materialize partition sets
These can potentially be very expensive if the underlying partition set has
an in-memory representation that involves large time windows. I.e. if
the underlying PartitionsSubset in the ValidAssetSubset is a TimeWindowPartitionsSubset
Usage of these methods should be avoided if possible if you are potentially
dealing with slices with large time-based partition windows.
"""

def __init__(
self, asset_graph_view: "AssetGraphView", compatible_subset: _AssetSliceCompatibleSubset
):
self._asset_graph_view = asset_graph_view
self._compatible_subset = compatible_subset

def convert_to_valid_asset_subset(self) -> ValidAssetSubset:
return self._compatible_subset

def compute_partition_keys(self) -> AbstractSet[PartitionKey]:
return {ap.partition_key for ap in self._compatible_subset.asset_partitions}

@property
def asset_key(self) -> AssetKey:
return self._compatible_subset.asset_key

@property
def parent_keys(self) -> AbstractSet[AssetKey]:
return self._asset_graph_view.asset_graph.get_parents(self.asset_key)

@property
def _partitions_def(self) -> Optional["PartitionsDefinition"]:
return self._asset_graph_view.asset_graph.get_partitions_def(self.asset_key)

@cached_method
def compute_parent_slice(self, parent_asset_key: AssetKey) -> "AssetSlice":
return self._asset_graph_view.compute_parent_asset_slice(parent_asset_key, self)

@cached_method
def compute_child_slice(self, child_asset_key: AssetKey) -> "AssetSlice":
return self._asset_graph_view.compute_child_asset_slice(child_asset_key, self)

@cached_method
def compute_parent_slices(self) -> Mapping[AssetKey, "AssetSlice"]:
return {
parent_asset_key: self.compute_parent_slice(parent_asset_key)
for parent_asset_key in self._asset_graph_view.asset_graph.get_parents(self.asset_key)
}

@cached_method
def compute_child_slices(self) -> Mapping[AssetKey, "AssetSlice"]:
return {
child_asset_key: self.compute_child_slice(child_asset_key)
for child_asset_key in self._asset_graph_view.asset_graph.get_children(self.asset_key)
}

def only_partition_keys(self, partition_keys: AbstractSet[PartitionKey]) -> "AssetSlice":
"""Return a new AssetSlice with only the given partition keys if they are in the slice."""
return _slice_from_subset(
self._asset_graph_view,
AssetSubset.from_asset_partitions_set(
self.asset_key,
self._partitions_def,
{AssetPartition(self.asset_key, partition_key) for partition_key in partition_keys},
)
& self._compatible_subset,
)


class AssetGraphView:
"""The Asset Graph View. It is a view of the asset graph from the perspective of a specific
temporal context.
If the user wants to get a new view of the asset graph with a new effective date or last event
id, they should create a new instance of an AssetGraphView. If they do not they will get
incorrect results because the AssetGraphView and its associated classes (like AssetSlice)
cache results based on the effective date and last event id.
```python
# in a test case
asset_graph_view_t0 = AssetGraphView.for_test(defs, effective_dt=some_date())
#
# call materialize on an asset in defs
#
# must create a new AssetGraphView to get the correct results,
# asset_graph_view_t1 will not reflect the new materialization
asset_graph_view_t1 = AssetGraphView.for_test(defs, effective_dt=some_date())
```
"""

@staticmethod
def for_test(
defs: "Definitions",
instance: Optional["DagsterInstance"] = None,
effective_dt: Optional[datetime] = None,
last_event_id: Optional[int] = None,
):
import pendulum

from dagster._core.definitions.data_version import CachingStaleStatusResolver
from dagster._core.instance import DagsterInstance

stale_resolver = CachingStaleStatusResolver(
instance=instance or DagsterInstance.ephemeral(),
asset_graph=defs.get_asset_graph(),
)
check.invariant(stale_resolver.instance_queryer, "Ensure instance queryer is constructed")
return AssetGraphView(
stale_resolver=stale_resolver,
temporal_context=TemporalContext(
effective_dt=effective_dt or pendulum.now(),
last_event_id=last_event_id,
),
)

def __init__(
self,
*,
temporal_context: TemporalContext,
stale_resolver: "CachingStaleStatusResolver",
):
# Current these properties have the ability to be lazily constructed.
# We instead are going to try to retain the invariant that they are
# constructed upfront so that initialization time is well-understood
# and deterministic. If there are cheap operations that do not
# require these instances, we should move them to a different abstraction.

# ensure it is already constructed rather than created on demand
check.invariant(stale_resolver._instance_queryer) # noqa: SLF001
# ensure it is already constructed rather than created on demand
check.invariant(stale_resolver._asset_graph) # noqa: SLF001

self._stale_resolver = stale_resolver
# stale resolve has a CachingInstanceQueryer which has a DagsterInstance
# so just passing the CachingStaleStatusResolver is enough
self._temporal_context = temporal_context

@property
def effective_dt(self) -> datetime:
return self._temporal_context.effective_dt

@property
def last_event_id(self) -> Optional[int]:
return self._temporal_context.last_event_id

@property
def asset_graph(self) -> "BaseAssetGraph":
return self._stale_resolver.asset_graph

@property
def _queryer(self) -> "CachingInstanceQueryer":
return self._stale_resolver.instance_queryer

def _get_partitions_def(self, asset_key: "AssetKey") -> Optional["PartitionsDefinition"]:
return self.asset_graph.get_partitions_def(asset_key)

def get_asset_slice(self, asset_key: "AssetKey") -> "AssetSlice":
# not compute_asset_slice because dynamic partitions store
# is just passed to AssetSubset.all, not invoked
return _slice_from_subset(
self,
AssetSubset.all(
asset_key=asset_key,
partitions_def=self._get_partitions_def(asset_key),
dynamic_partitions_store=self._queryer,
current_time=self.effective_dt,
),
)

def compute_parent_asset_slice(
self, parent_asset_key: AssetKey, asset_slice: AssetSlice
) -> AssetSlice:
return _slice_from_subset(
self,
self.asset_graph.get_parent_asset_subset(
dynamic_partitions_store=self._queryer,
parent_asset_key=parent_asset_key,
child_asset_subset=asset_slice.convert_to_valid_asset_subset(),
current_time=self.effective_dt,
),
)

def compute_child_asset_slice(
self, child_asset_key: "AssetKey", asset_slice: AssetSlice
) -> "AssetSlice":
return _slice_from_subset(
self,
self.asset_graph.get_child_asset_subset(
dynamic_partitions_store=self._queryer,
child_asset_key=child_asset_key,
current_time=self.effective_dt,
parent_asset_subset=asset_slice.convert_to_valid_asset_subset(),
),
)
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
import pendulum
from dagster import AssetDep, Definitions, asset
from dagster._core.asset_graph_view.asset_graph_view import AssetGraphView
from dagster._core.definitions.partition import StaticPartitionsDefinition
from dagster._core.definitions.partition_mapping import StaticPartitionMapping
from dagster._core.instance import DagsterInstance


def test_basic_construction_and_identity() -> None:
@asset
def an_asset() -> None: ...

defs = Definitions([an_asset])
instance = DagsterInstance.ephemeral()
effective_dt = pendulum.datetime(2020, 1, 1)
last_event_id = 928348343
asset_graph_view_t0 = AssetGraphView.for_test(defs, instance, effective_dt, last_event_id)

assert asset_graph_view_t0
assert asset_graph_view_t0.effective_dt == effective_dt
assert asset_graph_view_t0.last_event_id == last_event_id

# hiding stale resolver deliberately but want to test instance object identity
assert asset_graph_view_t0._stale_resolver.instance_queryer.instance is instance # noqa: SLF001

assert asset_graph_view_t0.asset_graph.all_asset_keys == {an_asset.key}


def test_slice_traversal_static_partitions() -> None:
number_keys = {"1", "2", "3"}
letter_keys = {"a", "b", "c"}
number_static_partitions_def = StaticPartitionsDefinition(list(number_keys))
letter_static_partitions_def = StaticPartitionsDefinition(list(letter_keys))
mapping = StaticPartitionMapping({"1": "a", "2": "b", "3": "c"})

@asset(partitions_def=number_static_partitions_def)
def up_numbers() -> None: ...

@asset(
deps=[AssetDep(up_numbers, partition_mapping=mapping)],
partitions_def=letter_static_partitions_def,
)
def down_letters() -> None: ...

defs = Definitions([up_numbers, down_letters])
instance = DagsterInstance.ephemeral()

asset_graph_view_t0 = AssetGraphView.for_test(defs, instance)
assert (
asset_graph_view_t0.get_asset_slice(up_numbers.key).compute_partition_keys() == number_keys
)
assert (
asset_graph_view_t0.get_asset_slice(down_letters.key).compute_partition_keys()
== letter_keys
)

# from full up to down
up_slice = asset_graph_view_t0.get_asset_slice(up_numbers.key)
assert up_slice.compute_partition_keys() == {"1", "2", "3"}
assert up_slice.compute_child_slice(down_letters.key).compute_partition_keys() == {
"a",
"b",
"c",
}

# from full up to down
down_slice = asset_graph_view_t0.get_asset_slice(down_letters.key)
assert down_slice.compute_partition_keys() == {"a", "b", "c"}
assert down_slice.compute_parent_slice(up_numbers.key).compute_partition_keys() == {
"1",
"2",
"3",
}

# subset of up to subset of down
assert up_slice.only_partition_keys({"2"}).compute_child_slice(
down_letters.key
).compute_partition_keys() == {"b"}

# subset of down to subset of up
assert down_slice.only_partition_keys({"b"}).compute_parent_slice(
up_numbers.key
).compute_partition_keys() == {"2"}


def test_only_partition_keys() -> None:
number_keys = {"1", "2", "3"}
number_static_partitions_def = StaticPartitionsDefinition(list(number_keys))

@asset(partitions_def=number_static_partitions_def)
def up_numbers() -> None: ...

defs = Definitions([up_numbers])
instance = DagsterInstance.ephemeral()

asset_graph_view_t0 = AssetGraphView.for_test(defs, instance)

assert asset_graph_view_t0.get_asset_slice(up_numbers.key).only_partition_keys(
{"1", "2"}
).compute_partition_keys() == {"1", "2"}

assert (
asset_graph_view_t0.get_asset_slice(up_numbers.key)
.only_partition_keys({"4"})
.compute_partition_keys()
== set()
)

0 comments on commit 7ffcdd1

Please sign in to comment.