Implementation of AssetGraphView:compute_since_cron is complete #20392

Closed
wants to merge 8 commits
@@ -1,11 +1,41 @@
from datetime import datetime
from typing import TYPE_CHECKING, AbstractSet, Mapping, NamedTuple, NewType, Optional
from enum import Enum
from typing import (
TYPE_CHECKING,
AbstractSet,
Mapping,
NamedTuple,
NewType,
Optional,
Sequence,
cast,
)

from dagster import _check as check
from dagster._core.definitions.asset_subset import AssetSubset, ValidAssetSubset
from dagster._core.definitions.events import AssetKey
from dagster._core.definitions.data_version import StaleStatus
from dagster._core.definitions.events import AssetKey, AssetKeyPartitionKey
from dagster._core.definitions.multi_dimensional_partitions import (
MultiPartitionKey,
MultiPartitionsDefinition,
PartitionDimensionDefinition,
)
from dagster._core.definitions.partition import (
AllPartitionsSubset,
DefaultPartitionsSubset,
DynamicPartitionsDefinition,
StaticPartitionsDefinition,
)
from dagster._core.definitions.time_window_partitions import (
PartitionKeysTimeWindowPartitionsSubset,
TimeWindow,
TimeWindowPartitionsDefinition,
TimeWindowPartitionsSubset,
)
from dagster._utils.cached_method import cached_method

from .cron import MissedTicksEvaluationData, get_missed_ticks, get_new_asset_partitions_to_request

if TYPE_CHECKING:
from dagster._core.definitions.base_asset_graph import BaseAssetGraph, BaseAssetNode
from dagster._core.definitions.data_version import CachingStaleStatusResolver
@@ -53,6 +83,22 @@ def _slice_from_subset(asset_graph_view: "AssetGraphView", subset: AssetSubset)
return AssetSlice(asset_graph_view, _AssetSliceCompatibleSubset(valid_subset))


class SyncStatus(Enum):
SYNCED = "SYNCED"
UNSYNCED = "UNSYNCED"

@staticmethod
def from_stale_status(stale_status: StaleStatus) -> "SyncStatus":
"""Convert a StaleStatus to a SyncStatus.

While this appears to lose information, we are redefining "stale" as "unsynced", which is
a binary state, so this mapping reflects that.

One can still determine why a partition is unsynced by looking at the causes API.
"""
return SyncStatus.SYNCED if stale_status == StaleStatus.FRESH else SyncStatus.UNSYNCED

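As a quick illustration of the collapse described in the docstring above (a sketch, assuming the StaleStatus enum exposes the FRESH, STALE, and MISSING members), only FRESH maps to SYNCED:

# Illustrative sketch, not part of this diff: STALE and MISSING both collapse to UNSYNCED.
assert SyncStatus.from_stale_status(StaleStatus.FRESH) == SyncStatus.SYNCED
assert SyncStatus.from_stale_status(StaleStatus.STALE) == SyncStatus.UNSYNCED
assert SyncStatus.from_stale_status(StaleStatus.MISSING) == SyncStatus.UNSYNCED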

class AssetSlice:
"""An asset slice represents a set of partitions for a given asset key. It is
tied to a particular instance of an AssetGraphView, and is read-only.
@@ -104,6 +150,9 @@ def compute_partition_keys(self) -> AbstractSet[str]:
for akpk in self._compatible_subset.asset_partitions
}

def compute_asset_partitions(self) -> AbstractSet[AssetKeyPartitionKey]:
return self._compatible_subset.asset_partitions

@property
def asset_key(self) -> AssetKey:
return self._compatible_subset.asset_key
@@ -142,6 +191,99 @@ def compute_intersection_with_partition_keys(
"""Return a new AssetSlice with only the given partition keys if they are in the slice."""
return self._asset_graph_view.compute_intersection_with_partition_keys(partition_keys, self)

@property
def time_windows(self) -> Sequence[TimeWindow]:
tw_partitions_def = self._time_window_partitions_def_in_context()
check.inst(tw_partitions_def, TimeWindowPartitionsDefinition, "Must be time windowed.")
assert isinstance(tw_partitions_def, TimeWindowPartitionsDefinition) # appease type checker
if isinstance(self._compatible_subset.subset_value, TimeWindowPartitionsSubset):
return self._compatible_subset.subset_value.included_time_windows
elif isinstance(self._compatible_subset.subset_value, AllPartitionsSubset):
last_tw = tw_partitions_def.get_last_partition_window(
self._asset_graph_view.effective_dt
)
return [TimeWindow(datetime.min, last_tw.end)] if last_tw else []
elif isinstance(self._compatible_subset.subset_value, DefaultPartitionsSubset):
check.inst(
self._partitions_def,
MultiPartitionsDefinition,
"Must be multi-partition if we got here.",
)
tw_partition_keys = set()
# Look up the time-window dimension's key by name rather than relying on dimension ordering.
time_dimension_name = cast(
MultiPartitionsDefinition, self._partitions_def
).time_window_dimension.name
for multi_partition_key in self._compatible_subset.subset_value.get_partition_keys():
check.inst(multi_partition_key, MultiPartitionKey, "Must be multi partition key.")
assert isinstance(multi_partition_key, MultiPartitionKey)  # appease type checker
tw_partition_keys.add(multi_partition_key.keys_by_dimension[time_dimension_name])
subset_from_tw = tw_partitions_def.subset_with_partition_keys(tw_partition_keys)
check.inst(
subset_from_tw,
(TimeWindowPartitionsSubset, PartitionKeysTimeWindowPartitionsSubset),
"Must be time window subset.",
)
if isinstance(subset_from_tw, TimeWindowPartitionsSubset):
return subset_from_tw.included_time_windows
elif isinstance(subset_from_tw, PartitionKeysTimeWindowPartitionsSubset):
return subset_from_tw.included_time_windows
check.failed(f"Unsupported subset value: {self._compatible_subset.subset_value}")

check.failed(f"Unsupported partitions_def: {self._partitions_def}")

def _from_unpartitioned_ak_pk_set(
self, ak_pks: AbstractSet[AssetKeyPartitionKey]
) -> "AssetSlice":
check.invariant(
len(ak_pks) <= 1, "Cannot have more than one partition key for unpartitioned assets"
)
if not ak_pks:
return self._asset_graph_view.create_empty_slice(self.asset_key)
ak_pk = next(iter(ak_pks))
check.invariant(
ak_pk.asset_key == self.asset_key, "AssetKeyPartitionKey must match asset key"
)
check.invariant(
ak_pk.partition_key is None, "Partition key must be None for unpartitioned assets"
)
return _slice_from_subset(
self._asset_graph_view,
self._compatible_subset
& AssetSubset.from_asset_partitions_set(self.asset_key, self._partitions_def, {ak_pk}),
)

def only_asset_partitions(
self, asset_partitions: AbstractSet[AssetKeyPartitionKey]
) -> "AssetSlice":
return _slice_from_subset(
self._asset_graph_view,
self._compatible_subset
& AssetSubset.from_asset_partitions_set(
self.asset_key, self._partitions_def, asset_partitions
),
)

def _time_window_partitions_def_in_context(self) -> Optional[TimeWindowPartitionsDefinition]:
pd = self._partitions_def
if isinstance(pd, TimeWindowPartitionsDefinition):
return pd
if isinstance(pd, MultiPartitionsDefinition):
return pd.time_window_partitions_def if pd.has_time_window_dimension else None
return None

@property
def is_empty(self) -> bool:
return self._compatible_subset.size == 0

@cached_method
def compute_unsynced(self) -> "AssetSlice":
return self._asset_graph_view.compute_unsynced_slice(self)

@cached_method
def compute_sync_statuses(self) -> Mapping[AssetKeyPartitionKey, SyncStatus]:
return self._asset_graph_view.compute_sync_statuses(self)

def __repr__(self) -> str:
return f"AssetSlice(subset={self._compatible_subset})"


class AssetGraphView:
"""The Asset Graph View. It is a view of the asset graph from the perspective of a specific
@@ -292,6 +434,172 @@ def compute_intersection_with_partition_keys(
self,
asset_slice.convert_to_valid_asset_subset()
& AssetSubset.from_partition_keys(
asset_slice.asset_key, partitions_def, partition_keys
asset_slice.asset_key, partition_keys, partitions_def
),
)

def create_from_time_window(self, asset_key: AssetKey, time_window: TimeWindow) -> AssetSlice:
return _slice_from_subset(
self,
AssetSubset(
asset_key=asset_key,
value=TimeWindowPartitionsSubset(
partitions_def=_required_tw_partitions_def(self._get_partitions_def(asset_key)),
num_partitions=None,
included_time_windows=[time_window],
),
),
)

def create_latest_time_window_slice(self, asset_key: AssetKey) -> AssetSlice:
"""If the underlying asset is time-window partitioned, this will return the latest complete
time window relative to the effective date. For example if it is daily partitioned starting
at midnight every day. If the effective date is before the start of the partition definition, this will
return the empty time window (where both start and end are datetime.max).

If the underlying asset is unpartitioned or static partitioned and it is not empty,
this will return a time window from the beginning of time to the effective date. If
it is empty it will return the empty time window.

TODO: add language for multi-dimensional partitioning when we support it
TODO: add language for dynamic partitioning when we support it
"""
partitions_def = self._get_partitions_def(asset_key)
if partitions_def is None or isinstance(
partitions_def, (DynamicPartitionsDefinition, StaticPartitionsDefinition)
):
return self.get_asset_slice(asset_key)

if isinstance(partitions_def, TimeWindowPartitionsDefinition):
time_window = partitions_def.get_last_partition_window(self.effective_dt)
return (
self.create_from_time_window(asset_key, time_window)
if time_window
else self.create_empty_slice(asset_key)
)

if isinstance(partitions_def, MultiPartitionsDefinition):
if not partitions_def.has_time_window_dimension:
return self.get_asset_slice(asset_key)

multi_dim_info = self._get_multi_dim_info(asset_key)
last_tw = multi_dim_info.tw_partition_def.get_last_partition_window(self.effective_dt)
return (
self._build_multi_partition_slice(asset_key, multi_dim_info, last_tw)
if last_tw
else self.create_empty_slice(asset_key)
)

# Need to handle dynamic partitioning
check.failed(f"Unsupported partitions_def: {partitions_def}")

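A minimal usage sketch of the behavior described in the docstring above, assuming a daily-partitioned asset with key "daily_asset" and an already-constructed AssetGraphView named asset_graph_view (neither appears in this diff):

# Hypothetical: for a daily partitions definition and an effective date partway through
# 2024-02-21, the latest complete window is the 2024-02-20 -> 2024-02-21 day.
latest_slice = asset_graph_view.create_latest_time_window_slice(AssetKey("daily_asset"))
print(latest_slice.time_windows)  # a single TimeWindow covering the most recent full day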
def create_empty_slice(self, asset_key: AssetKey) -> AssetSlice:
return _slice_from_subset(
self,
AssetSubset.empty(asset_key, self._get_partitions_def(asset_key)),
)

class MultiDimInfo(NamedTuple):
tw_dim: PartitionDimensionDefinition
secondary_dim: PartitionDimensionDefinition

@property
def tw_partition_def(self) -> TimeWindowPartitionsDefinition:
check.inst(self.tw_dim.partitions_def, TimeWindowPartitionsDefinition)
assert isinstance(
self.tw_dim.partitions_def, TimeWindowPartitionsDefinition
) # appease pyright
return self.tw_dim.partitions_def

@property
def secondary_partition_def(self) -> "PartitionsDefinition":
return self.secondary_dim.partitions_def

def _get_multi_dim_info(self, asset_key: AssetKey) -> "MultiDimInfo":
partitions_def = self._get_partitions_def(asset_key)
check.inst(partitions_def, MultiPartitionsDefinition)
assert isinstance(partitions_def, MultiPartitionsDefinition) # appease pyright
return self.MultiDimInfo(
tw_dim=partitions_def.time_window_dimension,
secondary_dim=partitions_def.secondary_dimension,
)

def _build_multi_partition_slice(
self, asset_key: AssetKey, multi_dim_info: MultiDimInfo, last_tw: TimeWindow
) -> "AssetSlice":
# Note: Potential perf improvement here. There is no way to encode a cartesian product
# in the underlying PartitionsSet. We could add a specialized PartitionsSubset
# subclass that itself composed two PartitionsSubset to avoid materializing the entire
# partitions range.
return self.get_asset_slice(asset_key).compute_intersection_with_partition_keys(
{
MultiPartitionKey(
{
multi_dim_info.tw_dim.name: tw_pk,
multi_dim_info.secondary_dim.name: secondary_pk,
}
)
for tw_pk in multi_dim_info.tw_partition_def.get_partition_keys_in_time_window(
last_tw
)
for secondary_pk in multi_dim_info.secondary_partition_def.get_partition_keys(
current_time=self.effective_dt,
dynamic_partitions_store=self._queryer,
)
}
)

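To make the cartesian-product note above concrete (the dimension names "date" and "region" and their keys are hypothetical), the comprehension materializes every combination of time-window key and secondary key:

# Hypothetical illustration of the materialized product for two days and two regions.
example_product = {
MultiPartitionKey({"date": date_key, "region": region_key})
for date_key in ("2024-02-19", "2024-02-20")
for region_key in ("us", "eu")
}
assert len(example_product) == 4  # full cross product, the perf concern the note flags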
def compute_unsynced_slice(self, asset_slice: AssetSlice) -> "AssetSlice":
return asset_slice.only_asset_partitions(
{
ak_pk
for ak_pk, status in self.compute_sync_statuses(asset_slice).items()
if status == SyncStatus.UNSYNCED
}
)

def compute_sync_statuses(
self, asset_slice: "AssetSlice"
) -> Mapping[AssetKeyPartitionKey, SyncStatus]:
return {
ak_pk: SyncStatus.from_stale_status(
self._stale_resolver.get_status(asset_slice.asset_key, ak_pk.partition_key)
)
for ak_pk in asset_slice.compute_asset_partitions()
}

def compute_since_cron(
self,
asset_slice: "AssetSlice",
cron_schedule: str,
cron_timezone: str,
previous_dt: Optional[datetime],
) -> "AssetSlice":
missed_ticks = get_missed_ticks(
MissedTicksEvaluationData(
cron_schedule=cron_schedule,
cron_timezone=cron_timezone,
start_dt=previous_dt,
end_dt=self.effective_dt,
)
)

ak_pks = get_new_asset_partitions_to_request(
missed_ticks=missed_ticks,
asset_key=asset_slice.asset_key,
partitions_def=self._get_partitions_def(asset_slice.asset_key),
dynamic_partitions_store=self._queryer,
all_partitions=False,
end_dt=self.effective_dt,
)

return asset_slice.only_asset_partitions(ak_pks)

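A hedged usage sketch of compute_since_cron, the method this PR completes. The asset key, cron schedule, previous datetime, and pre-built AssetGraphView (asset_graph_view) are assumptions for illustration only:

from datetime import datetime, timezone

# Partitions corresponding to daily cron ticks missed between previous_dt and the view's
# effective date, intersected with the starting slice.
since_cron_slice = asset_graph_view.compute_since_cron(
asset_slice=asset_graph_view.get_asset_slice(AssetKey("daily_asset")),
cron_schedule="0 0 * * *",  # midnight every day
cron_timezone="UTC",
previous_dt=datetime(2024, 2, 19, tzinfo=timezone.utc),
)
print(since_cron_slice.compute_partition_keys())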

def _required_tw_partitions_def(
partitions_def: Optional["PartitionsDefinition"],
) -> TimeWindowPartitionsDefinition:
return cast(
TimeWindowPartitionsDefinition,
check.inst(partitions_def, TimeWindowPartitionsDefinition, "Must be time windowed."),
)