Skip to content

Commit

Permalink
Implementation of AssetGraphView:compute_since_cron is complete
Browse files Browse the repository at this point in the history
  • Loading branch information
schrockn committed Mar 12, 2024
1 parent e4dfc01 commit d4c988c
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
)
from dagster._utils.cached_method import cached_method

from .cron import MissedTicksEvaluationData, get_missed_ticks, get_new_asset_partitions_to_request

if TYPE_CHECKING:
from dagster._core.definitions.base_asset_graph import BaseAssetGraph, BaseAssetNode
from dagster._core.definitions.data_version import CachingStaleStatusResolver
Expand Down Expand Up @@ -227,6 +229,27 @@ def time_windows(self) -> Sequence[TimeWindow]:

check.failed(f"Unsupported partitions_def: {self._partitions_def}")

def _from_unpartitioned_ak_pk_set(
self, ak_pks: AbstractSet[AssetKeyPartitionKey]
) -> "AssetSlice":
check.invariant(
len(ak_pks) <= 1, "Cannot have more than one partition key for unpartitioned assets"
)
if not ak_pks:
return self._asset_graph_view.create_empty_slice(self.asset_key)
ak_pk = next(iter(ak_pks))
check.invariant(
ak_pk.asset_key == self.asset_key, "AssetKeyPartitionKey must match asset key"
)
check.invariant(
ak_pk.partition_key is None, "Partition key must be None for unpartitioned assets"
)
return _slice_from_subset(
self._asset_graph_view,
self._compatible_subset
& AssetSubset.from_asset_partitions_set(self.asset_key, self._partitions_def, {ak_pk}),
)

def only_asset_partitions(
self, asset_partitions: AbstractSet[AssetKeyPartitionKey]
) -> "AssetSlice":
Expand Down Expand Up @@ -411,7 +434,7 @@ def compute_intersection_with_partition_keys(
self,
asset_slice.convert_to_valid_asset_subset()
& AssetSubset.from_partition_keys(
asset_slice.asset_key, partitions_def, partition_keys
asset_slice.asset_key, partition_keys, partitions_def
),
)

Expand Down Expand Up @@ -545,6 +568,33 @@ def compute_sync_statues(
for ak_pk in asset_slice.compute_asset_partitions()
}

def compute_since_cron(
self,
asset_slice: "AssetSlice",
cron_schedule: str,
cron_timezone: str,
previous_dt: Optional[datetime],
) -> "AssetSlice":
missed_ticks = get_missed_ticks(
MissedTicksEvaluationData(
cron_schedule=cron_schedule,
cron_timezone=cron_timezone,
start_dt=previous_dt,
end_dt=self.effective_dt,
)
)

ak_pks = get_new_asset_partitions_to_request(
missed_ticks=missed_ticks,
asset_key=asset_slice.asset_key,
partitions_def=self._get_partitions_def(asset_slice.asset_key),
dynamic_partitions_store=self._queryer,
all_partitions=False,
end_dt=self.effective_dt,
)

return asset_slice.only_asset_partitions(ak_pks)


def _required_tw_partitions_def(
partitions_def: Optional["PartitionsDefinition"],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,12 +161,11 @@ def from_asset_partitions_set(

@staticmethod
def from_partition_keys(
asset_key: AssetKey,
partitions_def: PartitionsDefinition,
partition_keys: AbstractSet[str],
) -> "ValidAssetSubset":
asset_key: AssetKey, partition_keys: AbstractSet[str], partitions_def: PartitionsDefinition
):
return ValidAssetSubset(
asset_key=asset_key, value=partitions_def.subset_with_partition_keys(partition_keys)
asset_key=asset_key,
value=partitions_def.subset_with_partition_keys(partition_keys=partition_keys),
)

def __contains__(self, item: AssetKeyPartitionKey) -> bool:
Expand Down

0 comments on commit d4c988c

Please sign in to comment.