Skip to content

Commit

Permalink
Add latest_time_window
Browse files Browse the repository at this point in the history
  • Loading branch information
schrockn committed Mar 8, 2024
1 parent 99cd2dc commit fb2c810
Show file tree
Hide file tree
Showing 3 changed files with 275 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
from datetime import datetime
from functools import cached_property
from typing import TYPE_CHECKING, AbstractSet, Mapping, NamedTuple, Optional

from typing_extensions import TypeAlias

from dagster import _check as check
from dagster._core.definitions.asset_subset import AssetSubset, ValidAssetSubset
from dagster._core.definitions.events import AssetKey, AssetKeyPartitionKey
from dagster._core.definitions.partition import StaticPartitionsDefinition
from dagster._core.definitions.time_window_partitions import (
TimeWindow,
TimeWindowPartitionsDefinition,
TimeWindowPartitionsSubset,
)
from dagster._utils.cached_method import cached_method

if TYPE_CHECKING:
Expand Down Expand Up @@ -144,6 +151,73 @@ def only_partition_keys(self, partition_keys: AbstractSet[PartitionKey]) -> "Ass
& self._compatible_subset,
)

@cached_property
def latest_time_window_slice(self) -> "AssetSlice":
"""Returns the latest time window for the asset slice.
If the asset slice is time-window-partitiond (e.g. daily, hourly),
this will return a slice for latest complete time window,
relative to the current effective time.
If the asset slice is not time-windowed, this will return the same asset slice.
TODO: add language for multi-dimensional partitioning when we support it
TODO: add language for dynamic partitioning when we support it
"""
if self._partitions_def is None:
return self

if isinstance(self._partitions_def, StaticPartitionsDefinition):
return self

if isinstance(self._partitions_def, TimeWindowPartitionsDefinition):
time_window = self._partitions_def.get_last_partition_window(
self._asset_graph_view.effective_dt
)

return (
self._asset_graph_view.create_from_time_window(self.asset_key, time_window)
if time_window
else self._asset_graph_view.create_empty_slice(self.asset_key)
)

# Need to handle dynamic and multi-dimensional partitioning
check.failed(f"Unsupported partitions_def: {self._partitions_def}")

@cached_property
def latest_time_window(self) -> TimeWindow:
"""If the underlying asset is time-window partitioned, this will return the latest complete
time window relative to the effective date. For example if it is daily partitioned starting
at midnight every day. If the effective date is before the start of the partition definition, this will
return the empty time window (where both start and end are datetime.max).
If the underlying asset is unpartitioned or static partitioned and it is not empty,
this will return a time window from the beginning of time to the effective date. If
it is empty it will return the empty time window.
TODO: add language for multi-dimensional partitioning when we support it
TODO: add language for dynamic partitioning when we support it
"""
if isinstance(self._partitions_def, TimeWindowPartitionsDefinition):
tw = self._partitions_def.get_last_partition_window(self._asset_graph_view.effective_dt)
return tw if tw else TimeWindow.empty()

if self._partitions_def is None or isinstance(
self._partitions_def, StaticPartitionsDefinition
):
return (
TimeWindow.empty()
if self.is_empty
else TimeWindow(datetime.min, self._asset_graph_view.effective_dt)
)

# Need to handle dynamic and multi-dimensional partitioning
check.failed(f"Unsupported partitions_def: {self._partitions_def}")

@property
def is_empty(self) -> bool:
return self._compatible_subset.size == 0


