Skip to content

Commit

Permalink
Make key iterator property on all asset objects
Browse files Browse the repository at this point in the history
  • Loading branch information
dpeng817 committed Dec 18, 2024
1 parent fe234d6 commit 3553a63
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,10 @@ def auto_materialize_policy(self) -> Optional[AutoMaterializePolicy]:
def kinds(self) -> Set[str]:
return {tag[len(KIND_PREFIX) :] for tag in self.tags if tag.startswith(KIND_PREFIX)}

@property
def key_iterator(self) -> Iterable[AssetKey]:
return [self.key]

@public
def with_io_manager_key(self, io_manager_key: str) -> "AssetSpec":
"""Returns a copy of this AssetSpec with an extra metadata value that dictates which I/O
Expand Down
4 changes: 4 additions & 0 deletions python_modules/dagster/dagster/_core/definitions/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -910,6 +910,10 @@ def keys(self) -> AbstractSet[AssetKey]:
else:
return self._specs_by_key.keys()

@property
def key_iterator(self) -> Iterable[AssetKey]:
return self.keys

@property
def has_keys(self) -> bool:
return len(self.keys) > 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
KeyScopedAssetObjects,
LoadableAssetTypes,
find_objects_in_module_of_types,
key_iterator,
replace_keys_in_asset,
)
from dagster._core.definitions.source_asset import SourceAsset
Expand Down Expand Up @@ -77,12 +76,14 @@ def module_name_by_id(self) -> Dict[int, str]:
}

@cached_property
def objects_by_key(self) -> Mapping[AssetKey, Sequence[Union[SourceAsset, AssetsDefinition]]]:
def objects_by_key(
self,
) -> Mapping[AssetKey, Sequence[Union[SourceAsset, AssetSpec, AssetsDefinition]]]:
objects_by_key = defaultdict(list)
for asset_object in self.flat_object_list:
if not isinstance(asset_object, KeyScopedAssetObjects):
continue
for key in key_iterator(asset_object):
for key in asset_object.key_iterator:
objects_by_key[key].append(asset_object)
return objects_by_key

Expand Down Expand Up @@ -163,7 +164,7 @@ def assets_with_loadable_prefix(
all_asset_keys = {
key
for asset_object in self.assets_defs_specs_and_checks_defs
for key in key_iterator(asset_object, included_targeted_keys=True)
for key in asset_object.key_iterator
}
all_check_keys = {
check_key for asset_object in self.assets_defs for check_key in asset_object.check_keys
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,25 +37,6 @@ def find_subclasses_in_module(
yield value


def key_iterator(
asset: Union[AssetsDefinition, SourceAsset, AssetSpec], included_targeted_keys: bool = False
) -> Iterator[AssetKey]:
return (
iter(
[
*asset.keys,
*(
[check_key.asset_key for check_key in asset.check_keys]
if included_targeted_keys
else []
),
]
)
if isinstance(asset, AssetsDefinition)
else iter([asset.key])
)


def find_modules_in_package(package_module: ModuleType) -> Iterable[ModuleType]:
yield package_module
if package_module.__file__:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
Any,
Callable,
Dict,
Iterable,
Iterator,
Mapping,
Optional,
Expand Down Expand Up @@ -510,3 +511,7 @@ def __eq__(self, other: object) -> bool:
and self.resource_defs == other.resource_defs
and self.observe_fn == other.observe_fn
)

@property
def key_iterator(self) -> Iterable[AssetKey]:
return [self.key]

0 comments on commit 3553a63

Please sign in to comment.