diff --git a/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py b/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py index 58371eee203d4..f5455a5a59ca2 100644 --- a/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py +++ b/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py @@ -1,5 +1,5 @@ from datetime import datetime -from typing import TYPE_CHECKING, AbstractSet, Mapping, NamedTuple, NewType, Optional +from typing import TYPE_CHECKING, AbstractSet, Iterable, Mapping, NamedTuple, NewType, Optional from dagster import _check as check from dagster._core.definitions.asset_subset import AssetSubset, ValidAssetSubset @@ -43,12 +43,52 @@ class _AssetSliceCompatibleSubset(ValidAssetSubset): ... PartitionKey = NewType("PartitionKey", str) +""" +PartitionKey is a string that represents a partition key. It is a new type so +that we can track the use of strs as partition keys. + +PartitionKey should only be created via the static factory methods on AssetPartitionKey. +We want to consolidate all partition key creation so we have a leverage point to change +how they. Eventually we want to create a new singleton string partition to represent +the "all" partition key for unpartitioned assets. +""" class AssetPartitionKey(NamedTuple): + """Represents a a partition in a particular asset. Prefer usage to AssetKeyPartitionKey. + If you need to convert AssetKeyPartitionKeys to AssetPartitionKeys, use the static + factory methods. + """ + asset_key: AssetKey partition_key: Optional[PartitionKey] + @staticmethod + def partition_keys_from_asset_key_partition_keys( + asset_key_partition_keys: Iterable[AssetKeyPartitionKey], + ) -> AbstractSet["PartitionKey"]: + return { + PartitionKey(check.not_none(akpk.partition_key, "No None partition keys")) + for akpk in asset_key_partition_keys + } + + @staticmethod + def from_str_partition_keys( + asset_key: AssetKey, partition_keys: Iterable[str] + ) -> AbstractSet["AssetPartitionKey"]: + return {AssetPartitionKey(asset_key, PartitionKey(pk)) for pk in partition_keys} + + @staticmethod + def from_asset_key_partition_keys( + asset_key_partition_keys: Iterable[AssetKeyPartitionKey], + ) -> AbstractSet["AssetPartitionKey"]: + return { + AssetPartitionKey( + akpk.asset_key, PartitionKey(akpk.partition_key) if akpk.partition_key else None + ) + for akpk in asset_key_partition_keys + } + def _slice_from_subset(asset_graph_view: "AssetGraphView", subset: AssetSubset) -> "AssetSlice": valid_subset = subset.as_valid( @@ -100,10 +140,9 @@ def convert_to_valid_asset_subset(self) -> ValidAssetSubset: return self._compatible_subset def compute_partition_keys(self) -> AbstractSet[PartitionKey]: - return { - PartitionKey(check.not_none(ap.partition_key, "Must have named partitions")) - for ap in self._compatible_subset.asset_partitions - } + return AssetPartitionKey.partition_keys_from_asset_key_partition_keys( + self._compatible_subset.asset_partitions + ) @property def asset_key(self) -> AssetKey: