From 54dab1e91ae623ec2d241189cf810604d8c1149e Mon Sep 17 00:00:00 2001 From: Chris DeCarolis Date: Fri, 13 Dec 2024 16:36:52 -0800 Subject: [PATCH] Handle attribute mapping properly --- .../dagster/_core/definitions/asset_spec.py | 1 + .../definitions/load_assets_from_modules.py | 373 ++++++++++-------- .../test_assets_from_modules.py | 7 +- 3 files changed, 217 insertions(+), 164 deletions(-) diff --git a/python_modules/dagster/dagster/_core/definitions/asset_spec.py b/python_modules/dagster/dagster/_core/definitions/asset_spec.py index 12ff89d279894..b2e52fe8bf5d9 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_spec.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_spec.py @@ -313,6 +313,7 @@ def replace_attributes( tags: Optional[Mapping[str, str]] = ..., kinds: Optional[Set[str]] = ..., partitions_def: Optional[PartitionsDefinition] = ..., + freshness_policy: Optional[FreshnessPolicy] = ..., ) -> "AssetSpec": """Returns a new AssetSpec with the specified attributes replaced.""" current_tags_without_kinds = { diff --git a/python_modules/dagster/dagster/_core/definitions/load_assets_from_modules.py b/python_modules/dagster/dagster/_core/definitions/load_assets_from_modules.py index a71f7d5a890fd..dd476dda9014e 100644 --- a/python_modules/dagster/dagster/_core/definitions/load_assets_from_modules.py +++ b/python_modules/dagster/dagster/_core/definitions/load_assets_from_modules.py @@ -4,7 +4,19 @@ from functools import cached_property from importlib import import_module from types import ModuleType -from typing import Dict, Iterable, Iterator, List, Mapping, Optional, Sequence, Tuple, Type, Union +from typing import ( + Callable, + Dict, + Iterable, + Iterator, + List, + Mapping, + Optional, + Sequence, + Tuple, + Type, + Union, +) import dagster._check as check from dagster._core.definitions.asset_key import ( @@ -12,6 +24,7 @@ CoercibleToAssetKeyPrefix, check_opt_coercible_to_asset_key_prefix_param, ) +from dagster._core.definitions.asset_spec import AssetSpec from dagster._core.definitions.assets import AssetsDefinition from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy from dagster._core.definitions.backfill_policy import BackfillPolicy @@ -21,7 +34,7 @@ ) from dagster._core.definitions.freshness_policy import FreshnessPolicy from dagster._core.definitions.source_asset import SourceAsset -from dagster._core.definitions.utils import resolve_automation_condition +from dagster._core.definitions.utils import DEFAULT_GROUP_NAME, resolve_automation_condition from dagster._core.errors import DagsterInvalidDefinitionError @@ -88,6 +101,14 @@ def objects_by_id(self) -> Dict[int, LoadableAssetTypes]: def deduped_objects(self) -> Sequence[LoadableAssetTypes]: return list(self.objects_by_id.values()) + @cached_property + def assets_defs(self) -> Sequence[AssetsDefinition]: + return [asset for asset in self.deduped_objects if isinstance(asset, AssetsDefinition)] + + @cached_property + def source_assets(self) -> Sequence[SourceAsset]: + return [asset for asset in self.deduped_objects if isinstance(asset, SourceAsset)] + @cached_property def module_name_by_id(self) -> Dict[int, str]: return { @@ -106,24 +127,6 @@ def objects_by_key(self) -> Mapping[AssetKey, Sequence[Union[SourceAsset, Assets objects_by_key[key].append(asset_object) return objects_by_key - @cached_property - def sources(self) -> Sequence[SourceAsset]: - return [asset for asset in self.deduped_objects if isinstance(asset, SourceAsset)] - - @cached_property - def assets_defs(self) -> Sequence[AssetsDefinition]: - return [ - asset - for asset in self.deduped_objects - if isinstance(asset, AssetsDefinition) and asset.keys - ] - - @cached_property - def cacheable_assets_defs(self) -> Sequence[CacheableAssetsDefinition]: - return [ - asset for asset in self.deduped_objects if isinstance(asset, CacheableAssetsDefinition) - ] - def _do_collision_detection(self) -> None: for key, asset_objects in self.objects_by_key.items(): # If there is more than one asset_object in the list for a given key, and the objects do not refer to the same asset_object in memory, we have a collision. @@ -131,36 +134,58 @@ def _do_collision_detection(self) -> None: set(id(asset_object) for asset_object in asset_objects) ) if len(asset_objects) > 1 and num_distinct_objects_for_key > 1: - asset_objects_str = ", ".join( - set(self.module_name_by_id[id(asset_object)] for asset_object in asset_objects) + affected_modules = set( + self.module_name_by_id[id(asset_object)] for asset_object in asset_objects + ) + asset_objects_str = ", ".join(affected_modules) + modules_str = ( + f"Definitions found in modules: {asset_objects_str}." + if len(affected_modules) > 1 + else "" ) raise DagsterInvalidDefinitionError( - f"Asset key {key.to_user_string()} is defined multiple times. Definitions found in modules: {asset_objects_str}." + f"Asset key {key.to_user_string()} is defined multiple times. {modules_str}".strip() ) + def to_post_load(self) -> "ResolvedAssetObjectList": + return ResolvedAssetObjectList(self.deduped_objects) -def assets_from_modules( - modules: Iterable[ModuleType], -) -> Tuple[Sequence[AssetsDefinition], Sequence[SourceAsset], Sequence[CacheableAssetsDefinition]]: - """Constructs three lists, a list of assets, a list of source assets, and a list of cacheable - assets from the given modules. - Args: - modules (Iterable[ModuleType]): The Python modules to look for assets inside. - extra_source_assets (Optional[Sequence[SourceAsset]]): Source assets to include in the - group in addition to the source assets found in the modules. +def _spec_mapper_disallow_group_override( + group_name: Optional[str], + automation_condition: Optional[AutomationCondition], +) -> Callable[[AssetSpec], AssetSpec]: + def _inner(spec: AssetSpec) -> AssetSpec: + if ( + group_name is not None + and spec.group_name is not None + and group_name != spec.group_name + and spec.group_name != DEFAULT_GROUP_NAME + ): + raise DagsterInvalidDefinitionError( + f"Asset spec {spec.key.to_user_string()} has group name {spec.group_name}, which conflicts with the group name {group_name} provided in load_assets_from_modules." + ) + return spec.replace_attributes( + group_name=group_name if group_name else ..., + automation_condition=automation_condition if automation_condition else ..., + ) - Returns: - Tuple[Sequence[AssetsDefinition], Sequence[SourceAsset], Sequence[CacheableAssetsDefinition]]]: - A tuple containing a list of assets, a list of source assets, and a list of - cacheable assets defined in the given modules. - """ - assets_list = LoadedAssetsList.from_modules(modules) - return assets_list.assets_defs, assets_list.sources, assets_list.cacheable_assets_defs + return _inner -def key_iterator(asset: Union[AssetsDefinition, SourceAsset]) -> Iterator[AssetKey]: - return iter(asset.keys) if isinstance(asset, AssetsDefinition) else iter([asset.key]) +def key_iterator( + asset: Union[AssetsDefinition, SourceAsset], included_targeted_keys: bool = False +) -> Iterator[AssetKey]: + return ( + iter( + *[asset.keys], + *[check_key.asset_key for check_key in asset.check_keys] + if included_targeted_keys + else [], + ) + if isinstance(asset, AssetsDefinition) + else iter([asset.key]) + ) def load_assets_from_modules( @@ -196,29 +221,26 @@ def load_assets_from_modules( Sequence[Union[AssetsDefinition, SourceAsset]]: A list containing assets and source assets defined in the given modules. """ - group_name = check.opt_str_param(group_name, "group_name") - key_prefix = check_opt_coercible_to_asset_key_prefix_param(key_prefix, "key_prefix") - freshness_policy = check.opt_inst_param(freshness_policy, "freshness_policy", FreshnessPolicy) - backfill_policy = check.opt_inst_param(backfill_policy, "backfill_policy", BackfillPolicy) - - ( - assets, - source_assets, - cacheable_assets, - ) = assets_from_modules(modules) - - return assets_with_attributes( - assets, - source_assets, - cacheable_assets, - key_prefix=key_prefix, - group_name=group_name, - freshness_policy=freshness_policy, - automation_condition=resolve_automation_condition( - automation_condition, auto_materialize_policy - ), - backfill_policy=backfill_policy, - source_key_prefix=source_key_prefix, + return ( + LoadedAssetsList.from_modules(modules) + .to_post_load() + .with_attributes( + key_prefix=check_opt_coercible_to_asset_key_prefix_param(key_prefix, "key_prefix"), + source_key_prefix=check_opt_coercible_to_asset_key_prefix_param( + source_key_prefix, "source_key_prefix" + ), + group_name=check.opt_str_param(group_name, "group_name"), + freshness_policy=check.opt_inst_param( + freshness_policy, "freshness_policy", FreshnessPolicy + ), + automation_condition=resolve_automation_condition( + automation_condition, auto_materialize_policy + ), + backfill_policy=check.opt_inst_param( + backfill_policy, "backfill_policy", BackfillPolicy + ), + ) + .loaded_objects ) @@ -272,25 +294,6 @@ def load_assets_from_current_module( ) -def assets_from_package_module( - package_module: ModuleType, -) -> Tuple[Sequence[AssetsDefinition], Sequence[SourceAsset], Sequence[CacheableAssetsDefinition]]: - """Constructs three lists, a list of assets, a list of source assets, and a list of cacheable assets - from the given package module. - - Args: - package_module (ModuleType): The package module to looks for assets inside. - extra_source_assets (Optional[Sequence[SourceAsset]]): Source assets to include in the - group in addition to the source assets found in the modules. - - Returns: - Tuple[Sequence[AssetsDefinition], Sequence[SourceAsset], Sequence[CacheableAssetsDefinition]]: - A tuple containing a list of assets, a list of source assets, and a list of cacheable assets - defined in the given modules. - """ - return assets_from_modules(find_modules_in_package(package_module)) - - def load_assets_from_package_module( package_module: ModuleType, group_name: Optional[str] = None, @@ -301,7 +304,7 @@ def load_assets_from_package_module( automation_condition: Optional[AutomationCondition] = None, backfill_policy: Optional[BackfillPolicy] = None, source_key_prefix: Optional[CoercibleToAssetKeyPrefix] = None, -) -> Sequence[Union[AssetsDefinition, SourceAsset, CacheableAssetsDefinition]]: +) -> Sequence[LoadableAssetTypes]: """Constructs a list of assets and source assets that includes all asset definitions, source assets, and cacheable assets in all sub-modules of the given package module. @@ -327,26 +330,13 @@ def load_assets_from_package_module( Sequence[Union[AssetsDefinition, SourceAsset, CacheableAssetsDefinition]]: A list containing assets, source assets, and cacheable assets defined in the module. """ - group_name = check.opt_str_param(group_name, "group_name") - key_prefix = check_opt_coercible_to_asset_key_prefix_param(key_prefix, "key_prefix") - freshness_policy = check.opt_inst_param(freshness_policy, "freshness_policy", FreshnessPolicy) - backfill_policy = check.opt_inst_param(backfill_policy, "backfill_policy", BackfillPolicy) - - ( - assets, - source_assets, - cacheable_assets, - ) = assets_from_package_module(package_module) - return assets_with_attributes( - assets, - source_assets, - cacheable_assets, - key_prefix=key_prefix, - group_name=group_name, + return load_assets_from_modules( + [*find_modules_in_package(package_module)], + group_name, + key_prefix, freshness_policy=freshness_policy, - automation_condition=resolve_automation_condition( - automation_condition, auto_materialize_policy - ), + auto_materialize_policy=auto_materialize_policy, + automation_condition=automation_condition, backfill_policy=backfill_policy, source_key_prefix=source_key_prefix, ) @@ -414,6 +404,126 @@ def find_modules_in_package(package_module: ModuleType) -> Iterable[ModuleType]: ) +def replace_keys_in_asset( + asset: Union[AssetsDefinition, SourceAsset], + key_replacements: Mapping[AssetKey, AssetKey], +) -> Union[AssetsDefinition, SourceAsset]: + return ( + asset.with_attributes( + output_asset_key_replacements={ + key: key_replacements.get(key, key) for key in asset.keys + }, + input_asset_key_replacements={ + key: key_replacements.get(key, key) for key in asset.keys_by_input_name.values() + }, + ) + if isinstance(asset, AssetsDefinition) + else asset.with_attributes(key=key_replacements.get(asset.key, asset.key)) + ) + + +class ResolvedAssetObjectList: + def __init__( + self, + loaded_objects: Sequence[Union[AssetsDefinition, SourceAsset, CacheableAssetsDefinition]], + ): + self.loaded_objects = loaded_objects + + @cached_property + def assets_defs(self) -> Sequence[AssetsDefinition]: + return [ + asset + for asset in self.loaded_objects + if isinstance(asset, AssetsDefinition) and asset.keys + ] + + @cached_property + def source_assets(self) -> Sequence[SourceAsset]: + return [asset for asset in self.loaded_objects if isinstance(asset, SourceAsset)] + + def assets_with_loadable_prefix( + self, key_prefix: CoercibleToAssetKeyPrefix + ) -> "ResolvedAssetObjectList": + # There is a tricky edge case here where if a non-cacheable asset depends on a cacheable asset, + # and the assets are prefixed, the non-cacheable asset's dependency will not be prefixed since + # at prefix-time it is not known that its dependency is one of the cacheable assets. + # https://github.com/dagster-io/dagster/pull/10389#pullrequestreview-1170913271 + result_list = [] + all_asset_keys = { + key + for asset_object in self.assets_defs + for key in key_iterator(asset_object, included_targeted_keys=True) + } + key_replacements = {key: key.with_prefix(key_prefix) for key in all_asset_keys} + for asset_object in self.loaded_objects: + if isinstance(asset_object, CacheableAssetsDefinition): + result_list.append(asset_object.with_prefix_for_all(key_prefix)) + elif isinstance(asset_object, AssetsDefinition): + result_list.append(replace_keys_in_asset(asset_object, key_replacements)) + else: + # We don't replace the key for SourceAssets. + result_list.append(asset_object) + return ResolvedAssetObjectList(result_list) + + def assets_with_source_prefix( + self, key_prefix: CoercibleToAssetKeyPrefix + ) -> "ResolvedAssetObjectList": + result_list = [] + key_replacements = { + source_asset.key: source_asset.key.with_prefix(key_prefix) + for source_asset in self.source_assets + } + for asset_object in self.loaded_objects: + if isinstance(asset_object, KeyScopedAssetObjects): + result_list.append(replace_keys_in_asset(asset_object, key_replacements)) + else: + result_list.append(asset_object) + return ResolvedAssetObjectList(result_list) + + def with_attributes( + self, + key_prefix: Optional[CoercibleToAssetKeyPrefix], + source_key_prefix: Optional[CoercibleToAssetKeyPrefix], + group_name: Optional[str], + freshness_policy: Optional[FreshnessPolicy], + automation_condition: Optional[AutomationCondition], + backfill_policy: Optional[BackfillPolicy], + ) -> "ResolvedAssetObjectList": + assets_list = self.assets_with_loadable_prefix(key_prefix) if key_prefix else self + assets_list = ( + assets_list.assets_with_source_prefix(source_key_prefix) + if source_key_prefix + else assets_list + ) + return_list = [] + for asset in assets_list.loaded_objects: + if isinstance(asset, AssetsDefinition): + return_list.append( + asset.map_asset_specs( + _spec_mapper_disallow_group_override(group_name, automation_condition) + ).with_attributes( + backfill_policy=backfill_policy, freshness_policy=freshness_policy + ) + ) + elif isinstance(asset, SourceAsset): + return_list.append( + asset.with_attributes(group_name=group_name if group_name else asset.group_name) + ) + else: + return_list.append( + asset.with_attributes_for_all( + group_name, + freshness_policy=freshness_policy, + auto_materialize_policy=automation_condition.as_auto_materialize_policy() + if automation_condition + else None, + backfill_policy=backfill_policy, + ) + ) + return ResolvedAssetObjectList(return_list) + + +# No longer used in load_assets_from_modules, next PR will fix for load_asset_checks_from_modules def prefix_assets( assets_defs: Sequence[AssetsDefinition], key_prefix: CoercibleToAssetKeyPrefix, @@ -503,60 +613,3 @@ def asset2(asset1): result_source_assets = source_assets return result_assets, result_source_assets - - -def assets_with_attributes( - assets_defs: Sequence[AssetsDefinition], - source_assets: Sequence[SourceAsset], - cacheable_assets: Sequence[CacheableAssetsDefinition], - key_prefix: Optional[Sequence[str]], - group_name: Optional[str], - freshness_policy: Optional[FreshnessPolicy], - automation_condition: Optional[AutomationCondition], - backfill_policy: Optional[BackfillPolicy], - source_key_prefix: Optional[Sequence[str]], -) -> Sequence[Union[AssetsDefinition, SourceAsset, CacheableAssetsDefinition]]: - # There is a tricky edge case here where if a non-cacheable asset depends on a cacheable asset, - # and the assets are prefixed, the non-cacheable asset's dependency will not be prefixed since - # at prefix-time it is not known that its dependency is one of the cacheable assets. - # https://github.com/dagster-io/dagster/pull/10389#pullrequestreview-1170913271 - if key_prefix: - assets_defs, source_assets = prefix_assets( - assets_defs, key_prefix, source_assets, source_key_prefix - ) - cacheable_assets = [ - cached_asset.with_prefix_for_all(key_prefix) for cached_asset in cacheable_assets - ] - - if group_name or freshness_policy or automation_condition or backfill_policy: - assets_defs = [ - asset.with_attributes( - group_names_by_key=( - {asset_key: group_name for asset_key in asset.keys} - if group_name is not None - else {} - ), - freshness_policy=freshness_policy, - automation_condition=automation_condition, - backfill_policy=backfill_policy, - ) - for asset in assets_defs - ] - if group_name: - source_assets = [ - source_asset.with_attributes(group_name=group_name) - for source_asset in source_assets - ] - cacheable_assets = [ - cached_asset.with_attributes_for_all( - group_name, - freshness_policy=freshness_policy, - auto_materialize_policy=automation_condition.as_auto_materialize_policy() - if automation_condition - else None, - backfill_policy=backfill_policy, - ) - for cached_asset in cacheable_assets - ] - - return [*assets_defs, *source_assets, *cacheable_assets] diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets_from_modules.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets_from_modules.py index 6ff6b65131cb4..ea7f2bc28d5d0 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets_from_modules.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets_from_modules.py @@ -141,10 +141,7 @@ def little_richard(): m.setattr(asset_package, "little_richard_dup", little_richard, raising=False) with pytest.raises( DagsterInvalidDefinitionError, - match=re.escape( - "Asset key little_richard is defined multiple times. " - "Definitions found in modules: dagster_tests.asset_defs_tests.asset_package." - ), + match=re.escape("Asset key little_richard is defined multiple times."), ): load_assets_from_modules([asset_package, module_with_assets]) @@ -270,7 +267,9 @@ def test_source_key_prefix(load_fn): assert get_assets_def_with_key( assets_with_prefix_sources, AssetKey(["foo", "my_cool_prefix", "chuck_berry"]) ).dependency_keys == { + # source prefix AssetKey(["bar", "cooler_prefix", "elvis_presley"]), + # loadable prefix AssetKey(["foo", "my_cool_prefix", "miles_davis"]), }