Skip to content

Commit

Permalink
comments
Browse files Browse the repository at this point in the history
  • Loading branch information
schrockn committed Mar 9, 2024
1 parent 12ce1f0 commit 6bb9be9
Showing 1 changed file with 44 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from datetime import datetime
from typing import TYPE_CHECKING, AbstractSet, Mapping, NamedTuple, NewType, Optional
from typing import TYPE_CHECKING, AbstractSet, Iterable, Mapping, NamedTuple, NewType, Optional

from dagster import _check as check
from dagster._core.definitions.asset_subset import AssetSubset, ValidAssetSubset
Expand Down Expand Up @@ -43,12 +43,52 @@ class _AssetSliceCompatibleSubset(ValidAssetSubset): ...


PartitionKey = NewType("PartitionKey", str)
"""
PartitionKey is a string that represents a partition key. It is a new type so
that we can track the use of strs as partition keys.
PartitionKey should only be created via the static factory methods on AssetPartitionKey.
We want to consolidate all partition key creation so we have a leverage point to change
how they. Eventually we want to create a new singleton string partition to represent
the "all" partition key for unpartitioned assets.
"""


class AssetPartitionKey(NamedTuple):
"""Represents a a partition in a particular asset. Prefer usage to AssetKeyPartitionKey.
If you need to convert AssetKeyPartitionKeys to AssetPartitionKeys, use the static
factory methods.
"""

asset_key: AssetKey
partition_key: Optional[PartitionKey]

@staticmethod
def partition_keys_from_asset_key_partition_keys(
asset_key_partition_keys: Iterable[AssetKeyPartitionKey],
) -> AbstractSet["PartitionKey"]:
return {
PartitionKey(check.not_none(akpk.partition_key, "No None partition keys"))
for akpk in asset_key_partition_keys
}

@staticmethod
def from_str_partition_keys(
asset_key: AssetKey, partition_keys: Iterable[str]
) -> AbstractSet["AssetPartitionKey"]:
return {AssetPartitionKey(asset_key, PartitionKey(pk)) for pk in partition_keys}

@staticmethod
def from_asset_key_partition_keys(
asset_key_partition_keys: Iterable[AssetKeyPartitionKey],
) -> AbstractSet["AssetPartitionKey"]:
return {
AssetPartitionKey(
akpk.asset_key, PartitionKey(akpk.partition_key) if akpk.partition_key else None
)
for akpk in asset_key_partition_keys
}


def _slice_from_subset(asset_graph_view: "AssetGraphView", subset: AssetSubset) -> "AssetSlice":
valid_subset = subset.as_valid(
Expand Down Expand Up @@ -100,10 +140,9 @@ def convert_to_valid_asset_subset(self) -> ValidAssetSubset:
return self._compatible_subset

def compute_partition_keys(self) -> AbstractSet[PartitionKey]:
return {
PartitionKey(check.not_none(ap.partition_key, "Must have named partitions"))
for ap in self._compatible_subset.asset_partitions
}
return AssetPartitionKey.partition_keys_from_asset_key_partition_keys(
self._compatible_subset.asset_partitions
)

@property
def asset_key(self) -> AssetKey:
Expand Down

0 comments on commit 6bb9be9

Please sign in to comment.