From 01f281af1ffb9b2c6ef1bad7d2c1b967c41efefe Mon Sep 17 00:00:00 2001 From: Chris DeCarolis Date: Mon, 16 Dec 2024 16:23:58 -0800 Subject: [PATCH] Genericize object list utils --- .../load_asset_checks_from_modules.py | 6 +- .../load_assets_from_modules.py | 37 +++--- .../definitions/module_loaders/object_list.py | 120 ++++++++++-------- .../_core/definitions/module_loaders/utils.py | 7 +- 4 files changed, 93 insertions(+), 77 deletions(-) diff --git a/python_modules/dagster/dagster/_core/definitions/module_loaders/load_asset_checks_from_modules.py b/python_modules/dagster/dagster/_core/definitions/module_loaders/load_asset_checks_from_modules.py index ce1ab2bd9a9ec..d4a47847eb184 100644 --- a/python_modules/dagster/dagster/_core/definitions/module_loaders/load_asset_checks_from_modules.py +++ b/python_modules/dagster/dagster/_core/definitions/module_loaders/load_asset_checks_from_modules.py @@ -9,7 +9,7 @@ check_opt_coercible_to_asset_key_prefix_param, ) from dagster._core.definitions.assets import AssetsDefinition -from dagster._core.definitions.module_loaders.object_list import LoadedAssetsList +from dagster._core.definitions.module_loaders.object_list import ModuleScopedDagsterObjects from dagster._core.definitions.module_loaders.utils import find_modules_in_package @@ -34,8 +34,8 @@ def load_asset_checks_from_modules( asset_key_prefix, "asset_key_prefix" ) return ( - LoadedAssetsList.from_modules(modules) - .to_post_load() + ModuleScopedDagsterObjects.from_modules(modules) + .get_object_list() .with_attributes( key_prefix=asset_key_prefix, source_key_prefix=None, diff --git a/python_modules/dagster/dagster/_core/definitions/module_loaders/load_assets_from_modules.py b/python_modules/dagster/dagster/_core/definitions/module_loaders/load_assets_from_modules.py index 319f329fe3735..7e092456e5238 100644 --- a/python_modules/dagster/dagster/_core/definitions/module_loaders/load_assets_from_modules.py +++ b/python_modules/dagster/dagster/_core/definitions/module_loaders/load_assets_from_modules.py @@ -1,7 +1,7 @@ import inspect from importlib import import_module from types import ModuleType -from typing import Iterable, Iterator, Optional, Sequence, Tuple, Type, Union +from typing import Iterable, Iterator, Optional, Sequence, Tuple, Type, Union, cast import dagster._check as check from dagster._core.definitions.asset_checks import has_only_asset_checks @@ -13,17 +13,17 @@ from dagster._core.definitions.assets import AssetsDefinition from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy from dagster._core.definitions.backfill_policy import BackfillPolicy -from dagster._core.definitions.cacheable_assets import CacheableAssetsDefinition from dagster._core.definitions.declarative_automation.automation_condition import ( AutomationCondition, ) from dagster._core.definitions.freshness_policy import FreshnessPolicy -from dagster._core.definitions.module_loaders.object_list import LoadedAssetsList +from dagster._core.definitions.module_loaders.object_list import ModuleScopedDagsterObjects from dagster._core.definitions.module_loaders.utils import ( - LoadableAssetTypes, + LoadableAssetObject, + LoadableDagsterObject, + RuntimeAssetObjectTypes, find_modules_in_package, ) -from dagster._core.definitions.source_asset import SourceAsset from dagster._core.definitions.utils import resolve_automation_condition @@ -62,7 +62,7 @@ def load_assets_from_modules( backfill_policy: Optional[BackfillPolicy] = None, source_key_prefix: Optional[CoercibleToAssetKeyPrefix] = None, include_specs: bool = False, -) -> Sequence[Union[AssetsDefinition, AssetSpec, SourceAsset, CacheableAssetsDefinition]]: +) -> Sequence[LoadableAssetObject]: """Constructs a list of assets and source assets from the given modules. Args: @@ -86,17 +86,18 @@ def load_assets_from_modules( A list containing assets and source assets defined in the given modules. """ - def _asset_filter(asset: LoadableAssetTypes) -> bool: - if isinstance(asset, AssetsDefinition): + def _asset_filter(dagster_object: LoadableDagsterObject) -> bool: + if isinstance(dagster_object, AssetsDefinition): # We don't load asset checks with asset module loaders. - return not has_only_asset_checks(asset) - if isinstance(asset, AssetSpec): + return not has_only_asset_checks(dagster_object) + if isinstance(dagster_object, AssetSpec): return include_specs - return True + return isinstance(dagster_object, RuntimeAssetObjectTypes) - return ( - LoadedAssetsList.from_modules(modules) - .to_post_load() + return cast( + Sequence[LoadableAssetObject], + ModuleScopedDagsterObjects.from_modules(modules) + .get_object_list() .with_attributes( key_prefix=check_opt_coercible_to_asset_key_prefix_param(key_prefix, "key_prefix"), source_key_prefix=check_opt_coercible_to_asset_key_prefix_param( @@ -113,7 +114,7 @@ def _asset_filter(asset: LoadableAssetTypes) -> bool: backfill_policy, "backfill_policy", BackfillPolicy ), ) - .get_objects(_asset_filter) + .get_objects(_asset_filter), ) @@ -127,7 +128,7 @@ def load_assets_from_current_module( backfill_policy: Optional[BackfillPolicy] = None, source_key_prefix: Optional[CoercibleToAssetKeyPrefix] = None, include_specs: bool = False, -) -> Sequence[Union[AssetsDefinition, AssetSpec, SourceAsset, CacheableAssetsDefinition]]: +) -> Sequence[LoadableAssetObject]: """Constructs a list of assets, source assets, and cacheable assets from the module where this function is called. @@ -180,7 +181,7 @@ def load_assets_from_package_module( backfill_policy: Optional[BackfillPolicy] = None, source_key_prefix: Optional[CoercibleToAssetKeyPrefix] = None, include_specs: bool = False, -) -> Sequence[LoadableAssetTypes]: +) -> Sequence[LoadableAssetObject]: """Constructs a list of assets and source assets that includes all asset definitions, source assets, and cacheable assets in all sub-modules of the given package module. @@ -229,7 +230,7 @@ def load_assets_from_package_name( backfill_policy: Optional[BackfillPolicy] = None, source_key_prefix: Optional[CoercibleToAssetKeyPrefix] = None, include_specs: bool = False, -) -> Sequence[Union[AssetsDefinition, AssetSpec, SourceAsset, CacheableAssetsDefinition]]: +) -> Sequence[LoadableAssetObject]: """Constructs a list of assets, source assets, and cacheable assets that includes all asset definitions and source assets in all sub-modules of the given package. diff --git a/python_modules/dagster/dagster/_core/definitions/module_loaders/object_list.py b/python_modules/dagster/dagster/_core/definitions/module_loaders/object_list.py index 6a773ca8fb5ba..97872a2a725ed 100644 --- a/python_modules/dagster/dagster/_core/definitions/module_loaders/object_list.py +++ b/python_modules/dagster/dagster/_core/definitions/module_loaders/object_list.py @@ -14,8 +14,9 @@ ) from dagster._core.definitions.freshness_policy import FreshnessPolicy from dagster._core.definitions.module_loaders.utils import ( - KeyScopedAssetObjects, - LoadableAssetTypes, + LoadableDagsterObject, + RuntimeAssetObjectTypes, + RuntimeKeyScopedAssetObjectTypes, find_objects_in_module_of_types, replace_keys_in_asset, ) @@ -23,16 +24,16 @@ from dagster._core.errors import DagsterInvalidDefinitionError -class LoadedAssetsList: +class ModuleScopedDagsterObjects: def __init__( self, - assets_per_module: Mapping[str, Sequence[LoadableAssetTypes]], + objects_per_module: Mapping[str, Sequence[LoadableDagsterObject]], ): - self.assets_per_module = assets_per_module + self.objects_per_module = objects_per_module self._do_collision_detection() @classmethod - def from_modules(cls, modules: Iterable[ModuleType]) -> "LoadedAssetsList": + def from_modules(cls, modules: Iterable[ModuleType]) -> "ModuleScopedDagsterObjects": return cls( { module.__name__: list( @@ -46,17 +47,17 @@ def from_modules(cls, modules: Iterable[ModuleType]) -> "LoadedAssetsList": ) @cached_property - def flat_object_list(self) -> Sequence[LoadableAssetTypes]: + def flat_object_list(self) -> Sequence[LoadableDagsterObject]: return [ - asset_object for objects in self.assets_per_module.values() for asset_object in objects + asset_object for objects in self.objects_per_module.values() for asset_object in objects ] @cached_property - def objects_by_id(self) -> Dict[int, LoadableAssetTypes]: + def objects_by_id(self) -> Dict[int, LoadableDagsterObject]: return {id(asset_object): asset_object for asset_object in self.flat_object_list} @cached_property - def deduped_objects(self) -> Sequence[LoadableAssetTypes]: + def deduped_objects(self) -> Sequence[LoadableDagsterObject]: return list(self.objects_by_id.values()) @cached_property @@ -71,24 +72,24 @@ def source_assets(self) -> Sequence[SourceAsset]: def module_name_by_id(self) -> Dict[int, str]: return { id(asset_object): module_name - for module_name, objects in self.assets_per_module.items() + for module_name, objects in self.objects_per_module.items() for asset_object in objects } @cached_property - def objects_by_key( + def asset_objects_by_key( self, ) -> Mapping[AssetKey, Sequence[Union[SourceAsset, AssetSpec, AssetsDefinition]]]: objects_by_key = defaultdict(list) for asset_object in self.flat_object_list: - if not isinstance(asset_object, KeyScopedAssetObjects): + if not isinstance(asset_object, RuntimeKeyScopedAssetObjectTypes): continue for key in asset_object.key_iterator: objects_by_key[key].append(asset_object) return objects_by_key def _do_collision_detection(self) -> None: - for key, asset_objects in self.objects_by_key.items(): + for key, asset_objects in self.asset_objects_by_key.items(): # If there is more than one asset_object in the list for a given key, and the objects do not refer to the same asset_object in memory, we have a collision. num_distinct_objects_for_key = len( set(id(asset_object) for asset_object in asset_objects) @@ -101,14 +102,14 @@ def _do_collision_detection(self) -> None: f"Asset key {key.to_user_string()} is defined multiple times. Definitions found in modules: {asset_objects_str}." ) - def to_post_load(self) -> "ResolvedAssetObjectList": - return ResolvedAssetObjectList(self.deduped_objects) + def get_object_list(self) -> "DagsterObjectsList": + return DagsterObjectsList(self.deduped_objects) -class ResolvedAssetObjectList: +class DagsterObjectsList: def __init__( self, - loaded_objects: Sequence[LoadableAssetTypes], + loaded_objects: Sequence[LoadableDagsterObject], ): self.loaded_objects = loaded_objects @@ -149,13 +150,15 @@ def cacheable_assets(self) -> Sequence[CacheableAssetsDefinition]: ] def get_objects( - self, filter_fn: Callable[[LoadableAssetTypes], bool] - ) -> Sequence[LoadableAssetTypes]: - return [asset for asset in self.loaded_objects if filter_fn(asset)] + self, filter_fn: Callable[[LoadableDagsterObject], bool] + ) -> Sequence[LoadableDagsterObject]: + return [ + dagster_object for dagster_object in self.loaded_objects if filter_fn(dagster_object) + ] def assets_with_loadable_prefix( self, key_prefix: CoercibleToAssetKeyPrefix - ) -> "ResolvedAssetObjectList": + ) -> "DagsterObjectsList": # There is a tricky edge case here where if a non-cacheable asset depends on a cacheable asset, # and the assets are prefixed, the non-cacheable asset's dependency will not be prefixed since # at prefix-time it is not known that its dependency is one of the cacheable assets. @@ -174,34 +177,40 @@ def assets_with_loadable_prefix( check_key_replacements = { check_key: check_key.with_asset_key_prefix(key_prefix) for check_key in all_check_keys } - for asset_object in self.loaded_objects: - if isinstance(asset_object, CacheableAssetsDefinition): - result_list.append(asset_object.with_prefix_for_all(key_prefix)) - elif isinstance(asset_object, AssetsDefinition): + for dagster_object in self.loaded_objects: + if not isinstance(dagster_object, RuntimeAssetObjectTypes): + result_list.append(dagster_object) + if isinstance(dagster_object, CacheableAssetsDefinition): + result_list.append(dagster_object.with_prefix_for_all(key_prefix)) + elif isinstance(dagster_object, AssetsDefinition): result_list.append( - replace_keys_in_asset(asset_object, key_replacements, check_key_replacements) + replace_keys_in_asset(dagster_object, key_replacements, check_key_replacements) ) else: # We don't replace the key for SourceAssets. - result_list.append(asset_object) - return ResolvedAssetObjectList(result_list) + result_list.append(dagster_object) + return DagsterObjectsList(result_list) def assets_with_source_prefix( self, key_prefix: CoercibleToAssetKeyPrefix - ) -> "ResolvedAssetObjectList": + ) -> "DagsterObjectsList": result_list = [] key_replacements = { source_asset.key: source_asset.key.with_prefix(key_prefix) for source_asset in self.source_assets } - for asset_object in self.loaded_objects: - if isinstance(asset_object, KeyScopedAssetObjects): + for dagster_object in self.loaded_objects: + if not isinstance(dagster_object, RuntimeAssetObjectTypes): + result_list.append(dagster_object) + if isinstance(dagster_object, RuntimeKeyScopedAssetObjectTypes): result_list.append( - replace_keys_in_asset(asset_object, key_replacements, check_key_replacements={}) + replace_keys_in_asset( + dagster_object, key_replacements, check_key_replacements={} + ) ) else: - result_list.append(asset_object) - return ResolvedAssetObjectList(result_list) + result_list.append(dagster_object) + return DagsterObjectsList(result_list) def with_attributes( self, @@ -211,25 +220,28 @@ def with_attributes( freshness_policy: Optional[FreshnessPolicy], automation_condition: Optional[AutomationCondition], backfill_policy: Optional[BackfillPolicy], - ) -> "ResolvedAssetObjectList": - assets_list = self.assets_with_loadable_prefix(key_prefix) if key_prefix else self - assets_list = ( - assets_list.assets_with_source_prefix(source_key_prefix) + ) -> "DagsterObjectsList": + dagster_object = self.assets_with_loadable_prefix(key_prefix) if key_prefix else self + dagster_object = ( + dagster_object.assets_with_source_prefix(source_key_prefix) if source_key_prefix - else assets_list + else dagster_object ) return_list = [] - for asset in assets_list.loaded_objects: - updated_object = asset.with_attributes( - group_name=group_name, - freshness_policy=freshness_policy, - automation_condition=automation_condition, - backfill_policy=backfill_policy, - ) - return_list.append( - updated_object.coerce_to_checks_def() - if isinstance(updated_object, AssetsDefinition) - and has_only_asset_checks(updated_object) - else updated_object - ) - return ResolvedAssetObjectList(return_list) + for dagster_object in dagster_object.loaded_objects: + if not isinstance(dagster_object, RuntimeAssetObjectTypes): + return_list.append(dagster_object) + else: + updated_object = dagster_object.with_attributes( + group_name=group_name, + freshness_policy=freshness_policy, + automation_condition=automation_condition, + backfill_policy=backfill_policy, + ) + return_list.append( + updated_object.coerce_to_checks_def() + if isinstance(updated_object, AssetsDefinition) + and has_only_asset_checks(updated_object) + else updated_object + ) + return DagsterObjectsList(return_list) diff --git a/python_modules/dagster/dagster/_core/definitions/module_loaders/utils.py b/python_modules/dagster/dagster/_core/definitions/module_loaders/utils.py index 97a0a26f886b1..8c2653ea0b2f8 100644 --- a/python_modules/dagster/dagster/_core/definitions/module_loaders/utils.py +++ b/python_modules/dagster/dagster/_core/definitions/module_loaders/utils.py @@ -9,8 +9,11 @@ from dagster._core.definitions.cacheable_assets import CacheableAssetsDefinition from dagster._core.definitions.source_asset import SourceAsset -LoadableAssetTypes = Union[AssetsDefinition, AssetSpec, SourceAsset, CacheableAssetsDefinition] -KeyScopedAssetObjects = (AssetsDefinition, AssetSpec, SourceAsset) +LoadableAssetObject = Union[AssetsDefinition, AssetSpec, SourceAsset, CacheableAssetsDefinition] +LoadableDagsterObject = LoadableAssetObject # For now +RuntimeKeyScopedAssetObjectTypes = (AssetsDefinition, AssetSpec, SourceAsset) +RuntimeAssetObjectTypes = (AssetsDefinition, AssetSpec, SourceAsset, CacheableAssetsDefinition) +RuntimeDagsterObjectTypes = RuntimeAssetObjectTypes # For now def find_objects_in_module_of_types(