From bc06a99883b76366a1c91160bfbadb03b76ea7bb Mon Sep 17 00:00:00 2001 From: Chris DeCarolis Date: Mon, 16 Dec 2024 12:03:57 -0800 Subject: [PATCH] Include specs in asset module loaders --- .../asset_tutorial_tests/test_cereal.py | 4 +- .../test_serial_asset_graph.py | 4 +- .../dagster-test/dagster_test/toys/repo.py | 16 ++-- .../dagster/dagster/_core/code_pointer.py | 3 +- .../definitions/load_assets_from_modules.py | 95 ++++++++++++------- .../_core/definitions/metadata/source_code.py | 24 +++-- .../dagster/_core/workspace/autodiscovery.py | 6 +- .../asset_package/__init__.py | 4 + .../asset_package/module_with_assets.py | 4 + .../test_asset_defs_source_metadata.py | 10 +- .../asset_defs_tests/test_asset_job.py | 11 ++- .../test_assets_from_modules.py | 47 ++++++++- .../test_unresolved_asset_job.py | 13 ++- 13 files changed, 171 insertions(+), 70 deletions(-) diff --git a/examples/docs_snippets/docs_snippets_tests/guides_tests/asset_tutorial_tests/test_cereal.py b/examples/docs_snippets/docs_snippets_tests/guides_tests/asset_tutorial_tests/test_cereal.py index 098674353a622..9c2bb1a39a817 100644 --- a/examples/docs_snippets/docs_snippets_tests/guides_tests/asset_tutorial_tests/test_cereal.py +++ b/examples/docs_snippets/docs_snippets_tests/guides_tests/asset_tutorial_tests/test_cereal.py @@ -6,5 +6,5 @@ @patch_cereal_requests def test_cereal(): - assets, source_assets, _ = load_assets_from_modules([cereal]) - assert materialize([*assets, *source_assets]) + assets = load_assets_from_modules([cereal]) + assert materialize(assets) diff --git a/examples/docs_snippets/docs_snippets_tests/guides_tests/asset_tutorial_tests/test_serial_asset_graph.py b/examples/docs_snippets/docs_snippets_tests/guides_tests/asset_tutorial_tests/test_serial_asset_graph.py index 9c0e102292886..70a5d6cbefdb9 100644 --- a/examples/docs_snippets/docs_snippets_tests/guides_tests/asset_tutorial_tests/test_serial_asset_graph.py +++ b/examples/docs_snippets/docs_snippets_tests/guides_tests/asset_tutorial_tests/test_serial_asset_graph.py @@ -6,5 +6,5 @@ @patch_cereal_requests def test_serial_asset_graph(): - assets, source_assets, _ = load_assets_from_modules([serial_asset_graph]) - assert materialize([*assets, *source_assets]) + assets = load_assets_from_modules([serial_asset_graph]) + assert materialize(assets) diff --git a/python_modules/dagster-test/dagster_test/toys/repo.py b/python_modules/dagster-test/dagster_test/toys/repo.py index 317d0f7339652..3f5b7177e57c1 100644 --- a/python_modules/dagster-test/dagster_test/toys/repo.py +++ b/python_modules/dagster-test/dagster_test/toys/repo.py @@ -1,6 +1,8 @@ import warnings +from typing import Sequence, cast from dagster import ExperimentalWarning +from dagster._core.definitions.assets import AssetsDefinition from dagster._time import get_current_timestamp # squelch experimental warnings since we often include experimental things in toys for development @@ -164,19 +166,21 @@ def column_schema_repository(): def table_metadata_repository(): from dagster_test.toys import table_metadata - return load_assets_from_modules([table_metadata]) + return cast(Sequence[AssetsDefinition], load_assets_from_modules([table_metadata])) @repository def long_asset_keys_repository(): from dagster_test.toys import long_asset_keys - return load_assets_from_modules([long_asset_keys]) + return cast(Sequence[AssetsDefinition], load_assets_from_modules([long_asset_keys])) -@repository # pyright: ignore[reportArgumentType] +@repository def big_honkin_assets_repository(): - return [load_assets_from_modules([big_honkin_asset_graph_module])] + return cast( + Sequence[AssetsDefinition], [load_assets_from_modules([big_honkin_asset_graph_module])] + ) @repository @@ -208,11 +212,11 @@ def assets_with_sensors_repository(): def conditional_assets_repository(): from dagster_test.toys import conditional_assets - return load_assets_from_modules([conditional_assets]) + return cast(Sequence[AssetsDefinition], load_assets_from_modules([conditional_assets])) @repository def data_versions_repository(): from dagster_test.toys import data_versions - return load_assets_from_modules([data_versions]) + return cast(Sequence[AssetsDefinition], load_assets_from_modules([data_versions])) diff --git a/python_modules/dagster/dagster/_core/code_pointer.py b/python_modules/dagster/dagster/_core/code_pointer.py index 4601b21e23691..b79293722ea84 100644 --- a/python_modules/dagster/dagster/_core/code_pointer.py +++ b/python_modules/dagster/dagster/_core/code_pointer.py @@ -190,8 +190,7 @@ def _load_target_from_module(module: ModuleType, fn_name: str, error_suffix: str if fn_name == LOAD_ALL_ASSETS: # LOAD_ALL_ASSETS is a special symbol that's returned when, instead of loading a particular # attribute, we should load all the assets in the module. - module_assets, module_source_assets, _ = load_assets_from_modules([module]) - return [*module_assets, *module_source_assets] + return load_assets_from_modules([module]) else: if not hasattr(module, fn_name): raise DagsterInvariantViolationError(f"{fn_name} not found {error_suffix}") diff --git a/python_modules/dagster/dagster/_core/definitions/load_assets_from_modules.py b/python_modules/dagster/dagster/_core/definitions/load_assets_from_modules.py index 60560af0ae43a..63cd3730bff43 100644 --- a/python_modules/dagster/dagster/_core/definitions/load_assets_from_modules.py +++ b/python_modules/dagster/dagster/_core/definitions/load_assets_from_modules.py @@ -63,8 +63,8 @@ def find_subclasses_in_module( yield value -LoadableAssetTypes = Union[AssetsDefinition, SourceAsset, CacheableAssetsDefinition] -KeyScopedAssetObjects = (AssetsDefinition, SourceAsset) +LoadableAssetTypes = Union[AssetsDefinition, AssetSpec, SourceAsset, CacheableAssetsDefinition] +KeyScopedAssetObjects = (AssetsDefinition, AssetSpec, SourceAsset) class LoadedAssetsList: @@ -81,7 +81,8 @@ def from_modules(cls, modules: Iterable[ModuleType]) -> "LoadedAssetsList": { module.__name__: list( find_objects_in_module_of_types( - module, (AssetsDefinition, SourceAsset, CacheableAssetsDefinition) + module, + (AssetsDefinition, SourceAsset, CacheableAssetsDefinition, AssetSpec), ) ) for module in modules @@ -175,7 +176,7 @@ def _inner(spec: AssetSpec) -> AssetSpec: def key_iterator( - asset: Union[AssetsDefinition, SourceAsset], included_targeted_keys: bool = False + asset: Union[AssetsDefinition, SourceAsset, AssetSpec], included_targeted_keys: bool = False ) -> Iterator[AssetKey]: return ( iter( @@ -203,7 +204,8 @@ def load_assets_from_modules( automation_condition: Optional[AutomationCondition] = None, backfill_policy: Optional[BackfillPolicy] = None, source_key_prefix: Optional[CoercibleToAssetKeyPrefix] = None, -) -> Sequence[Union[AssetsDefinition, SourceAsset, CacheableAssetsDefinition]]: + include_specs: bool = False, +) -> Sequence[Union[AssetsDefinition, AssetSpec, SourceAsset, CacheableAssetsDefinition]]: """Constructs a list of assets and source assets from the given modules. Args: @@ -226,6 +228,15 @@ def load_assets_from_modules( Sequence[Union[AssetsDefinition, SourceAsset]]: A list containing assets and source assets defined in the given modules. """ + + def _asset_filter(asset: LoadableAssetTypes) -> bool: + if isinstance(asset, AssetsDefinition): + # We don't load asset checks with asset module loaders. + return not has_only_asset_checks(asset) + if isinstance(asset, AssetSpec): + return include_specs + return True + return ( LoadedAssetsList.from_modules(modules) .to_post_load() @@ -245,7 +256,7 @@ def load_assets_from_modules( backfill_policy, "backfill_policy", BackfillPolicy ), ) - .assets_only + .get_objects(_asset_filter) ) @@ -258,7 +269,8 @@ def load_assets_from_current_module( automation_condition: Optional[AutomationCondition] = None, backfill_policy: Optional[BackfillPolicy] = None, source_key_prefix: Optional[CoercibleToAssetKeyPrefix] = None, -) -> Sequence[Union[AssetsDefinition, SourceAsset, CacheableAssetsDefinition]]: + include_specs: bool = False, +) -> Sequence[Union[AssetsDefinition, AssetSpec, SourceAsset, CacheableAssetsDefinition]]: """Constructs a list of assets, source assets, and cacheable assets from the module where this function is called. @@ -296,6 +308,7 @@ def load_assets_from_current_module( ), backfill_policy=backfill_policy, source_key_prefix=source_key_prefix, + include_specs=include_specs, ) @@ -309,6 +322,7 @@ def load_assets_from_package_module( automation_condition: Optional[AutomationCondition] = None, backfill_policy: Optional[BackfillPolicy] = None, source_key_prefix: Optional[CoercibleToAssetKeyPrefix] = None, + include_specs: bool = False, ) -> Sequence[LoadableAssetTypes]: """Constructs a list of assets and source assets that includes all asset definitions, source assets, and cacheable assets in all sub-modules of the given package module. @@ -344,6 +358,7 @@ def load_assets_from_package_module( automation_condition=automation_condition, backfill_policy=backfill_policy, source_key_prefix=source_key_prefix, + include_specs=include_specs, ) @@ -356,7 +371,8 @@ def load_assets_from_package_name( auto_materialize_policy: Optional[AutoMaterializePolicy] = None, backfill_policy: Optional[BackfillPolicy] = None, source_key_prefix: Optional[CoercibleToAssetKeyPrefix] = None, -) -> Sequence[Union[AssetsDefinition, SourceAsset, CacheableAssetsDefinition]]: + include_specs: bool = False, +) -> Sequence[Union[AssetsDefinition, AssetSpec, SourceAsset, CacheableAssetsDefinition]]: """Constructs a list of assets, source assets, and cacheable assets that includes all asset definitions and source assets in all sub-modules of the given package. @@ -389,6 +405,7 @@ def load_assets_from_package_name( auto_materialize_policy=auto_materialize_policy, backfill_policy=backfill_policy, source_key_prefix=source_key_prefix, + include_specs=include_specs, ) @@ -410,11 +427,17 @@ def find_modules_in_package(package_module: ModuleType) -> Iterable[ModuleType]: def replace_keys_in_asset( - asset: Union[AssetsDefinition, SourceAsset], + asset: Union[AssetsDefinition, AssetSpec, SourceAsset], key_replacements: Mapping[AssetKey, AssetKey], -) -> Union[AssetsDefinition, SourceAsset]: - updated_object = ( - asset.with_attributes( +) -> Union[AssetsDefinition, AssetSpec, SourceAsset]: + if isinstance(asset, SourceAsset): + return asset.with_attributes(key=key_replacements.get(asset.key, asset.key)) + if isinstance(asset, AssetSpec): + return asset.replace_attributes( + key=key_replacements.get(asset.key, asset.key), + ) + else: + updated_object = asset.with_attributes( output_asset_key_replacements={ key: key_replacements.get(key, key) for key in key_iterator(asset, included_targeted_keys=True) @@ -423,34 +446,31 @@ def replace_keys_in_asset( key: key_replacements.get(key, key) for key in asset.keys_by_input_name.values() }, ) - if isinstance(asset, AssetsDefinition) - else asset.with_attributes(key=key_replacements.get(asset.key, asset.key)) - ) - if isinstance(asset, AssetChecksDefinition): - updated_object = cast(AssetsDefinition, updated_object) - updated_object = AssetChecksDefinition.create( - keys_by_input_name=updated_object.keys_by_input_name, - node_def=updated_object.op, - check_specs_by_output_name=updated_object.check_specs_by_output_name, - resource_defs=updated_object.resource_defs, - can_subset=updated_object.can_subset, - ) - return updated_object + if isinstance(asset, AssetsDefinition) and has_only_asset_checks(asset): + updated_object = AssetChecksDefinition.create( + keys_by_input_name=updated_object.keys_by_input_name, + node_def=updated_object.op, + check_specs_by_output_name=updated_object.check_specs_by_output_name, + resource_defs=updated_object.resource_defs, + can_subset=updated_object.can_subset, + ) + return updated_object class ResolvedAssetObjectList: def __init__( self, - loaded_objects: Sequence[Union[AssetsDefinition, SourceAsset, CacheableAssetsDefinition]], + loaded_objects: Sequence[LoadableAssetTypes], ): self.loaded_objects = loaded_objects @cached_property - def assets_defs(self) -> Sequence[AssetsDefinition]: + def assets_defs_and_specs(self) -> Sequence[Union[AssetsDefinition, AssetSpec]]: return [ dagster_object for dagster_object in self.loaded_objects - if isinstance(dagster_object, AssetsDefinition) and dagster_object.keys + if (isinstance(dagster_object, AssetsDefinition) and dagster_object.keys) + or isinstance(dagster_object, AssetSpec) ] @cached_property @@ -462,8 +482,10 @@ def checks_defs(self) -> Sequence[AssetChecksDefinition]: ] @cached_property - def assets_and_checks_defs(self) -> Sequence[Union[AssetsDefinition, AssetChecksDefinition]]: - return [*self.assets_defs, *self.checks_defs] + def assets_defs_specs_and_checks_defs( + self, + ) -> Sequence[Union[AssetsDefinition, AssetSpec, AssetChecksDefinition]]: + return [*self.assets_defs_and_specs, *self.checks_defs] @cached_property def source_assets(self) -> Sequence[SourceAsset]: @@ -475,9 +497,10 @@ def cacheable_assets(self) -> Sequence[CacheableAssetsDefinition]: asset for asset in self.loaded_objects if isinstance(asset, CacheableAssetsDefinition) ] - @cached_property - def assets_only(self) -> Sequence[LoadableAssetTypes]: - return [*self.source_assets, *self.assets_defs, *self.cacheable_assets] + def get_objects( + self, filter_fn: Callable[[LoadableAssetTypes], bool] + ) -> Sequence[LoadableAssetTypes]: + return [asset for asset in self.loaded_objects if filter_fn(asset)] def assets_with_loadable_prefix( self, key_prefix: CoercibleToAssetKeyPrefix @@ -489,7 +512,7 @@ def assets_with_loadable_prefix( result_list = [] all_asset_keys = { key - for asset_object in self.assets_and_checks_defs + for asset_object in self.assets_defs_specs_and_checks_defs for key in key_iterator(asset_object, included_targeted_keys=True) } key_replacements = {key: key.with_prefix(key_prefix) for key in all_asset_keys} @@ -554,6 +577,10 @@ def with_attributes( return_list.append( asset.with_attributes(group_name=group_name if group_name else asset.group_name) ) + elif isinstance(asset, AssetSpec): + return_list.append( + _spec_mapper_disallow_group_override(group_name, automation_condition)(asset) + ) else: return_list.append( asset.with_attributes_for_all( diff --git a/python_modules/dagster/dagster/_core/definitions/metadata/source_code.py b/python_modules/dagster/dagster/_core/definitions/metadata/source_code.py index 0ada758d6d752..69e7e3c6d27f8 100644 --- a/python_modules/dagster/dagster/_core/definitions/metadata/source_code.py +++ b/python_modules/dagster/dagster/_core/definitions/metadata/source_code.py @@ -16,7 +16,7 @@ from dagster._serdes import whitelist_for_serdes if TYPE_CHECKING: - from dagster._core.definitions.assets import AssetsDefinition, SourceAsset + from dagster._core.definitions.assets import AssetsDefinition, AssetSpec, SourceAsset from dagster._core.definitions.cacheable_assets import CacheableAssetsDefinition DEFAULT_SOURCE_FILE_KEY = "asset_definition" @@ -86,11 +86,11 @@ def namespace(cls) -> str: def _with_code_source_single_definition( - assets_def: Union["AssetsDefinition", "SourceAsset", "CacheableAssetsDefinition"], -) -> Union["AssetsDefinition", "SourceAsset", "CacheableAssetsDefinition"]: + assets_def: Union["AssetsDefinition", "SourceAsset", "CacheableAssetsDefinition", "AssetSpec"], +) -> Union["AssetsDefinition", "SourceAsset", "CacheableAssetsDefinition", "AssetSpec"]: from dagster._core.definitions.assets import AssetsDefinition - # SourceAsset doesn't have an op definition to point to - cacheable assets + # SourceAsset and AssetSpec don't have an op definition to point to - cacheable assets # will be supported eventually but are a bit trickier if not isinstance(assets_def, AssetsDefinition): return assets_def @@ -242,8 +242,8 @@ def convert_local_path_to_git_path( def _convert_local_path_to_git_path_single_definition( base_git_url: str, file_path_mapping: FilePathMapping, - assets_def: Union["AssetsDefinition", "SourceAsset", "CacheableAssetsDefinition"], -) -> Union["AssetsDefinition", "SourceAsset", "CacheableAssetsDefinition"]: + assets_def: Union["AssetsDefinition", "SourceAsset", "CacheableAssetsDefinition", "AssetSpec"], +) -> Union["AssetsDefinition", "SourceAsset", "CacheableAssetsDefinition", "AssetSpec"]: from dagster._core.definitions.assets import AssetsDefinition # SourceAsset doesn't have an op definition to point to - cacheable assets @@ -293,11 +293,13 @@ def _build_gitlab_url(url: str, branch: str) -> str: @experimental def link_code_references_to_git( - assets_defs: Sequence[Union["AssetsDefinition", "SourceAsset", "CacheableAssetsDefinition"]], + assets_defs: Sequence[ + Union["AssetsDefinition", "SourceAsset", "CacheableAssetsDefinition", "AssetSpec"] + ], git_url: str, git_branch: str, file_path_mapping: FilePathMapping, -) -> Sequence[Union["AssetsDefinition", "SourceAsset", "CacheableAssetsDefinition"]]: +) -> Sequence[Union["AssetsDefinition", "SourceAsset", "CacheableAssetsDefinition", "AssetSpec"]]: """Wrapper function which converts local file path code references to source control URLs based on the provided source control URL and branch. @@ -353,8 +355,10 @@ def link_code_references_to_git( @experimental def with_source_code_references( - assets_defs: Sequence[Union["AssetsDefinition", "SourceAsset", "CacheableAssetsDefinition"]], -) -> Sequence[Union["AssetsDefinition", "SourceAsset", "CacheableAssetsDefinition"]]: + assets_defs: Sequence[ + Union["AssetsDefinition", "SourceAsset", "CacheableAssetsDefinition", "AssetSpec"] + ], +) -> Sequence[Union["AssetsDefinition", "SourceAsset", "CacheableAssetsDefinition", "AssetSpec"]]: """Wrapper function which attaches local code reference metadata to the provided asset definitions. This points to the filepath and line number where the asset body is defined. diff --git a/python_modules/dagster/dagster/_core/workspace/autodiscovery.py b/python_modules/dagster/dagster/_core/workspace/autodiscovery.py index 7c4ce174e3a39..fc9142f40ecca 100644 --- a/python_modules/dagster/dagster/_core/workspace/autodiscovery.py +++ b/python_modules/dagster/dagster/_core/workspace/autodiscovery.py @@ -114,9 +114,9 @@ def loadable_targets_from_loaded_module(module: ModuleType) -> Sequence[Loadable ) ) - module_assets, module_source_assets, _ = load_assets_from_modules([module]) - if len(module_assets) > 0 or len(module_source_assets) > 0: - return [LoadableTarget(LOAD_ALL_ASSETS, [*module_assets, *module_source_assets])] + assets = load_assets_from_modules([module]) + if len(assets) > 0: + return [LoadableTarget(LOAD_ALL_ASSETS, assets)] raise DagsterInvariantViolationError( "No Definitions, RepositoryDefinition, Job, Pipeline, Graph, or AssetsDefinition found in " diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/asset_package/__init__.py b/python_modules/dagster/dagster_tests/asset_defs_tests/asset_package/__init__.py index 9eaf0d44dad77..44830afc5c1fa 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/asset_package/__init__.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/asset_package/__init__.py @@ -1,4 +1,5 @@ from dagster import AssetKey, SourceAsset, asset +from dagster._core.definitions.asset_spec import AssetSpec @asset @@ -29,4 +30,7 @@ def make_list_of_source_assets(): return [buddy_holly, jerry_lee_lewis] +top_level_spec = AssetSpec("top_level_spec") + + list_of_assets_and_source_assets = [*make_list_of_assets(), *make_list_of_source_assets()] diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/asset_package/module_with_assets.py b/python_modules/dagster/dagster_tests/asset_defs_tests/asset_package/module_with_assets.py index 0cea67687a7f0..eaa8819f9fc57 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/asset_package/module_with_assets.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/asset_package/module_with_assets.py @@ -1,4 +1,5 @@ from dagster import AssetKey, SourceAsset, asset, graph_asset, op +from dagster._core.definitions.asset_spec import AssetSpec from dagster._core.definitions.metadata import ( CodeReferencesMetadataSet, CodeReferencesMetadataValue, @@ -41,3 +42,6 @@ def multiply_by_two(input_num): @graph_asset def graph_backed_asset(): return multiply_by_two(one()) + + +my_spec = AssetSpec("my_asset_spec") diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_defs_source_metadata.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_defs_source_metadata.py index 1d7441635953b..d40b6a3b0c4e2 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_defs_source_metadata.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_defs_source_metadata.py @@ -24,19 +24,19 @@ # {path to module}:{path to file relative to module root}:{line number} EXPECTED_ORIGINS = { - "james_brown": DAGSTER_PACKAGE_PATH + PATH_IN_PACKAGE + "asset_package/__init__.py:12", + "james_brown": DAGSTER_PACKAGE_PATH + PATH_IN_PACKAGE + "asset_package/__init__.py:13", "chuck_berry": ( - DAGSTER_PACKAGE_PATH + PATH_IN_PACKAGE + "asset_package/module_with_assets.py:18" + DAGSTER_PACKAGE_PATH + PATH_IN_PACKAGE + "asset_package/module_with_assets.py:19" ), - "little_richard": (DAGSTER_PACKAGE_PATH + PATH_IN_PACKAGE + "asset_package/__init__.py:4"), - "fats_domino": DAGSTER_PACKAGE_PATH + PATH_IN_PACKAGE + "asset_package/__init__.py:16", + "little_richard": (DAGSTER_PACKAGE_PATH + PATH_IN_PACKAGE + "asset_package/__init__.py:5"), + "fats_domino": DAGSTER_PACKAGE_PATH + PATH_IN_PACKAGE + "asset_package/__init__.py:17", "miles_davis": ( DAGSTER_PACKAGE_PATH + PATH_IN_PACKAGE + "asset_package/asset_subpackage/another_module_with_assets.py:6" ), "graph_backed_asset": ( - DAGSTER_PACKAGE_PATH + PATH_IN_PACKAGE + "asset_package/module_with_assets.py:41" + DAGSTER_PACKAGE_PATH + PATH_IN_PACKAGE + "asset_package/module_with_assets.py:42" ), } diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_job.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_job.py index 07ff2d7bc2f65..0e96216bd1477 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_job.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_job.py @@ -43,7 +43,6 @@ from dagster._core.definitions.decorators.asset_check_decorator import asset_check from dagster._core.definitions.dependency import NodeHandle, NodeInvocation from dagster._core.definitions.executor_definition import in_process_executor -from dagster._core.definitions.load_assets_from_modules import prefix_assets from dagster._core.errors import DagsterInvalidSubsetError from dagster._core.execution.api import execute_run_iterator from dagster._core.snap import DependencyStructureIndex @@ -2362,7 +2361,15 @@ def test_asset_group_build_subset_job(job_selection, expected_assets, use_multi, all_assets = _get_assets_defs(use_multi=use_multi, allow_subset=use_multi) # apply prefixes for prefix in reversed(prefixes or []): - all_assets, _ = prefix_assets(all_assets, prefix, [], None) + all_assets = [ + assets_def.with_attributes( + input_asset_key_replacements={ + k: k.with_prefix(prefix) for k in assets_def.keys_by_input_name.values() + }, + output_asset_key_replacements={k: k.with_prefix(prefix) for k in assets_def.keys}, + ) + for assets_def in all_assets + ] defs = Definitions( # for these, if we have multi assets, we'll always allow them to be subset diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets_from_modules.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets_from_modules.py index ea7f2bc28d5d0..97d9549e615da 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets_from_modules.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets_from_modules.py @@ -14,6 +14,7 @@ load_assets_from_package_module, load_assets_from_package_name, ) +from dagster._core.definitions.asset_spec import AssetSpec from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy from dagster._core.definitions.cacheable_assets import CacheableAssetsDefinition @@ -101,6 +102,15 @@ def test_load_assets_from_package_name(): assert assets_1 == assets_2 + assets_3 = load_assets_from_package_name(asset_package.__name__, include_specs=True) + assert len(assets_3) == 13 + + assert next( + iter( + a for a in assets_3 if isinstance(a, AssetSpec) and a.key == AssetKey("top_level_spec") + ) + ) + def test_load_assets_from_package_module(): from dagster_tests.asset_defs_tests import asset_package @@ -117,6 +127,15 @@ def test_load_assets_from_package_module(): assert assets_1 == assets_2 + assets_3 = load_assets_from_package_name(asset_package.__name__, include_specs=True) + assert len(assets_3) == 13 + + assert next( + iter( + a for a in assets_3 if isinstance(a, AssetSpec) and a.key == AssetKey("top_level_spec") + ) + ) + def test_load_assets_from_modules(monkeypatch): from dagster_tests.asset_defs_tests import asset_package @@ -145,6 +164,19 @@ def little_richard(): ): load_assets_from_modules([asset_package, module_with_assets]) + # Create an AssetsDefinition with an identical spec to that in the module + with monkeypatch.context() as m: + + @asset + def top_level_spec(): + pass + + m.setattr(asset_package, "top_level_spec_same_assets_def", top_level_spec, raising=False) + with pytest.raises( + DagsterInvalidDefinitionError, + ): + load_assets_from_modules([asset_package, module_with_assets], include_specs=True) + @asset(group_name="my_group") def asset_in_current_module(): @@ -153,12 +185,22 @@ def asset_in_current_module(): source_asset_in_current_module = SourceAsset(AssetKey("source_asset_in_current_module")) +spec_in_current_module = AssetSpec("spec_in_current_module") + def test_load_assets_from_current_module(): assets = load_assets_from_current_module() assets = [get_unique_asset_identifier(asset) for asset in assets] - assert assets == ["asset_in_current_module", AssetKey("source_asset_in_current_module")] + assert set(assets) == {"asset_in_current_module", AssetKey("source_asset_in_current_module")} assert len(assets) == 2 + assets = load_assets_from_current_module(include_specs=True) + assets = [get_unique_asset_identifier(asset) for asset in assets] + assert len(assets) == 3 + assert set(assets) == { + "asset_in_current_module", + AssetKey("source_asset_in_current_module"), + AssetKey("spec_in_current_module"), + } def test_load_assets_from_modules_with_group_name(): @@ -176,7 +218,8 @@ def test_load_assets_from_modules_with_group_name(): def test_respect_existing_groups(): assets = load_assets_from_current_module() - assert assets[0].group_names_by_key.get(AssetKey("asset_in_current_module")) == "my_group" # pyright: ignore[reportAttributeAccessIssue] + assets_def = next(iter(a for a in assets if isinstance(a, AssetsDefinition))) + assert assets_def.group_names_by_key.get(AssetKey("asset_in_current_module")) == "my_group" # pyright: ignore[reportAttributeAccessIssue] with pytest.raises(DagsterInvalidDefinitionError): load_assets_from_current_module(group_name="yay") diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_unresolved_asset_job.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_unresolved_asset_job.py index 563d5d46cf59c..51f4917ae23b5 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_unresolved_asset_job.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_unresolved_asset_job.py @@ -25,7 +25,6 @@ from dagster._core.definitions import asset, multi_asset from dagster._core.definitions.decorators.hook_decorator import failure_hook, success_hook from dagster._core.definitions.definitions_class import Definitions -from dagster._core.definitions.load_assets_from_modules import prefix_assets from dagster._core.definitions.partition import ( StaticPartitionsDefinition, static_partitioned_config, @@ -354,7 +353,17 @@ def test_define_selection_job(job_selection, expected_assets, use_multi, prefixe prefixed_assets = _get_assets_defs(use_multi=use_multi, allow_subset=use_multi) # apply prefixes for prefix in reversed(prefixes or []): - prefixed_assets, _ = prefix_assets(prefixed_assets, prefix, [], None) + prefixed_assets = [ + assets_def.with_attributes( + input_asset_key_replacements={ + key: key.with_prefix(prefix) for key in assets_def.keys_by_input_name.values() + }, + output_asset_key_replacements={ + key: key.with_prefix(prefix) for key in assets_def.keys + }, + ) + for assets_def in prefixed_assets + ] final_assets = with_resources( prefixed_assets,