class AssetGraphView:
"""The Asset Graph View. It is a view of the asset graph from the perspective of a specific
Expand Down Expand Up @@ -272,3 +346,29 @@ def compute_child_asset_slice(
parent_asset_subset=asset_slice.convert_to_valid_asset_subset(),
),
)

def create_from_time_window(self, asset_key: AssetKey, time_window: TimeWindow) -> AssetSlice:
partitions_def = self._get_partitions_def(asset_key)
check.inst(
partitions_def,
TimeWindowPartitionsDefinition,
"Must be a time-windowed partition definition",
)
assert isinstance(partitions_def, TimeWindowPartitionsDefinition) # appease type checker
return _slice_from_subset(
self,
AssetSubset(
asset_key=asset_key,
value=TimeWindowPartitionsSubset(
partitions_def=partitions_def,
num_partitions=None,
included_time_windows=[time_window],
),
),
)

def create_empty_slice(self, asset_key: AssetKey) -> AssetSlice:
return _slice_from_subset(
self,
AssetSubset.empty(asset_key, self._get_partitions_def(asset_key)),
)
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,14 @@ class TimeWindow(NamedTuple):
start: PublicAttr[datetime]
end: PublicAttr[datetime]

@property
def is_empty(self) -> bool:
return self.start == self.end

@staticmethod
def empty() -> "TimeWindow":
return TimeWindow(start=datetime.max, end=datetime.max)


@whitelist_for_serdes(
field_serializers={"start": DatetimeFieldSerializer, "end": DatetimeFieldSerializer},
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
import pendulum
from dagster import Definitions, asset
from dagster._core.asset_graph_view.asset_graph_view import AssetGraphView
from dagster._core.definitions.partition import StaticPartitionsDefinition
from dagster._core.definitions.time_window_partitions import DailyPartitionsDefinition
from dagster._core.instance import DagsterInstance


def test_latest_time_slice_no_end() -> None:
# starts at 2020-02-01
no_end_daily = DailyPartitionsDefinition(pendulum.datetime(2020, 2, 1))

@asset(partitions_def=no_end_daily)
def daily() -> None: ...

partition_key_list = [
"2020-02-01",
"2020-02-02",
"2020-02-03",
]

defs = Definitions([daily])
instance = DagsterInstance.ephemeral()

# effective date is 2020-2-4

asset_graph_view_on_2_4 = AssetGraphView.for_test(
defs, instance, effective_dt=pendulum.datetime(2020, 2, 4)
)

assert asset_graph_view_on_2_4.get_asset_slice(daily.key).compute_partition_keys() == set(
partition_key_list
)

assert asset_graph_view_on_2_4.get_asset_slice(
daily.key
).latest_time_window_slice.compute_partition_keys() == {"2020-02-03"}

assert asset_graph_view_on_2_4.get_asset_slice(
daily.key
).latest_time_window.start == pendulum.datetime(2020, 2, 3)
assert asset_graph_view_on_2_4.get_asset_slice(
daily.key
).latest_time_window.end == pendulum.datetime(2020, 2, 4)

# effective date is 2020-2-5. Ensure one more date

asset_graph_view_on_2_5 = AssetGraphView.for_test(
defs, instance, effective_dt=pendulum.datetime(2020, 2, 5)
)

assert asset_graph_view_on_2_5.get_asset_slice(daily.key).compute_partition_keys() == set(
partition_key_list + ["2020-02-04"]
)

assert asset_graph_view_on_2_5.get_asset_slice(
daily.key
).latest_time_window_slice.compute_partition_keys() == {"2020-02-04"}

# in the past

asset_graph_view_on_1_1 = AssetGraphView.for_test(
defs, instance, effective_dt=pendulum.datetime(2020, 1, 1)
)

assert asset_graph_view_on_1_1.get_asset_slice(daily.key).compute_partition_keys() == set()

assert (
asset_graph_view_on_1_1.get_asset_slice(
daily.key
).latest_time_window_slice.compute_partition_keys()
== set()
)

assert asset_graph_view_on_1_1.get_asset_slice(daily.key).latest_time_window_slice.is_empty

# effective datetime is in the middle of 02-02, it means the latest
# complete time window is 02-01 -> 02-02, so the partition key should be 02-01

asset_graph_view_on_2_2_plus_1_min = AssetGraphView.for_test(
defs, instance, effective_dt=pendulum.datetime(2020, 2, 2, minute=1)
)
assert asset_graph_view_on_2_2_plus_1_min.get_asset_slice(
daily.key
).compute_partition_keys() == set(["2020-02-01"])
tw = asset_graph_view_on_2_2_plus_1_min.get_asset_slice(daily.key).latest_time_window

assert tw.start == pendulum.datetime(2020, 2, 1)
assert tw.end == pendulum.datetime(2020, 2, 2)


def test_latest_time_slice_with_end() -> None:
# starts at 2020-02-01
daily_partitions_def = DailyPartitionsDefinition(
start_date=pendulum.datetime(2020, 1, 1), end_date=pendulum.datetime(2020, 2, 1)
)

@asset(partitions_def=daily_partitions_def)
def daily() -> None: ...

defs = Definitions([daily])
instance = DagsterInstance.ephemeral()

asset_graph_view_before_start = AssetGraphView.for_test(
defs, instance, effective_dt=pendulum.datetime(2019, 12, 31)
)
assert (
asset_graph_view_before_start.get_asset_slice(
daily.key
).latest_time_window_slice.compute_partition_keys()
== set()
)

asset_graph_view_at_start = AssetGraphView.for_test(
defs, instance, effective_dt=pendulum.datetime(2020, 1, 1)
)
assert (
asset_graph_view_at_start.get_asset_slice(
daily.key
).latest_time_window_slice.compute_partition_keys()
== set()
)

asset_graph_view_after_start_before_end = AssetGraphView.for_test(
defs, instance, effective_dt=pendulum.datetime(2020, 1, 3)
)
assert asset_graph_view_after_start_before_end.get_asset_slice(
daily.key
).latest_time_window_slice.compute_partition_keys() == set(["2020-01-02"])

asset_graph_view_after_end = AssetGraphView.for_test(
defs, instance, effective_dt=pendulum.datetime(2020, 2, 5)
)
assert asset_graph_view_after_end.get_asset_slice(
daily.key
).latest_time_window_slice.compute_partition_keys() == set(["2020-01-31"])


def test_latest_time_slice_unpartitioned() -> None:
@asset
def unpartitioned() -> None: ...

defs = Definitions([unpartitioned])
instance = DagsterInstance.ephemeral()

asset_graph_view = AssetGraphView.for_test(defs, instance)
assert not asset_graph_view.get_asset_slice(unpartitioned.key).latest_time_window_slice.is_empty
assert not asset_graph_view.get_asset_slice(unpartitioned.key).latest_time_window.is_empty
assert asset_graph_view.create_empty_slice(unpartitioned.key).latest_time_window_slice.is_empty
assert asset_graph_view.create_empty_slice(unpartitioned.key).latest_time_window.is_empty


def test_latest_time_slice_static_partitioned() -> None:
number_keys = {"1", "2", "3"}
number_static_partitions_def = StaticPartitionsDefinition(list(number_keys))

@asset(partitions_def=number_static_partitions_def)
def up_numbers() -> None: ...

defs = Definitions([up_numbers])
instance = DagsterInstance.ephemeral()

asset_graph_view = AssetGraphView.for_test(defs, instance)
up_slice = asset_graph_view.get_asset_slice(up_numbers.key)
assert up_slice.latest_time_window_slice.compute_partition_keys() == number_keys
assert not up_slice.is_empty
assert asset_graph_view.create_empty_slice(up_numbers.key).latest_time_window_slice.is_empty

0 comments on commit fb2c810

Please sign in to comment.