From c517f5eec3f82d1f434fd23d661a8fab84f3acec Mon Sep 17 00:00:00 2001 From: Chris DeCarolis Date: Sun, 15 Dec 2024 14:33:58 -0800 Subject: [PATCH] Use new loader code in checks --- .../asset_tutorial_tests/test_cereal.py | 4 +- .../test_serial_asset_graph.py | 4 +- .../dagster/dagster/_core/code_pointer.py | 4 +- .../load_asset_checks_from_modules.py | 67 +++---- .../definitions/load_assets_from_modules.py | 167 +++++++----------- .../dagster/_core/workspace/autodiscovery.py | 4 +- 6 files changed, 89 insertions(+), 161 deletions(-) diff --git a/examples/docs_snippets/docs_snippets_tests/guides_tests/asset_tutorial_tests/test_cereal.py b/examples/docs_snippets/docs_snippets_tests/guides_tests/asset_tutorial_tests/test_cereal.py index 4f435b2a491ff..098674353a622 100644 --- a/examples/docs_snippets/docs_snippets_tests/guides_tests/asset_tutorial_tests/test_cereal.py +++ b/examples/docs_snippets/docs_snippets_tests/guides_tests/asset_tutorial_tests/test_cereal.py @@ -1,4 +1,4 @@ -from dagster._core.definitions.load_assets_from_modules import assets_from_modules +from dagster._core.definitions.load_assets_from_modules import load_assets_from_modules from dagster._core.definitions.materialize import materialize from docs_snippets.guides.dagster.asset_tutorial import cereal from docs_snippets.intro_tutorial.test_util import patch_cereal_requests @@ -6,5 +6,5 @@ @patch_cereal_requests def test_cereal(): - assets, source_assets, _ = assets_from_modules([cereal]) + assets, source_assets, _ = load_assets_from_modules([cereal]) assert materialize([*assets, *source_assets]) diff --git a/examples/docs_snippets/docs_snippets_tests/guides_tests/asset_tutorial_tests/test_serial_asset_graph.py b/examples/docs_snippets/docs_snippets_tests/guides_tests/asset_tutorial_tests/test_serial_asset_graph.py index a68ed34ea861f..9c0e102292886 100644 --- a/examples/docs_snippets/docs_snippets_tests/guides_tests/asset_tutorial_tests/test_serial_asset_graph.py +++ b/examples/docs_snippets/docs_snippets_tests/guides_tests/asset_tutorial_tests/test_serial_asset_graph.py @@ -1,4 +1,4 @@ -from dagster._core.definitions.load_assets_from_modules import assets_from_modules +from dagster._core.definitions.load_assets_from_modules import load_assets_from_modules from dagster._core.definitions.materialize import materialize from docs_snippets.guides.dagster.asset_tutorial import serial_asset_graph from docs_snippets.intro_tutorial.test_util import patch_cereal_requests @@ -6,5 +6,5 @@ @patch_cereal_requests def test_serial_asset_graph(): - assets, source_assets, _ = assets_from_modules([serial_asset_graph]) + assets, source_assets, _ = load_assets_from_modules([serial_asset_graph]) assert materialize([*assets, *source_assets]) diff --git a/python_modules/dagster/dagster/_core/code_pointer.py b/python_modules/dagster/dagster/_core/code_pointer.py index cda304cc0f418..4601b21e23691 100644 --- a/python_modules/dagster/dagster/_core/code_pointer.py +++ b/python_modules/dagster/dagster/_core/code_pointer.py @@ -184,13 +184,13 @@ def describe(self) -> str: def _load_target_from_module(module: ModuleType, fn_name: str, error_suffix: str) -> object: - from dagster._core.definitions.load_assets_from_modules import assets_from_modules + from dagster._core.definitions.load_assets_from_modules import load_assets_from_modules from dagster._core.workspace.autodiscovery import LOAD_ALL_ASSETS if fn_name == LOAD_ALL_ASSETS: # LOAD_ALL_ASSETS is a special symbol that's returned when, instead of loading a particular # attribute, we should load all the assets in the module. - module_assets, module_source_assets, _ = assets_from_modules([module]) + module_assets, module_source_assets, _ = load_assets_from_modules([module]) return [*module_assets, *module_source_assets] else: if not hasattr(module, fn_name): diff --git a/python_modules/dagster/dagster/_core/definitions/load_asset_checks_from_modules.py b/python_modules/dagster/dagster/_core/definitions/load_asset_checks_from_modules.py index 5a5d3ec99af40..a587066410f00 100644 --- a/python_modules/dagster/dagster/_core/definitions/load_asset_checks_from_modules.py +++ b/python_modules/dagster/dagster/_core/definitions/load_asset_checks_from_modules.py @@ -1,53 +1,20 @@ import inspect from importlib import import_module from types import ModuleType -from typing import Iterable, Optional, Sequence, Set, cast +from typing import Iterable, Optional, Sequence import dagster._check as check -from dagster._core.definitions.asset_checks import AssetChecksDefinition, has_only_asset_checks +from dagster._core.definitions.asset_checks import AssetChecksDefinition from dagster._core.definitions.asset_key import ( CoercibleToAssetKeyPrefix, check_opt_coercible_to_asset_key_prefix_param, ) -from dagster._core.definitions.assets import AssetsDefinition from dagster._core.definitions.load_assets_from_modules import ( + LoadedAssetsList, find_modules_in_package, - find_objects_in_module_of_types, - prefix_assets, ) -def _checks_from_modules(modules: Iterable[ModuleType]) -> Sequence[AssetChecksDefinition]: - checks = [] - ids: Set[int] = set() - for module in modules: - for c in find_objects_in_module_of_types(module, AssetsDefinition): - if has_only_asset_checks(c) and id(c) not in ids: - checks.append(cast(AssetChecksDefinition, c)) - ids.add(id(c)) - return checks - - -def _checks_with_attributes( - checks_defs: Sequence[AssetChecksDefinition], - asset_key_prefix: Optional[CoercibleToAssetKeyPrefix] = None, -) -> Sequence[AssetChecksDefinition]: - if asset_key_prefix: - modified_checks, _ = prefix_assets(checks_defs, asset_key_prefix, [], None) - return [ - AssetChecksDefinition.create( - keys_by_input_name=c.keys_by_input_name, - node_def=c.op, - check_specs_by_output_name=c.check_specs_by_output_name, - resource_defs=c.resource_defs, - can_subset=c.can_subset, - ) - for c in modified_checks - ] - else: - return checks_defs - - def load_asset_checks_from_modules( modules: Iterable[ModuleType], asset_key_prefix: Optional[CoercibleToAssetKeyPrefix] = None, @@ -68,7 +35,19 @@ def load_asset_checks_from_modules( asset_key_prefix = check_opt_coercible_to_asset_key_prefix_param( asset_key_prefix, "asset_key_prefix" ) - return _checks_with_attributes(_checks_from_modules(modules), asset_key_prefix=asset_key_prefix) + return ( + LoadedAssetsList.from_modules(modules) + .to_post_load() + .with_attributes( + key_prefix=asset_key_prefix, + source_key_prefix=None, + group_name=None, + freshness_policy=None, + automation_condition=None, + backfill_policy=None, + ) + .checks_defs + ) def load_asset_checks_from_current_module( @@ -95,9 +74,7 @@ def load_asset_checks_from_current_module( asset_key_prefix, "asset_key_prefix" ) - return _checks_with_attributes( - _checks_from_modules([module]), asset_key_prefix=asset_key_prefix - ) + return load_asset_checks_from_modules([module], asset_key_prefix=asset_key_prefix) def load_asset_checks_from_package_module( @@ -120,9 +97,8 @@ def load_asset_checks_from_package_module( asset_key_prefix, "asset_key_prefix" ) - return _checks_with_attributes( - _checks_from_modules(find_modules_in_package(package_module)), - asset_key_prefix=asset_key_prefix, + return load_asset_checks_from_modules( + find_modules_in_package(package_module), asset_key_prefix=asset_key_prefix ) @@ -147,7 +123,6 @@ def load_asset_checks_from_package_name( ) package_module = import_module(package_name) - return _checks_with_attributes( - _checks_from_modules(find_modules_in_package(package_module)), - asset_key_prefix=asset_key_prefix, + return load_asset_checks_from_modules( + find_modules_in_package(package_module), asset_key_prefix=asset_key_prefix ) diff --git a/python_modules/dagster/dagster/_core/definitions/load_assets_from_modules.py b/python_modules/dagster/dagster/_core/definitions/load_assets_from_modules.py index 6fc8060fc6bbd..60560af0ae43a 100644 --- a/python_modules/dagster/dagster/_core/definitions/load_assets_from_modules.py +++ b/python_modules/dagster/dagster/_core/definitions/load_assets_from_modules.py @@ -9,16 +9,17 @@ Dict, Iterable, Iterator, - List, Mapping, Optional, Sequence, Tuple, Type, Union, + cast, ) import dagster._check as check +from dagster._core.definitions.asset_checks import AssetChecksDefinition, has_only_asset_checks from dagster._core.definitions.asset_key import ( AssetKey, CoercibleToAssetKeyPrefix, @@ -178,10 +179,14 @@ def key_iterator( ) -> Iterator[AssetKey]: return ( iter( - *[asset.keys], - *[check_key.asset_key for check_key in asset.check_keys] - if included_targeted_keys - else [], + [ + *asset.keys, + *( + [check_key.asset_key for check_key in asset.check_keys] + if included_targeted_keys + else [] + ), + ] ) if isinstance(asset, AssetsDefinition) else iter([asset.key]) @@ -240,7 +245,7 @@ def load_assets_from_modules( backfill_policy, "backfill_policy", BackfillPolicy ), ) - .loaded_objects + .assets_only ) @@ -408,10 +413,11 @@ def replace_keys_in_asset( asset: Union[AssetsDefinition, SourceAsset], key_replacements: Mapping[AssetKey, AssetKey], ) -> Union[AssetsDefinition, SourceAsset]: - return ( + updated_object = ( asset.with_attributes( output_asset_key_replacements={ - key: key_replacements.get(key, key) for key in asset.keys + key: key_replacements.get(key, key) + for key in key_iterator(asset, included_targeted_keys=True) }, input_asset_key_replacements={ key: key_replacements.get(key, key) for key in asset.keys_by_input_name.values() @@ -420,6 +426,16 @@ def replace_keys_in_asset( if isinstance(asset, AssetsDefinition) else asset.with_attributes(key=key_replacements.get(asset.key, asset.key)) ) + if isinstance(asset, AssetChecksDefinition): + updated_object = cast(AssetsDefinition, updated_object) + updated_object = AssetChecksDefinition.create( + keys_by_input_name=updated_object.keys_by_input_name, + node_def=updated_object.op, + check_specs_by_output_name=updated_object.check_specs_by_output_name, + resource_defs=updated_object.resource_defs, + can_subset=updated_object.can_subset, + ) + return updated_object class ResolvedAssetObjectList: @@ -437,10 +453,32 @@ def assets_defs(self) -> Sequence[AssetsDefinition]: if isinstance(dagster_object, AssetsDefinition) and dagster_object.keys ] + @cached_property + def checks_defs(self) -> Sequence[AssetChecksDefinition]: + return [ + cast(AssetChecksDefinition, asset) + for asset in self.loaded_objects + if isinstance(asset, AssetsDefinition) and has_only_asset_checks(asset) + ] + + @cached_property + def assets_and_checks_defs(self) -> Sequence[Union[AssetsDefinition, AssetChecksDefinition]]: + return [*self.assets_defs, *self.checks_defs] + @cached_property def source_assets(self) -> Sequence[SourceAsset]: return [asset for asset in self.loaded_objects if isinstance(asset, SourceAsset)] + @cached_property + def cacheable_assets(self) -> Sequence[CacheableAssetsDefinition]: + return [ + asset for asset in self.loaded_objects if isinstance(asset, CacheableAssetsDefinition) + ] + + @cached_property + def assets_only(self) -> Sequence[LoadableAssetTypes]: + return [*self.source_assets, *self.assets_defs, *self.cacheable_assets] + def assets_with_loadable_prefix( self, key_prefix: CoercibleToAssetKeyPrefix ) -> "ResolvedAssetObjectList": @@ -451,7 +489,7 @@ def assets_with_loadable_prefix( result_list = [] all_asset_keys = { key - for asset_object in self.assets_defs + for asset_object in self.assets_and_checks_defs for key in key_iterator(asset_object, included_targeted_keys=True) } key_replacements = {key: key.with_prefix(key_prefix) for key in all_asset_keys} @@ -498,13 +536,20 @@ def with_attributes( return_list = [] for asset in assets_list.loaded_objects: if isinstance(asset, AssetsDefinition): - return_list.append( - asset.map_asset_specs( - _spec_mapper_disallow_group_override(group_name, automation_condition) - ).with_attributes( - backfill_policy=backfill_policy, freshness_policy=freshness_policy - ) + new_asset = asset.map_asset_specs( + _spec_mapper_disallow_group_override(group_name, automation_condition) + ).with_attributes( + backfill_policy=backfill_policy, freshness_policy=freshness_policy ) + if isinstance(asset, AssetChecksDefinition): + new_asset = AssetChecksDefinition.create( + keys_by_input_name=new_asset.keys_by_input_name, + node_def=new_asset.op, + check_specs_by_output_name=new_asset.check_specs_by_output_name, + resource_defs=new_asset.resource_defs, + can_subset=new_asset.can_subset, + ) + return_list.append(new_asset) elif isinstance(asset, SourceAsset): return_list.append( asset.with_attributes(group_name=group_name if group_name else asset.group_name) @@ -521,95 +566,3 @@ def with_attributes( ) ) return ResolvedAssetObjectList(return_list) - - -# No longer used in load_assets_from_modules, next PR will fix for load_asset_checks_from_modules -def prefix_assets( - assets_defs: Sequence[AssetsDefinition], - key_prefix: CoercibleToAssetKeyPrefix, - source_assets: Sequence[SourceAsset], - source_key_prefix: Optional[CoercibleToAssetKeyPrefix], -) -> Tuple[Sequence[AssetsDefinition], Sequence[SourceAsset]]: - """Given a list of assets, prefix the input and output asset keys and check specs with key_prefix. - The prefix is not added to source assets. - - Input asset keys that reference other assets within assets_defs are "brought along" - - i.e. prefixed as well. - - Example with a single asset: - - .. code-block:: python - - @asset - def asset1(): - ... - - result = prefixed_asset_key_replacements([asset_1], "my_prefix") - assert result.assets[0].asset_key == AssetKey(["my_prefix", "asset1"]) - - Example with dependencies within the list of assets: - - .. code-block:: python - - @asset - def asset1(): - ... - - @asset - def asset2(asset1): - ... - - result = prefixed_asset_key_replacements([asset1, asset2], "my_prefix") - assert result.assets[0].asset_key == AssetKey(["my_prefix", "asset1"]) - assert result.assets[1].asset_key == AssetKey(["my_prefix", "asset2"]) - assert result.assets[1].dependency_keys == {AssetKey(["my_prefix", "asset1"])} - - """ - asset_keys = {asset_key for assets_def in assets_defs for asset_key in assets_def.keys} - check_target_keys = { - key.asset_key for assets_def in assets_defs for key in assets_def.check_keys - } - source_asset_keys = {source_asset.key for source_asset in source_assets} - - if isinstance(key_prefix, str): - key_prefix = [key_prefix] - key_prefix = check.is_list(key_prefix, of_type=str) - - if isinstance(source_key_prefix, str): - source_key_prefix = [source_key_prefix] - - result_assets: List[AssetsDefinition] = [] - for assets_def in assets_defs: - output_asset_key_replacements = { - asset_key: AssetKey([*key_prefix, *asset_key.path]) - for asset_key in ( - assets_def.keys | {check_key.asset_key for check_key in assets_def.check_keys} - ) - } - input_asset_key_replacements = {} - for dep_asset_key in assets_def.keys_by_input_name.values(): - if dep_asset_key in asset_keys or dep_asset_key in check_target_keys: - input_asset_key_replacements[dep_asset_key] = AssetKey( - [*key_prefix, *dep_asset_key.path] - ) - elif source_key_prefix and dep_asset_key in source_asset_keys: - input_asset_key_replacements[dep_asset_key] = AssetKey( - [*source_key_prefix, *dep_asset_key.path] - ) - - result_assets.append( - assets_def.with_attributes( - output_asset_key_replacements=output_asset_key_replacements, - input_asset_key_replacements=input_asset_key_replacements, - ) - ) - - if source_key_prefix: - result_source_assets = [ - source_asset.with_attributes(key=AssetKey([*source_key_prefix, *source_asset.key.path])) - for source_asset in source_assets - ] - else: - result_source_assets = source_assets - - return result_assets, result_source_assets diff --git a/python_modules/dagster/dagster/_core/workspace/autodiscovery.py b/python_modules/dagster/dagster/_core/workspace/autodiscovery.py index 3cf0189899cf3..7c4ce174e3a39 100644 --- a/python_modules/dagster/dagster/_core/workspace/autodiscovery.py +++ b/python_modules/dagster/dagster/_core/workspace/autodiscovery.py @@ -5,7 +5,7 @@ from dagster import DagsterInvariantViolationError, GraphDefinition, RepositoryDefinition from dagster._core.code_pointer import load_python_file, load_python_module from dagster._core.definitions.definitions_class import Definitions -from dagster._core.definitions.load_assets_from_modules import assets_from_modules +from dagster._core.definitions.load_assets_from_modules import load_assets_from_modules LOAD_ALL_ASSETS = "<>" @@ -114,7 +114,7 @@ def loadable_targets_from_loaded_module(module: ModuleType) -> Sequence[Loadable ) ) - module_assets, module_source_assets, _ = assets_from_modules([module]) + module_assets, module_source_assets, _ = load_assets_from_modules([module]) if len(module_assets) > 0 or len(module_source_assets) > 0: return [LoadableTarget(LOAD_ALL_ASSETS, [*module_assets, *module_source_assets])]