diff --git a/examples/docs_snippets/docs_snippets_tests/guides_tests/asset_tutorial_tests/test_cereal.py b/examples/docs_snippets/docs_snippets_tests/guides_tests/asset_tutorial_tests/test_cereal.py index 4f435b2a491ff..098674353a622 100644 --- a/examples/docs_snippets/docs_snippets_tests/guides_tests/asset_tutorial_tests/test_cereal.py +++ b/examples/docs_snippets/docs_snippets_tests/guides_tests/asset_tutorial_tests/test_cereal.py @@ -1,4 +1,4 @@ -from dagster._core.definitions.load_assets_from_modules import assets_from_modules +from dagster._core.definitions.load_assets_from_modules import load_assets_from_modules from dagster._core.definitions.materialize import materialize from docs_snippets.guides.dagster.asset_tutorial import cereal from docs_snippets.intro_tutorial.test_util import patch_cereal_requests @@ -6,5 +6,5 @@ @patch_cereal_requests def test_cereal(): - assets, source_assets, _ = assets_from_modules([cereal]) + assets, source_assets, _ = load_assets_from_modules([cereal]) assert materialize([*assets, *source_assets]) diff --git a/examples/docs_snippets/docs_snippets_tests/guides_tests/asset_tutorial_tests/test_serial_asset_graph.py b/examples/docs_snippets/docs_snippets_tests/guides_tests/asset_tutorial_tests/test_serial_asset_graph.py index a68ed34ea861f..9c0e102292886 100644 --- a/examples/docs_snippets/docs_snippets_tests/guides_tests/asset_tutorial_tests/test_serial_asset_graph.py +++ b/examples/docs_snippets/docs_snippets_tests/guides_tests/asset_tutorial_tests/test_serial_asset_graph.py @@ -1,4 +1,4 @@ -from dagster._core.definitions.load_assets_from_modules import assets_from_modules +from dagster._core.definitions.load_assets_from_modules import load_assets_from_modules from dagster._core.definitions.materialize import materialize from docs_snippets.guides.dagster.asset_tutorial import serial_asset_graph from docs_snippets.intro_tutorial.test_util import patch_cereal_requests @@ -6,5 +6,5 @@ @patch_cereal_requests def test_serial_asset_graph(): - assets, source_assets, _ = assets_from_modules([serial_asset_graph]) + assets, source_assets, _ = load_assets_from_modules([serial_asset_graph]) assert materialize([*assets, *source_assets]) diff --git a/python_modules/dagster/dagster/_core/code_pointer.py b/python_modules/dagster/dagster/_core/code_pointer.py index cda304cc0f418..4601b21e23691 100644 --- a/python_modules/dagster/dagster/_core/code_pointer.py +++ b/python_modules/dagster/dagster/_core/code_pointer.py @@ -184,13 +184,13 @@ def describe(self) -> str: def _load_target_from_module(module: ModuleType, fn_name: str, error_suffix: str) -> object: - from dagster._core.definitions.load_assets_from_modules import assets_from_modules + from dagster._core.definitions.load_assets_from_modules import load_assets_from_modules from dagster._core.workspace.autodiscovery import LOAD_ALL_ASSETS if fn_name == LOAD_ALL_ASSETS: # LOAD_ALL_ASSETS is a special symbol that's returned when, instead of loading a particular # attribute, we should load all the assets in the module. - module_assets, module_source_assets, _ = assets_from_modules([module]) + module_assets, module_source_assets, _ = load_assets_from_modules([module]) return [*module_assets, *module_source_assets] else: if not hasattr(module, fn_name): diff --git a/python_modules/dagster/dagster/_core/definitions/asset_spec.py b/python_modules/dagster/dagster/_core/definitions/asset_spec.py index 12ff89d279894..b2e52fe8bf5d9 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_spec.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_spec.py @@ -313,6 +313,7 @@ def replace_attributes( tags: Optional[Mapping[str, str]] = ..., kinds: Optional[Set[str]] = ..., partitions_def: Optional[PartitionsDefinition] = ..., + freshness_policy: Optional[FreshnessPolicy] = ..., ) -> "AssetSpec": """Returns a new AssetSpec with the specified attributes replaced.""" current_tags_without_kinds = { diff --git a/python_modules/dagster/dagster/_core/definitions/load_asset_checks_from_modules.py b/python_modules/dagster/dagster/_core/definitions/load_asset_checks_from_modules.py index 5a5d3ec99af40..a587066410f00 100644 --- a/python_modules/dagster/dagster/_core/definitions/load_asset_checks_from_modules.py +++ b/python_modules/dagster/dagster/_core/definitions/load_asset_checks_from_modules.py @@ -1,53 +1,20 @@ import inspect from importlib import import_module from types import ModuleType -from typing import Iterable, Optional, Sequence, Set, cast +from typing import Iterable, Optional, Sequence import dagster._check as check -from dagster._core.definitions.asset_checks import AssetChecksDefinition, has_only_asset_checks +from dagster._core.definitions.asset_checks import AssetChecksDefinition from dagster._core.definitions.asset_key import ( CoercibleToAssetKeyPrefix, check_opt_coercible_to_asset_key_prefix_param, ) -from dagster._core.definitions.assets import AssetsDefinition from dagster._core.definitions.load_assets_from_modules import ( + LoadedAssetsList, find_modules_in_package, - find_objects_in_module_of_types, - prefix_assets, ) -def _checks_from_modules(modules: Iterable[ModuleType]) -> Sequence[AssetChecksDefinition]: - checks = [] - ids: Set[int] = set() - for module in modules: - for c in find_objects_in_module_of_types(module, AssetsDefinition): - if has_only_asset_checks(c) and id(c) not in ids: - checks.append(cast(AssetChecksDefinition, c)) - ids.add(id(c)) - return checks - - -def _checks_with_attributes( - checks_defs: Sequence[AssetChecksDefinition], - asset_key_prefix: Optional[CoercibleToAssetKeyPrefix] = None, -) -> Sequence[AssetChecksDefinition]: - if asset_key_prefix: - modified_checks, _ = prefix_assets(checks_defs, asset_key_prefix, [], None) - return [ - AssetChecksDefinition.create( - keys_by_input_name=c.keys_by_input_name, - node_def=c.op, - check_specs_by_output_name=c.check_specs_by_output_name, - resource_defs=c.resource_defs, - can_subset=c.can_subset, - ) - for c in modified_checks - ] - else: - return checks_defs - - def load_asset_checks_from_modules( modules: Iterable[ModuleType], asset_key_prefix: Optional[CoercibleToAssetKeyPrefix] = None, @@ -68,7 +35,19 @@ def load_asset_checks_from_modules( asset_key_prefix = check_opt_coercible_to_asset_key_prefix_param( asset_key_prefix, "asset_key_prefix" ) - return _checks_with_attributes(_checks_from_modules(modules), asset_key_prefix=asset_key_prefix) + return ( + LoadedAssetsList.from_modules(modules) + .to_post_load() + .with_attributes( + key_prefix=asset_key_prefix, + source_key_prefix=None, + group_name=None, + freshness_policy=None, + automation_condition=None, + backfill_policy=None, + ) + .checks_defs + ) def load_asset_checks_from_current_module( @@ -95,9 +74,7 @@ def load_asset_checks_from_current_module( asset_key_prefix, "asset_key_prefix" ) - return _checks_with_attributes( - _checks_from_modules([module]), asset_key_prefix=asset_key_prefix - ) + return load_asset_checks_from_modules([module], asset_key_prefix=asset_key_prefix) def load_asset_checks_from_package_module( @@ -120,9 +97,8 @@ def load_asset_checks_from_package_module( asset_key_prefix, "asset_key_prefix" ) - return _checks_with_attributes( - _checks_from_modules(find_modules_in_package(package_module)), - asset_key_prefix=asset_key_prefix, + return load_asset_checks_from_modules( + find_modules_in_package(package_module), asset_key_prefix=asset_key_prefix ) @@ -147,7 +123,6 @@ def load_asset_checks_from_package_name( ) package_module = import_module(package_name) - return _checks_with_attributes( - _checks_from_modules(find_modules_in_package(package_module)), - asset_key_prefix=asset_key_prefix, + return load_asset_checks_from_modules( + find_modules_in_package(package_module), asset_key_prefix=asset_key_prefix ) diff --git a/python_modules/dagster/dagster/_core/definitions/load_assets_from_modules.py b/python_modules/dagster/dagster/_core/definitions/load_assets_from_modules.py index 284d76795e505..a021e47af506c 100644 --- a/python_modules/dagster/dagster/_core/definitions/load_assets_from_modules.py +++ b/python_modules/dagster/dagster/_core/definitions/load_assets_from_modules.py @@ -1,15 +1,31 @@ import inspect import pkgutil +from collections import defaultdict +from functools import cached_property from importlib import import_module from types import ModuleType -from typing import Dict, Iterable, Iterator, List, Optional, Sequence, Set, Tuple, Type, Union, cast +from typing import ( + Callable, + Dict, + Iterable, + Iterator, + Mapping, + Optional, + Sequence, + Tuple, + Type, + Union, + cast, +) import dagster._check as check +from dagster._core.definitions.asset_checks import AssetChecksDefinition, has_only_asset_checks from dagster._core.definitions.asset_key import ( AssetKey, CoercibleToAssetKeyPrefix, check_opt_coercible_to_asset_key_prefix_param, ) +from dagster._core.definitions.asset_spec import AssetSpec from dagster._core.definitions.assets import AssetsDefinition from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy from dagster._core.definitions.backfill_policy import BackfillPolicy @@ -19,7 +35,7 @@ ) from dagster._core.definitions.freshness_policy import FreshnessPolicy from dagster._core.definitions.source_asset import SourceAsset -from dagster._core.definitions.utils import resolve_automation_condition +from dagster._core.definitions.utils import DEFAULT_GROUP_NAME, resolve_automation_condition from dagster._core.errors import DagsterInvalidDefinitionError @@ -47,66 +63,135 @@ def find_subclasses_in_module( yield value -def assets_from_modules( - modules: Iterable[ModuleType], extra_source_assets: Optional[Sequence[SourceAsset]] = None -) -> Tuple[Sequence[AssetsDefinition], Sequence[SourceAsset], Sequence[CacheableAssetsDefinition]]: - """Constructs three lists, a list of assets, a list of source assets, and a list of cacheable - assets from the given modules. +LoadableAssetTypes = Union[AssetsDefinition, AssetSpec, SourceAsset, CacheableAssetsDefinition] +KeyScopedAssetObjects = (AssetsDefinition, AssetSpec, SourceAsset) - Args: - modules (Iterable[ModuleType]): The Python modules to look for assets inside. - extra_source_assets (Optional[Sequence[SourceAsset]]): Source assets to include in the - group in addition to the source assets found in the modules. - Returns: - Tuple[Sequence[AssetsDefinition], Sequence[SourceAsset], Sequence[CacheableAssetsDefinition]]]: - A tuple containing a list of assets, a list of source assets, and a list of - cacheable assets defined in the given modules. - """ - asset_ids: Set[int] = set() - asset_keys: Dict[AssetKey, ModuleType] = dict() - source_assets: List[SourceAsset] = list( - check.opt_sequence_param(extra_source_assets, "extra_source_assets", of_type=SourceAsset) - ) - cacheable_assets: List[CacheableAssetsDefinition] = [] - assets: Dict[AssetKey, AssetsDefinition] = {} - for module in modules: - for asset in find_objects_in_module_of_types( - module, (AssetsDefinition, SourceAsset, CacheableAssetsDefinition) +class LoadedAssetsList: + def __init__( + self, + assets_per_module: Mapping[str, Sequence[LoadableAssetTypes]], + ): + self.assets_per_module = assets_per_module + self._do_collision_detection() + + @classmethod + def from_modules(cls, modules: Iterable[ModuleType]) -> "LoadedAssetsList": + return cls( + { + module.__name__: list( + find_objects_in_module_of_types( + module, + (AssetsDefinition, SourceAsset, CacheableAssetsDefinition, AssetSpec), + ) + ) + for module in modules + }, + ) + + @cached_property + def flat_object_list(self) -> Sequence[LoadableAssetTypes]: + return [ + asset_object for objects in self.assets_per_module.values() for asset_object in objects + ] + + @cached_property + def objects_by_id(self) -> Dict[int, LoadableAssetTypes]: + return {id(asset_object): asset_object for asset_object in self.flat_object_list} + + @cached_property + def deduped_objects(self) -> Sequence[LoadableAssetTypes]: + return list(self.objects_by_id.values()) + + @cached_property + def assets_defs(self) -> Sequence[AssetsDefinition]: + return [asset for asset in self.deduped_objects if isinstance(asset, AssetsDefinition)] + + @cached_property + def source_assets(self) -> Sequence[SourceAsset]: + return [asset for asset in self.deduped_objects if isinstance(asset, SourceAsset)] + + @cached_property + def module_name_by_id(self) -> Dict[int, str]: + return { + id(asset_object): module_name + for module_name, objects in self.assets_per_module.items() + for asset_object in objects + } + + @cached_property + def objects_by_key(self) -> Mapping[AssetKey, Sequence[Union[SourceAsset, AssetsDefinition]]]: + objects_by_key = defaultdict(list) + for asset_object in self.flat_object_list: + if not isinstance(asset_object, KeyScopedAssetObjects): + continue + for key in key_iterator(asset_object): + objects_by_key[key].append(asset_object) + return objects_by_key + + def _do_collision_detection(self) -> None: + for key, asset_objects in self.objects_by_key.items(): + # If there is more than one asset_object in the list for a given key, and the objects do not refer to the same asset_object in memory, we have a collision. + num_distinct_objects_for_key = len( + set(id(asset_object) for asset_object in asset_objects) + ) + if len(asset_objects) > 1 and num_distinct_objects_for_key > 1: + affected_modules = set( + self.module_name_by_id[id(asset_object)] for asset_object in asset_objects + ) + asset_objects_str = ", ".join(affected_modules) + modules_str = ( + f"Definitions found in modules: {asset_objects_str}." + if len(affected_modules) > 1 + else "" + ) + raise DagsterInvalidDefinitionError( + f"Asset key {key.to_user_string()} is defined multiple times. {modules_str}".strip() + ) + + def to_post_load(self) -> "ResolvedAssetObjectList": + return ResolvedAssetObjectList(self.deduped_objects) + + +def _spec_mapper_disallow_group_override( + group_name: Optional[str], + automation_condition: Optional[AutomationCondition], +) -> Callable[[AssetSpec], AssetSpec]: + def _inner(spec: AssetSpec) -> AssetSpec: + if ( + group_name is not None + and spec.group_name is not None + and group_name != spec.group_name + and spec.group_name != DEFAULT_GROUP_NAME ): - asset = cast(Union[AssetsDefinition, SourceAsset, CacheableAssetsDefinition], asset) - if id(asset) not in asset_ids: - asset_ids.add(id(asset)) - if isinstance(asset, CacheableAssetsDefinition): - cacheable_assets.append(asset) - else: - keys = asset.keys if isinstance(asset, AssetsDefinition) else [asset.key] - for key in keys: - if key in asset_keys: - modules_str = ", ".join( - set([asset_keys[key].__name__, module.__name__]) - ) - error_str = ( - f"Asset key {key} is defined multiple times. Definitions found in" - f" modules: {modules_str}. " - ) - - if key in assets and isinstance(asset, AssetsDefinition): - if assets[key].node_def == asset.node_def: - error_str += ( - "One possible cause of this bug is a call to with_resources" - " outside of a repository definition, causing a duplicate" - " asset definition." - ) - - raise DagsterInvalidDefinitionError(error_str) - else: - asset_keys[key] = module - if isinstance(asset, AssetsDefinition): - assets[key] = asset - if isinstance(asset, SourceAsset): - source_assets.append(asset) - return list(set(assets.values())), source_assets, cacheable_assets + raise DagsterInvalidDefinitionError( + f"Asset spec {spec.key.to_user_string()} has group name {spec.group_name}, which conflicts with the group name {group_name} provided in load_assets_from_modules." + ) + return spec.replace_attributes( + group_name=group_name if group_name else ..., + automation_condition=automation_condition if automation_condition else ..., + ) + + return _inner + + +def key_iterator( + asset: Union[AssetsDefinition, SourceAsset, AssetSpec], included_targeted_keys: bool = False +) -> Iterator[AssetKey]: + return ( + iter( + [ + *asset.keys, + *( + [check_key.asset_key for check_key in asset.check_keys] + if included_targeted_keys + else [] + ), + ] + ) + if isinstance(asset, AssetsDefinition) + else iter([asset.key]) + ) def load_assets_from_modules( @@ -119,7 +204,8 @@ def load_assets_from_modules( automation_condition: Optional[AutomationCondition] = None, backfill_policy: Optional[BackfillPolicy] = None, source_key_prefix: Optional[CoercibleToAssetKeyPrefix] = None, -) -> Sequence[Union[AssetsDefinition, SourceAsset, CacheableAssetsDefinition]]: + include_specs: bool = False, +) -> Sequence[Union[AssetsDefinition, AssetSpec, SourceAsset, CacheableAssetsDefinition]]: """Constructs a list of assets and source assets from the given modules. Args: @@ -142,29 +228,34 @@ def load_assets_from_modules( Sequence[Union[AssetsDefinition, SourceAsset]]: A list containing assets and source assets defined in the given modules. """ - group_name = check.opt_str_param(group_name, "group_name") - key_prefix = check_opt_coercible_to_asset_key_prefix_param(key_prefix, "key_prefix") - freshness_policy = check.opt_inst_param(freshness_policy, "freshness_policy", FreshnessPolicy) - backfill_policy = check.opt_inst_param(backfill_policy, "backfill_policy", BackfillPolicy) - - ( - assets, - source_assets, - cacheable_assets, - ) = assets_from_modules(modules) - - return assets_with_attributes( - assets, - source_assets, - cacheable_assets, - key_prefix=key_prefix, - group_name=group_name, - freshness_policy=freshness_policy, - automation_condition=resolve_automation_condition( - automation_condition, auto_materialize_policy - ), - backfill_policy=backfill_policy, - source_key_prefix=source_key_prefix, + + def _asset_filter(asset: LoadableAssetTypes) -> bool: + if isinstance(asset, AssetsDefinition) and not has_only_asset_checks(asset): + return True + if isinstance(asset, AssetSpec): + return include_specs + return True + + return ( + LoadedAssetsList.from_modules(modules) + .to_post_load() + .with_attributes( + key_prefix=check_opt_coercible_to_asset_key_prefix_param(key_prefix, "key_prefix"), + source_key_prefix=check_opt_coercible_to_asset_key_prefix_param( + source_key_prefix, "source_key_prefix" + ), + group_name=check.opt_str_param(group_name, "group_name"), + freshness_policy=check.opt_inst_param( + freshness_policy, "freshness_policy", FreshnessPolicy + ), + automation_condition=resolve_automation_condition( + automation_condition, auto_materialize_policy + ), + backfill_policy=check.opt_inst_param( + backfill_policy, "backfill_policy", BackfillPolicy + ), + ) + .get_objects(_asset_filter) ) @@ -177,7 +268,8 @@ def load_assets_from_current_module( automation_condition: Optional[AutomationCondition] = None, backfill_policy: Optional[BackfillPolicy] = None, source_key_prefix: Optional[CoercibleToAssetKeyPrefix] = None, -) -> Sequence[Union[AssetsDefinition, SourceAsset, CacheableAssetsDefinition]]: + include_specs: bool = False, +) -> Sequence[Union[AssetsDefinition, AssetSpec, SourceAsset, CacheableAssetsDefinition]]: """Constructs a list of assets, source assets, and cacheable assets from the module where this function is called. @@ -215,28 +307,7 @@ def load_assets_from_current_module( ), backfill_policy=backfill_policy, source_key_prefix=source_key_prefix, - ) - - -def assets_from_package_module( - package_module: ModuleType, - extra_source_assets: Optional[Sequence[SourceAsset]] = None, -) -> Tuple[Sequence[AssetsDefinition], Sequence[SourceAsset], Sequence[CacheableAssetsDefinition]]: - """Constructs three lists, a list of assets, a list of source assets, and a list of cacheable assets - from the given package module. - - Args: - package_module (ModuleType): The package module to looks for assets inside. - extra_source_assets (Optional[Sequence[SourceAsset]]): Source assets to include in the - group in addition to the source assets found in the modules. - - Returns: - Tuple[Sequence[AssetsDefinition], Sequence[SourceAsset], Sequence[CacheableAssetsDefinition]]: - A tuple containing a list of assets, a list of source assets, and a list of cacheable assets - defined in the given modules. - """ - return assets_from_modules( - find_modules_in_package(package_module), extra_source_assets=extra_source_assets + include_specs=include_specs, ) @@ -250,7 +321,8 @@ def load_assets_from_package_module( automation_condition: Optional[AutomationCondition] = None, backfill_policy: Optional[BackfillPolicy] = None, source_key_prefix: Optional[CoercibleToAssetKeyPrefix] = None, -) -> Sequence[Union[AssetsDefinition, SourceAsset, CacheableAssetsDefinition]]: + include_specs: bool = False, +) -> Sequence[LoadableAssetTypes]: """Constructs a list of assets and source assets that includes all asset definitions, source assets, and cacheable assets in all sub-modules of the given package module. @@ -276,28 +348,16 @@ def load_assets_from_package_module( Sequence[Union[AssetsDefinition, SourceAsset, CacheableAssetsDefinition]]: A list containing assets, source assets, and cacheable assets defined in the module. """ - group_name = check.opt_str_param(group_name, "group_name") - key_prefix = check_opt_coercible_to_asset_key_prefix_param(key_prefix, "key_prefix") - freshness_policy = check.opt_inst_param(freshness_policy, "freshness_policy", FreshnessPolicy) - backfill_policy = check.opt_inst_param(backfill_policy, "backfill_policy", BackfillPolicy) - - ( - assets, - source_assets, - cacheable_assets, - ) = assets_from_package_module(package_module) - return assets_with_attributes( - assets, - source_assets, - cacheable_assets, - key_prefix=key_prefix, - group_name=group_name, + return load_assets_from_modules( + [*find_modules_in_package(package_module)], + group_name, + key_prefix, freshness_policy=freshness_policy, - automation_condition=resolve_automation_condition( - automation_condition, auto_materialize_policy - ), + auto_materialize_policy=auto_materialize_policy, + automation_condition=automation_condition, backfill_policy=backfill_policy, source_key_prefix=source_key_prefix, + include_specs=include_specs, ) @@ -310,7 +370,8 @@ def load_assets_from_package_name( auto_materialize_policy: Optional[AutoMaterializePolicy] = None, backfill_policy: Optional[BackfillPolicy] = None, source_key_prefix: Optional[CoercibleToAssetKeyPrefix] = None, -) -> Sequence[Union[AssetsDefinition, SourceAsset, CacheableAssetsDefinition]]: + include_specs: bool = False, +) -> Sequence[Union[AssetsDefinition, AssetSpec, SourceAsset, CacheableAssetsDefinition]]: """Constructs a list of assets, source assets, and cacheable assets that includes all asset definitions and source assets in all sub-modules of the given package. @@ -343,6 +404,7 @@ def load_assets_from_package_name( auto_materialize_policy=auto_materialize_policy, backfill_policy=backfill_policy, source_key_prefix=source_key_prefix, + include_specs=include_specs, ) @@ -363,149 +425,170 @@ def find_modules_in_package(package_module: ModuleType) -> Iterable[ModuleType]: ) -def prefix_assets( - assets_defs: Sequence[AssetsDefinition], - key_prefix: CoercibleToAssetKeyPrefix, - source_assets: Sequence[SourceAsset], - source_key_prefix: Optional[CoercibleToAssetKeyPrefix], -) -> Tuple[Sequence[AssetsDefinition], Sequence[SourceAsset]]: - """Given a list of assets, prefix the input and output asset keys and check specs with key_prefix. - The prefix is not added to source assets. - - Input asset keys that reference other assets within assets_defs are "brought along" - - i.e. prefixed as well. - - Example with a single asset: - - .. code-block:: python - - @asset - def asset1(): - ... - - result = prefixed_asset_key_replacements([asset_1], "my_prefix") - assert result.assets[0].asset_key == AssetKey(["my_prefix", "asset1"]) - - Example with dependencies within the list of assets: - - .. code-block:: python - - @asset - def asset1(): - ... - - @asset - def asset2(asset1): - ... - - result = prefixed_asset_key_replacements([asset1, asset2], "my_prefix") - assert result.assets[0].asset_key == AssetKey(["my_prefix", "asset1"]) - assert result.assets[1].asset_key == AssetKey(["my_prefix", "asset2"]) - assert result.assets[1].dependency_keys == {AssetKey(["my_prefix", "asset1"])} - - """ - asset_keys = {asset_key for assets_def in assets_defs for asset_key in assets_def.keys} - check_target_keys = { - key.asset_key for assets_def in assets_defs for key in assets_def.check_keys - } - source_asset_keys = {source_asset.key for source_asset in source_assets} - - if isinstance(key_prefix, str): - key_prefix = [key_prefix] - key_prefix = check.is_list(key_prefix, of_type=str) - - if isinstance(source_key_prefix, str): - source_key_prefix = [source_key_prefix] - - result_assets: List[AssetsDefinition] = [] - for assets_def in assets_defs: - output_asset_key_replacements = { - asset_key: AssetKey([*key_prefix, *asset_key.path]) - for asset_key in ( - assets_def.keys | {check_key.asset_key for check_key in assets_def.check_keys} - ) - } - input_asset_key_replacements = {} - for dep_asset_key in assets_def.keys_by_input_name.values(): - if dep_asset_key in asset_keys or dep_asset_key in check_target_keys: - input_asset_key_replacements[dep_asset_key] = AssetKey( - [*key_prefix, *dep_asset_key.path] - ) - elif source_key_prefix and dep_asset_key in source_asset_keys: - input_asset_key_replacements[dep_asset_key] = AssetKey( - [*source_key_prefix, *dep_asset_key.path] - ) - - result_assets.append( - assets_def.with_attributes( - output_asset_key_replacements=output_asset_key_replacements, - input_asset_key_replacements=input_asset_key_replacements, - ) +def replace_keys_in_asset( + asset: Union[AssetsDefinition, AssetSpec, SourceAsset], + key_replacements: Mapping[AssetKey, AssetKey], +) -> Union[AssetsDefinition, AssetSpec, SourceAsset]: + if isinstance(asset, SourceAsset): + return asset.with_attributes(key=key_replacements.get(asset.key, asset.key)) + if isinstance(asset, AssetSpec): + return asset.replace_attributes( + key=key_replacements.get(asset.key, asset.key), ) - - if source_key_prefix: - result_source_assets = [ - source_asset.with_attributes(key=AssetKey([*source_key_prefix, *source_asset.key.path])) - for source_asset in source_assets - ] else: - result_source_assets = source_assets - - return result_assets, result_source_assets - - -def assets_with_attributes( - assets_defs: Sequence[AssetsDefinition], - source_assets: Sequence[SourceAsset], - cacheable_assets: Sequence[CacheableAssetsDefinition], - key_prefix: Optional[Sequence[str]], - group_name: Optional[str], - freshness_policy: Optional[FreshnessPolicy], - automation_condition: Optional[AutomationCondition], - backfill_policy: Optional[BackfillPolicy], - source_key_prefix: Optional[Sequence[str]], -) -> Sequence[Union[AssetsDefinition, SourceAsset, CacheableAssetsDefinition]]: - # There is a tricky edge case here where if a non-cacheable asset depends on a cacheable asset, - # and the assets are prefixed, the non-cacheable asset's dependency will not be prefixed since - # at prefix-time it is not known that its dependency is one of the cacheable assets. - # https://github.com/dagster-io/dagster/pull/10389#pullrequestreview-1170913271 - if key_prefix: - assets_defs, source_assets = prefix_assets( - assets_defs, key_prefix, source_assets, source_key_prefix + updated_object = asset.with_attributes( + output_asset_key_replacements={ + key: key_replacements.get(key, key) + for key in key_iterator(asset, included_targeted_keys=True) + }, + input_asset_key_replacements={ + key: key_replacements.get(key, key) for key in asset.keys_by_input_name.values() + }, ) - cacheable_assets = [ - cached_asset.with_prefix_for_all(key_prefix) for cached_asset in cacheable_assets + if isinstance(asset, AssetsDefinition) and has_only_asset_checks(asset): + updated_object = AssetChecksDefinition.create( + keys_by_input_name=updated_object.keys_by_input_name, + node_def=updated_object.op, + check_specs_by_output_name=updated_object.check_specs_by_output_name, + resource_defs=updated_object.resource_defs, + can_subset=updated_object.can_subset, + ) + return updated_object + + +class ResolvedAssetObjectList: + def __init__( + self, + loaded_objects: Sequence[LoadableAssetTypes], + ): + self.loaded_objects = loaded_objects + + @cached_property + def assets_defs_and_specs(self) -> Sequence[Union[AssetsDefinition, AssetSpec]]: + return [ + dagster_object + for dagster_object in self.loaded_objects + if (isinstance(dagster_object, AssetsDefinition) and dagster_object.keys) + or isinstance(dagster_object, AssetSpec) ] - if group_name or freshness_policy or automation_condition or backfill_policy: - assets_defs = [ - asset.with_attributes( - group_names_by_key=( - {asset_key: group_name for asset_key in asset.keys} - if group_name is not None - else {} - ), - freshness_policy=freshness_policy, - automation_condition=automation_condition, - backfill_policy=backfill_policy, - ) - for asset in assets_defs + @cached_property + def checks_defs(self) -> Sequence[AssetChecksDefinition]: + return [ + cast(AssetChecksDefinition, asset) + for asset in self.loaded_objects + if isinstance(asset, AssetsDefinition) and has_only_asset_checks(asset) ] - if group_name: - source_assets = [ - source_asset.with_attributes(group_name=group_name) - for source_asset in source_assets - ] - cacheable_assets = [ - cached_asset.with_attributes_for_all( - group_name, - freshness_policy=freshness_policy, - auto_materialize_policy=automation_condition.as_auto_materialize_policy() - if automation_condition - else None, - backfill_policy=backfill_policy, - ) - for cached_asset in cacheable_assets + + @cached_property + def assets_defs_specs_and_checks_defs( + self, + ) -> Sequence[Union[AssetsDefinition, AssetSpec, AssetChecksDefinition]]: + return [*self.assets_defs_and_specs, *self.checks_defs] + + @cached_property + def source_assets(self) -> Sequence[SourceAsset]: + return [asset for asset in self.loaded_objects if isinstance(asset, SourceAsset)] + + @cached_property + def cacheable_assets(self) -> Sequence[CacheableAssetsDefinition]: + return [ + asset for asset in self.loaded_objects if isinstance(asset, CacheableAssetsDefinition) ] - return [*assets_defs, *source_assets, *cacheable_assets] + def get_objects( + self, filter_fn: Callable[[LoadableAssetTypes], bool] + ) -> Sequence[LoadableAssetTypes]: + return [asset for asset in self.loaded_objects if filter_fn(asset)] + + def assets_with_loadable_prefix( + self, key_prefix: CoercibleToAssetKeyPrefix + ) -> "ResolvedAssetObjectList": + # There is a tricky edge case here where if a non-cacheable asset depends on a cacheable asset, + # and the assets are prefixed, the non-cacheable asset's dependency will not be prefixed since + # at prefix-time it is not known that its dependency is one of the cacheable assets. + # https://github.com/dagster-io/dagster/pull/10389#pullrequestreview-1170913271 + result_list = [] + all_asset_keys = { + key + for asset_object in self.assets_defs_specs_and_checks_defs + for key in key_iterator(asset_object, included_targeted_keys=True) + } + key_replacements = {key: key.with_prefix(key_prefix) for key in all_asset_keys} + for asset_object in self.loaded_objects: + if isinstance(asset_object, CacheableAssetsDefinition): + result_list.append(asset_object.with_prefix_for_all(key_prefix)) + elif isinstance(asset_object, AssetsDefinition): + result_list.append(replace_keys_in_asset(asset_object, key_replacements)) + else: + # We don't replace the key for SourceAssets. + result_list.append(asset_object) + return ResolvedAssetObjectList(result_list) + + def assets_with_source_prefix( + self, key_prefix: CoercibleToAssetKeyPrefix + ) -> "ResolvedAssetObjectList": + result_list = [] + key_replacements = { + source_asset.key: source_asset.key.with_prefix(key_prefix) + for source_asset in self.source_assets + } + for asset_object in self.loaded_objects: + if isinstance(asset_object, KeyScopedAssetObjects): + result_list.append(replace_keys_in_asset(asset_object, key_replacements)) + else: + result_list.append(asset_object) + return ResolvedAssetObjectList(result_list) + + def with_attributes( + self, + key_prefix: Optional[CoercibleToAssetKeyPrefix], + source_key_prefix: Optional[CoercibleToAssetKeyPrefix], + group_name: Optional[str], + freshness_policy: Optional[FreshnessPolicy], + automation_condition: Optional[AutomationCondition], + backfill_policy: Optional[BackfillPolicy], + ) -> "ResolvedAssetObjectList": + assets_list = self.assets_with_loadable_prefix(key_prefix) if key_prefix else self + assets_list = ( + assets_list.assets_with_source_prefix(source_key_prefix) + if source_key_prefix + else assets_list + ) + return_list = [] + for asset in assets_list.loaded_objects: + if isinstance(asset, AssetsDefinition): + new_asset = asset.map_asset_specs( + _spec_mapper_disallow_group_override(group_name, automation_condition) + ).with_attributes( + backfill_policy=backfill_policy, freshness_policy=freshness_policy + ) + if isinstance(asset, AssetChecksDefinition): + new_asset = AssetChecksDefinition.create( + keys_by_input_name=new_asset.keys_by_input_name, + node_def=new_asset.op, + check_specs_by_output_name=new_asset.check_specs_by_output_name, + resource_defs=new_asset.resource_defs, + can_subset=new_asset.can_subset, + ) + return_list.append(new_asset) + elif isinstance(asset, SourceAsset): + return_list.append( + asset.with_attributes(group_name=group_name if group_name else asset.group_name) + ) + elif isinstance(asset, AssetSpec): + return_list.append( + _spec_mapper_disallow_group_override(group_name, automation_condition)(asset) + ) + else: + return_list.append( + asset.with_attributes_for_all( + group_name, + freshness_policy=freshness_policy, + auto_materialize_policy=automation_condition.as_auto_materialize_policy() + if automation_condition + else None, + backfill_policy=backfill_policy, + ) + ) + return ResolvedAssetObjectList(return_list) diff --git a/python_modules/dagster/dagster/_core/workspace/autodiscovery.py b/python_modules/dagster/dagster/_core/workspace/autodiscovery.py index 3cf0189899cf3..7c4ce174e3a39 100644 --- a/python_modules/dagster/dagster/_core/workspace/autodiscovery.py +++ b/python_modules/dagster/dagster/_core/workspace/autodiscovery.py @@ -5,7 +5,7 @@ from dagster import DagsterInvariantViolationError, GraphDefinition, RepositoryDefinition from dagster._core.code_pointer import load_python_file, load_python_module from dagster._core.definitions.definitions_class import Definitions -from dagster._core.definitions.load_assets_from_modules import assets_from_modules +from dagster._core.definitions.load_assets_from_modules import load_assets_from_modules LOAD_ALL_ASSETS = "<>" @@ -114,7 +114,7 @@ def loadable_targets_from_loaded_module(module: ModuleType) -> Sequence[Loadable ) ) - module_assets, module_source_assets, _ = assets_from_modules([module]) + module_assets, module_source_assets, _ = load_assets_from_modules([module]) if len(module_assets) > 0 or len(module_source_assets) > 0: return [LoadableTarget(LOAD_ALL_ASSETS, [*module_assets, *module_source_assets])] diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/asset_package/__init__.py b/python_modules/dagster/dagster_tests/asset_defs_tests/asset_package/__init__.py index 9eaf0d44dad77..44830afc5c1fa 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/asset_package/__init__.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/asset_package/__init__.py @@ -1,4 +1,5 @@ from dagster import AssetKey, SourceAsset, asset +from dagster._core.definitions.asset_spec import AssetSpec @asset @@ -29,4 +30,7 @@ def make_list_of_source_assets(): return [buddy_holly, jerry_lee_lewis] +top_level_spec = AssetSpec("top_level_spec") + + list_of_assets_and_source_assets = [*make_list_of_assets(), *make_list_of_source_assets()] diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/asset_package/module_with_assets.py b/python_modules/dagster/dagster_tests/asset_defs_tests/asset_package/module_with_assets.py index 0cea67687a7f0..eaa8819f9fc57 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/asset_package/module_with_assets.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/asset_package/module_with_assets.py @@ -1,4 +1,5 @@ from dagster import AssetKey, SourceAsset, asset, graph_asset, op +from dagster._core.definitions.asset_spec import AssetSpec from dagster._core.definitions.metadata import ( CodeReferencesMetadataSet, CodeReferencesMetadataValue, @@ -41,3 +42,6 @@ def multiply_by_two(input_num): @graph_asset def graph_backed_asset(): return multiply_by_two(one()) + + +my_spec = AssetSpec("my_asset_spec") diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets_from_modules.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets_from_modules.py index 7fc579483ac2e..97d9549e615da 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets_from_modules.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets_from_modules.py @@ -14,6 +14,7 @@ load_assets_from_package_module, load_assets_from_package_name, ) +from dagster._core.definitions.asset_spec import AssetSpec from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy from dagster._core.definitions.cacheable_assets import CacheableAssetsDefinition @@ -101,6 +102,15 @@ def test_load_assets_from_package_name(): assert assets_1 == assets_2 + assets_3 = load_assets_from_package_name(asset_package.__name__, include_specs=True) + assert len(assets_3) == 13 + + assert next( + iter( + a for a in assets_3 if isinstance(a, AssetSpec) and a.key == AssetKey("top_level_spec") + ) + ) + def test_load_assets_from_package_module(): from dagster_tests.asset_defs_tests import asset_package @@ -117,6 +127,15 @@ def test_load_assets_from_package_module(): assert assets_1 == assets_2 + assets_3 = load_assets_from_package_name(asset_package.__name__, include_specs=True) + assert len(assets_3) == 13 + + assert next( + iter( + a for a in assets_3 if isinstance(a, AssetSpec) and a.key == AssetKey("top_level_spec") + ) + ) + def test_load_assets_from_modules(monkeypatch): from dagster_tests.asset_defs_tests import asset_package @@ -141,13 +160,23 @@ def little_richard(): m.setattr(asset_package, "little_richard_dup", little_richard, raising=False) with pytest.raises( DagsterInvalidDefinitionError, - match=re.escape( - "Asset key AssetKey(['little_richard']) is defined multiple times. " - "Definitions found in modules: dagster_tests.asset_defs_tests.asset_package." - ), + match=re.escape("Asset key little_richard is defined multiple times."), ): load_assets_from_modules([asset_package, module_with_assets]) + # Create an AssetsDefinition with an identical spec to that in the module + with monkeypatch.context() as m: + + @asset + def top_level_spec(): + pass + + m.setattr(asset_package, "top_level_spec_same_assets_def", top_level_spec, raising=False) + with pytest.raises( + DagsterInvalidDefinitionError, + ): + load_assets_from_modules([asset_package, module_with_assets], include_specs=True) + @asset(group_name="my_group") def asset_in_current_module(): @@ -156,12 +185,22 @@ def asset_in_current_module(): source_asset_in_current_module = SourceAsset(AssetKey("source_asset_in_current_module")) +spec_in_current_module = AssetSpec("spec_in_current_module") + def test_load_assets_from_current_module(): assets = load_assets_from_current_module() assets = [get_unique_asset_identifier(asset) for asset in assets] - assert assets == ["asset_in_current_module", AssetKey("source_asset_in_current_module")] + assert set(assets) == {"asset_in_current_module", AssetKey("source_asset_in_current_module")} assert len(assets) == 2 + assets = load_assets_from_current_module(include_specs=True) + assets = [get_unique_asset_identifier(asset) for asset in assets] + assert len(assets) == 3 + assert set(assets) == { + "asset_in_current_module", + AssetKey("source_asset_in_current_module"), + AssetKey("spec_in_current_module"), + } def test_load_assets_from_modules_with_group_name(): @@ -179,7 +218,8 @@ def test_load_assets_from_modules_with_group_name(): def test_respect_existing_groups(): assets = load_assets_from_current_module() - assert assets[0].group_names_by_key.get(AssetKey("asset_in_current_module")) == "my_group" # pyright: ignore[reportAttributeAccessIssue] + assets_def = next(iter(a for a in assets if isinstance(a, AssetsDefinition))) + assert assets_def.group_names_by_key.get(AssetKey("asset_in_current_module")) == "my_group" # pyright: ignore[reportAttributeAccessIssue] with pytest.raises(DagsterInvalidDefinitionError): load_assets_from_current_module(group_name="yay") @@ -270,7 +310,9 @@ def test_source_key_prefix(load_fn): assert get_assets_def_with_key( assets_with_prefix_sources, AssetKey(["foo", "my_cool_prefix", "chuck_berry"]) ).dependency_keys == { + # source prefix AssetKey(["bar", "cooler_prefix", "elvis_presley"]), + # loadable prefix AssetKey(["foo", "my_cool_prefix", "miles_davis"]), }