final self code review
schrockn committed Mar 11, 2024
1 parent 1f29229 commit c1c0891
Showing 2 changed files with 53 additions and 25 deletions.
@@ -1,5 +1,5 @@
from datetime import datetime
-from typing import TYPE_CHECKING, AbstractSet, Mapping, NamedTuple, Optional
+from typing import TYPE_CHECKING, AbstractSet, Mapping, NamedTuple, NewType, Optional

from dagster import _check as check
from dagster._core.definitions.asset_subset import AssetSubset, ValidAssetSubset
@@ -21,13 +21,14 @@ class TemporalContext(NamedTuple):
the value of a TemporalContext represents a point in time and a snapshot of the event log.
Effective time: This is the effective time of the computation in terms of business logic,
-and it impacts the behavior of partioning and partition mapping. For example, if an
-if you get the "last" partition window of a given partitions definition, it is with
+and it impacts the behavior of partitioning and partition mapping. For example,
+the "last" partition window of a given partitions definition, it is with
respect to the effective time.
-Last event id: Our event log has a monotonic increasing event id. This is used to
-cursor the event log. This event_id is also propogated to derived tables to indicate.
-This allows us to query the state of the event log at a given point in time.
+Last event id: Our event log has a monotonically increasing event id. This is used to
+cursor the event log. This event_id is also propagated to derived tables to indicate
+when that record is valid. This allows us to query the state of the event log
+at a given point in time.
Note that insertion time of the last_event_id is not the same as the effective time.
@@ -39,14 +40,17 @@ class TemporalContext(NamedTuple):
last_event_id: Optional[int]


-class _AssetSliceCompatibleSubset(ValidAssetSubset): ...
+# We reserve the right to add constraints on the AssetSubset that we are going to use
+# in AssetSlice internals. Adding a NewType enforces that we do that conversion
+# in one spot (see _slice_from_subset)
+_AssetSliceCompatibleSubset = NewType("_AssetSliceCompatibleSubset", ValidAssetSubset)


def _slice_from_subset(asset_graph_view: "AssetGraphView", subset: AssetSubset) -> "AssetSlice":
valid_subset = subset.as_valid(
asset_graph_view.asset_graph.get(subset.asset_key).partitions_def
)
-return AssetSlice(asset_graph_view, _AssetSliceCompatibleSubset(*valid_subset))
+return AssetSlice(asset_graph_view, _AssetSliceCompatibleSubset(valid_subset))


class AssetSlice:
@@ -61,25 +65,25 @@ class AssetSlice:
some_asset_slice = asset_graph_view_to.get_asset_slice(some_asset.key)
-for parent_slice in some_asset_slice.get_parent_slices().values():
+for parent_slice in some_asset_slice.compute_parent_slices().values():
# do something with the parent slice
```
AssetSlice is read-only and tied to a specific AssetGraphView. Therefore
-we can aggressively use cached methods and properties. However different methods
+we can safely use cached methods and properties at will. However different methods
have different performance characteristics so we have the following conventions:
Naming conventions
-* Properties guaranteed to be fast.
-* Methods prefixed with `get_` do some work in-memory but not hugely expensive.
+* Properties are guaranteed to be fast.
+* Methods prefixed with `get_` do some work in-memory but are not hugely expensive.
* Methods prefixed with `compute_` do potentially expensive work, like compute
partition mappings and query the instance.
We also use this prefix to indicate that they fully materialize partition sets
These can potentially be very expensive if the underlying partition set has
an in-memory representation that involves large time windows. I.e. if the
underlying PartitionsSubset in the ValidAssetSubset is a
-TimeWindowPartitionsSubset Usage of these methods should be avoided if
+TimeWindowPartitionsSubset. Usage of these methods should be avoided if
possible if you are potentially dealing with slices with large time-based
partition windows.
"""
@@ -93,7 +97,7 @@ def __init__(
def convert_to_valid_asset_subset(self) -> ValidAssetSubset:
return self._compatible_subset

# only works for partitioned assets for now
def compute_partition_keys(self) -> AbstractSet[str]:
return {
check.not_none(akpk.partition_key, "No None partition keys")
@@ -132,8 +136,11 @@ def compute_parent_slices(self) -> Mapping[AssetKey, "AssetSlice"]:
def compute_child_slices(self) -> Mapping[AssetKey, "AssetSlice"]:
return {ak: self.compute_child_slice(ak) for ak in self.child_keys}

-def only_partition_keys(self, partition_keys: AbstractSet[str]) -> "AssetSlice":
+def compute_intersection_with_partition_keys(
+self, partition_keys: AbstractSet[str]
+) -> "AssetSlice":
"""Return a new AssetSlice with only the given partition keys if they are in the slice."""
+return self._asset_graph_view.compute_intersection_with_partition_keys(partition_keys, self)
-partitions_def = check.not_none(self._partitions_def, "Must have partitions def")
-for partition_key in partition_keys:
-if not partitions_def.has_partition_key(partition_key):
@@ -213,9 +220,9 @@ def __init__(
# ensure it is already constructed rather than created on demand
check.invariant(stale_resolver._asset_graph) # noqa: SLF001

-self._stale_resolver = stale_resolver
-# stale resolve has a CachingInstanceQueryer which has a DagsterInstance
+# stale resolver has a CachingInstanceQueryer which has a DagsterInstance
# so just passing the CachingStaleStatusResolver is enough
+self._stale_resolver = stale_resolver
self._temporal_context = temporal_context

@property
@@ -275,3 +282,24 @@ def compute_child_asset_slice(
parent_asset_subset=asset_slice.convert_to_valid_asset_subset(),
),
)

+def compute_intersection_with_partition_keys(
+self, partition_keys: AbstractSet[str], asset_slice: AssetSlice
+) -> "AssetSlice":
+"""Return a new AssetSlice with only the given partition keys if they are in the slice."""
+partitions_def = check.not_none(
+self._get_partitions_def(asset_slice.asset_key), "Must have partitions def"
+)
+for partition_key in partition_keys:
+if not partitions_def.has_partition_key(partition_key):
+check.failed(
+f"Partition key {partition_key} not in partitions def {partitions_def}"
+)
+
+return _slice_from_subset(
+self,
+asset_slice.convert_to_valid_asset_subset()
+& AssetSubset.from_partition_keys(
+asset_slice.asset_key, partitions_def, partition_keys
+),
+)
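
The comment introduced above explains why `_AssetSliceCompatibleSubset` became a `NewType` rather than a subclass: the alias costs nothing at runtime, but a type checker only accepts values that went through the single conversion helper (`_slice_from_subset`). Below is a minimal, self-contained sketch of that pattern; the `CompatibleKeys`, `_to_compatible`, and `partition_count` names are illustrative stand-ins, not part of this commit.

```python
from typing import AbstractSet, NewType

# Illustrative stand-in for ValidAssetSubset / _AssetSliceCompatibleSubset:
# at runtime CompatibleKeys is just a frozenset, but to a type checker it is
# a distinct type that can only be produced explicitly.
CompatibleKeys = NewType("CompatibleKeys", frozenset)


def _to_compatible(keys: AbstractSet[str]) -> CompatibleKeys:
    # The single conversion point, analogous to _slice_from_subset: any
    # validation or normalization would live here and only here.
    return CompatibleKeys(frozenset(keys))


def partition_count(keys: CompatibleKeys) -> int:
    # Internals accept only the NewType, so a type checker flags callers that
    # pass a raw set without going through _to_compatible.
    return len(keys)


print(partition_count(_to_compatible({"1", "2"})))  # 2
```

Because `NewType` has no runtime behavior, the constraint exists only for the type checker, which is exactly the property the comment relies on to keep the conversion in one spot.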
@@ -73,12 +73,12 @@ def down_letters() -> None: ...
}

# subset of up to subset of down
-assert up_slice.only_partition_keys({"2"}).compute_child_slice(
+assert up_slice.compute_intersection_with_partition_keys({"2"}).compute_child_slice(
down_letters.key
).compute_partition_keys() == {"b"}

# subset of down to subset of up
-assert down_slice.only_partition_keys({"b"}).compute_parent_slice(
+assert down_slice.compute_intersection_with_partition_keys({"b"}).compute_parent_slice(
up_numbers.key
).compute_partition_keys() == {"2"}

@@ -95,10 +95,10 @@ def up_numbers() -> None: ...

asset_graph_view_t0 = AssetGraphView.for_test(defs, instance)

-assert asset_graph_view_t0.get_asset_slice(up_numbers.key).only_partition_keys(
-{"1", "2"}
-).compute_partition_keys() == {"1", "2"}
+assert asset_graph_view_t0.get_asset_slice(
+up_numbers.key
+).compute_intersection_with_partition_keys({"1", "2"}).compute_partition_keys() == {"1", "2"}

-assert asset_graph_view_t0.get_asset_slice(up_numbers.key).only_partition_keys(
-{"3"}
-).compute_partition_keys() == set(["3"])
+assert asset_graph_view_t0.get_asset_slice(
+up_numbers.key
+).compute_intersection_with_partition_keys({"3"}).compute_partition_keys() == set(["3"])

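The assertions above show how the renamed method is intended to be called. The sketch below restates that usage as a standalone script; the import path of `AssetGraphView` (an internal module that may move) and the asset and partition definitions are assumptions for illustration, not taken from this commit.

```python
from dagster import DagsterInstance, Definitions, StaticPartitionsDefinition, asset

# Assumed import path: AssetGraphView is internal to Dagster and not exported
# from the top-level package.
from dagster._core.asset_graph_view.asset_graph_view import AssetGraphView


@asset(partitions_def=StaticPartitionsDefinition(["1", "2", "3"]))
def up_numbers() -> None: ...


defs = Definitions(assets=[up_numbers])
instance = DagsterInstance.ephemeral()

asset_graph_view = AssetGraphView.for_test(defs, instance)
up_slice = asset_graph_view.get_asset_slice(up_numbers.key)

# Intersect the slice with an explicit set of partition keys, then materialize
# the resulting keys.
narrowed = up_slice.compute_intersection_with_partition_keys({"1", "2"})
assert narrowed.compute_partition_keys() == {"1", "2"}
```

The new name also fits the docstring conventions above: the `compute_` prefix signals that the call may do non-trivial work, unlike the old `only_partition_keys` name.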