Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[module-loaders] Use new loader code in checks loaders #26514

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
from dagster._core.definitions.load_assets_from_modules import assets_from_modules
from dagster._core.definitions.load_assets_from_modules import load_assets_from_modules
from dagster._core.definitions.materialize import materialize
from docs_snippets.guides.dagster.asset_tutorial import cereal
from docs_snippets.intro_tutorial.test_util import patch_cereal_requests


@patch_cereal_requests
def test_cereal():
assets, source_assets, _ = assets_from_modules([cereal])
assets, source_assets, _ = load_assets_from_modules([cereal])
assert materialize([*assets, *source_assets])
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
from dagster._core.definitions.load_assets_from_modules import assets_from_modules
from dagster._core.definitions.load_assets_from_modules import load_assets_from_modules
from dagster._core.definitions.materialize import materialize
from docs_snippets.guides.dagster.asset_tutorial import serial_asset_graph
from docs_snippets.intro_tutorial.test_util import patch_cereal_requests


@patch_cereal_requests
def test_serial_asset_graph():
assets, source_assets, _ = assets_from_modules([serial_asset_graph])
assets, source_assets, _ = load_assets_from_modules([serial_asset_graph])
assert materialize([*assets, *source_assets])
4 changes: 2 additions & 2 deletions python_modules/dagster/dagster/_core/code_pointer.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,13 +184,13 @@ def describe(self) -> str:


def _load_target_from_module(module: ModuleType, fn_name: str, error_suffix: str) -> object:
from dagster._core.definitions.load_assets_from_modules import assets_from_modules
from dagster._core.definitions.load_assets_from_modules import load_assets_from_modules
from dagster._core.workspace.autodiscovery import LOAD_ALL_ASSETS

if fn_name == LOAD_ALL_ASSETS:
# LOAD_ALL_ASSETS is a special symbol that's returned when, instead of loading a particular
# attribute, we should load all the assets in the module.
module_assets, module_source_assets, _ = assets_from_modules([module])
module_assets, module_source_assets, _ = load_assets_from_modules([module])
return [*module_assets, *module_source_assets]
else:
if not hasattr(module, fn_name):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,53 +1,20 @@
import inspect
from importlib import import_module
from types import ModuleType
from typing import Iterable, Optional, Sequence, Set, cast
from typing import Iterable, Optional, Sequence

import dagster._check as check
from dagster._core.definitions.asset_checks import AssetChecksDefinition, has_only_asset_checks
from dagster._core.definitions.asset_checks import AssetChecksDefinition
from dagster._core.definitions.asset_key import (
CoercibleToAssetKeyPrefix,
check_opt_coercible_to_asset_key_prefix_param,
)
from dagster._core.definitions.assets import AssetsDefinition
from dagster._core.definitions.load_assets_from_modules import (
LoadedAssetsList,
find_modules_in_package,
find_objects_in_module_of_types,
prefix_assets,
)


def _checks_from_modules(modules: Iterable[ModuleType]) -> Sequence[AssetChecksDefinition]:
    """Collect the check-only definitions exposed by ``modules``.

    Every object that ``find_objects_in_module_of_types`` reports for
    ``AssetsDefinition`` is inspected; only those for which
    ``has_only_asset_checks`` is true are kept.

    Args:
        modules: Modules to scan, visited in iteration order.

    Returns:
        The matching definitions in first-seen order. An object that shows up
        more than once (for example re-exported from several modules) is
        returned a single time; de-duplication is by object identity
        (``id``), not by equality.
    """
    seen_ids: Set[int] = set()
    collected = []
    for mod in modules:
        for candidate in find_objects_in_module_of_types(mod, AssetsDefinition):
            if not has_only_asset_checks(candidate):
                continue
            candidate_id = id(candidate)
            if candidate_id in seen_ids:
                # Same object already collected from an earlier module/attribute.
                continue
            seen_ids.add(candidate_id)
            collected.append(cast(AssetChecksDefinition, candidate))
    return collected


def _checks_with_attributes(
    checks_defs: Sequence[AssetChecksDefinition],
    asset_key_prefix: Optional[CoercibleToAssetKeyPrefix] = None,
) -> Sequence[AssetChecksDefinition]:
    """Apply an optional asset key prefix to a sequence of check definitions.

    Args:
        checks_defs: The check definitions to (possibly) prefix.
        asset_key_prefix: Prefix handed to ``prefix_assets``. When falsy,
            nothing is rewritten.

    Returns:
        ``checks_defs`` itself when no prefix is given; otherwise a new list in
        which each prefixed definition has been rebuilt through
        ``AssetChecksDefinition.create``.
    """
    if not asset_key_prefix:
        return checks_defs

    # No source assets participate here, hence the empty list / None prefix.
    prefixed_defs, _ = prefix_assets(checks_defs, asset_key_prefix, [], None)

    rebuilt = []
    for prefixed in prefixed_defs:
        # Re-create each result via AssetChecksDefinition.create from its
        # prefixed fields so the returned items are built as check definitions.
        rebuilt.append(
            AssetChecksDefinition.create(
                keys_by_input_name=prefixed.keys_by_input_name,
                node_def=prefixed.op,
                check_specs_by_output_name=prefixed.check_specs_by_output_name,
                resource_defs=prefixed.resource_defs,
                can_subset=prefixed.can_subset,
            )
        )
    return rebuilt


def load_asset_checks_from_modules(
modules: Iterable[ModuleType],
asset_key_prefix: Optional[CoercibleToAssetKeyPrefix] = None,
Expand All @@ -68,7 +35,19 @@ def load_asset_checks_from_modules(
asset_key_prefix = check_opt_coercible_to_asset_key_prefix_param(
asset_key_prefix, "asset_key_prefix"
)
return _checks_with_attributes(_checks_from_modules(modules), asset_key_prefix=asset_key_prefix)
return (
LoadedAssetsList.from_modules(modules)
.to_post_load()
.with_attributes(
key_prefix=asset_key_prefix,
source_key_prefix=None,
group_name=None,
freshness_policy=None,
automation_condition=None,
backfill_policy=None,
)
.checks_defs
)


def load_asset_checks_from_current_module(
Expand All @@ -95,9 +74,7 @@ def load_asset_checks_from_current_module(
asset_key_prefix, "asset_key_prefix"
)

return _checks_with_attributes(
_checks_from_modules([module]), asset_key_prefix=asset_key_prefix
)
return load_asset_checks_from_modules([module], asset_key_prefix=asset_key_prefix)


def load_asset_checks_from_package_module(
Expand All @@ -120,9 +97,8 @@ def load_asset_checks_from_package_module(
asset_key_prefix, "asset_key_prefix"
)

return _checks_with_attributes(
_checks_from_modules(find_modules_in_package(package_module)),
asset_key_prefix=asset_key_prefix,
return load_asset_checks_from_modules(
find_modules_in_package(package_module), asset_key_prefix=asset_key_prefix
)


Expand All @@ -147,7 +123,6 @@ def load_asset_checks_from_package_name(
)

package_module = import_module(package_name)
return _checks_with_attributes(
_checks_from_modules(find_modules_in_package(package_module)),
asset_key_prefix=asset_key_prefix,
return load_asset_checks_from_modules(
find_modules_in_package(package_module), asset_key_prefix=asset_key_prefix
)
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,17 @@
Dict,
Iterable,
Iterator,
List,
Mapping,
Optional,
Sequence,
Tuple,
Type,
Union,
cast,
)

import dagster._check as check
from dagster._core.definitions.asset_checks import AssetChecksDefinition, has_only_asset_checks
from dagster._core.definitions.asset_key import (
AssetKey,
CoercibleToAssetKeyPrefix,
Expand Down Expand Up @@ -178,10 +179,14 @@ def key_iterator(
) -> Iterator[AssetKey]:
return (
iter(
*[asset.keys],
*[check_key.asset_key for check_key in asset.check_keys]
if included_targeted_keys
else [],
[
*asset.keys,
*(
dpeng817 marked this conversation as resolved.
Show resolved Hide resolved
[check_key.asset_key for check_key in asset.check_keys]
if included_targeted_keys
else []
),
]
)
if isinstance(asset, AssetsDefinition)
else iter([asset.key])
Expand Down Expand Up @@ -240,7 +245,7 @@ def load_assets_from_modules(
backfill_policy, "backfill_policy", BackfillPolicy
),
)
.loaded_objects
.assets_only
)


Expand Down Expand Up @@ -408,10 +413,11 @@ def replace_keys_in_asset(
asset: Union[AssetsDefinition, SourceAsset],
key_replacements: Mapping[AssetKey, AssetKey],
) -> Union[AssetsDefinition, SourceAsset]:
return (
updated_object = (
asset.with_attributes(
output_asset_key_replacements={
key: key_replacements.get(key, key) for key in asset.keys
key: key_replacements.get(key, key)
for key in key_iterator(asset, included_targeted_keys=True)
dpeng817 marked this conversation as resolved.
Show resolved Hide resolved
},
input_asset_key_replacements={
key: key_replacements.get(key, key) for key in asset.keys_by_input_name.values()
Expand All @@ -420,6 +426,16 @@ def replace_keys_in_asset(
if isinstance(asset, AssetsDefinition)
else asset.with_attributes(key=key_replacements.get(asset.key, asset.key))
)
if isinstance(asset, AssetChecksDefinition):
updated_object = cast(AssetsDefinition, updated_object)
dpeng817 marked this conversation as resolved.
Show resolved Hide resolved
updated_object = AssetChecksDefinition.create(
keys_by_input_name=updated_object.keys_by_input_name,
node_def=updated_object.op,
check_specs_by_output_name=updated_object.check_specs_by_output_name,
resource_defs=updated_object.resource_defs,
can_subset=updated_object.can_subset,
)
return updated_object


class ResolvedAssetObjectList:
Expand All @@ -437,10 +453,32 @@ def assets_defs(self) -> Sequence[AssetsDefinition]:
if isinstance(dagster_object, AssetsDefinition) and dagster_object.keys
]

@cached_property
def checks_defs(self) -> Sequence[AssetChecksDefinition]:
    """Loaded objects that are ``AssetsDefinition`` instances holding only asset checks.

    Membership is decided by ``has_only_asset_checks``; the order of
    ``self.loaded_objects`` is preserved.
    """
    check_only_defs = []
    for loaded in self.loaded_objects:
        if isinstance(loaded, AssetsDefinition) and has_only_asset_checks(loaded):
            check_only_defs.append(cast(AssetChecksDefinition, loaded))
    return check_only_defs

@cached_property
def assets_and_checks_defs(self) -> Sequence[Union[AssetsDefinition, AssetChecksDefinition]]:
    """Concatenation of ``assets_defs`` followed by ``checks_defs``."""
    combined = list(self.assets_defs)
    combined.extend(self.checks_defs)
    return combined

@cached_property
def source_assets(self) -> Sequence[SourceAsset]:
    """Loaded objects that are ``SourceAsset`` instances, in load order."""
    sources = []
    for loaded in self.loaded_objects:
        if isinstance(loaded, SourceAsset):
            sources.append(loaded)
    return sources

@cached_property
def cacheable_assets(self) -> Sequence[CacheableAssetsDefinition]:
    """Loaded objects that are ``CacheableAssetsDefinition`` instances, in load order."""
    cacheables = []
    for loaded in self.loaded_objects:
        if isinstance(loaded, CacheableAssetsDefinition):
            cacheables.append(loaded)
    return cacheables

@cached_property
def assets_only(self) -> Sequence[LoadableAssetTypes]:
    """Source assets, then ``assets_defs``, then cacheable assets.

    ``checks_defs`` is not part of the result.
    """
    loadable = list(self.source_assets)
    loadable.extend(self.assets_defs)
    loadable.extend(self.cacheable_assets)
    return loadable

def assets_with_loadable_prefix(
self, key_prefix: CoercibleToAssetKeyPrefix
) -> "ResolvedAssetObjectList":
Expand All @@ -451,7 +489,7 @@ def assets_with_loadable_prefix(
result_list = []
all_asset_keys = {
key
for asset_object in self.assets_defs
for asset_object in self.assets_and_checks_defs
for key in key_iterator(asset_object, included_targeted_keys=True)
}
key_replacements = {key: key.with_prefix(key_prefix) for key in all_asset_keys}
Expand Down Expand Up @@ -498,13 +536,20 @@ def with_attributes(
return_list = []
for asset in assets_list.loaded_objects:
dpeng817 marked this conversation as resolved.
Show resolved Hide resolved
if isinstance(asset, AssetsDefinition):
return_list.append(
asset.map_asset_specs(
_spec_mapper_disallow_group_override(group_name, automation_condition)
).with_attributes(
backfill_policy=backfill_policy, freshness_policy=freshness_policy
)
new_asset = asset.map_asset_specs(
_spec_mapper_disallow_group_override(group_name, automation_condition)
).with_attributes(
backfill_policy=backfill_policy, freshness_policy=freshness_policy
)
if isinstance(asset, AssetChecksDefinition):
new_asset = AssetChecksDefinition.create(
keys_by_input_name=new_asset.keys_by_input_name,
node_def=new_asset.op,
check_specs_by_output_name=new_asset.check_specs_by_output_name,
resource_defs=new_asset.resource_defs,
can_subset=new_asset.can_subset,
)
return_list.append(new_asset)
elif isinstance(asset, SourceAsset):
return_list.append(
asset.with_attributes(group_name=group_name if group_name else asset.group_name)
Expand All @@ -521,95 +566,3 @@ def with_attributes(
)
)
return ResolvedAssetObjectList(return_list)


# No longer used in load_assets_from_modules, next PR will fix for load_asset_checks_from_modules
def prefix_assets(
    assets_defs: Sequence[AssetsDefinition],
    key_prefix: CoercibleToAssetKeyPrefix,
    source_assets: Sequence[SourceAsset],
    source_key_prefix: Optional[CoercibleToAssetKeyPrefix],
) -> Tuple[Sequence[AssetsDefinition], Sequence[SourceAsset]]:
    """Prefix the asset keys of ``assets_defs`` (and, optionally, of ``source_assets``).

    For every definition, each of its own keys and each asset key targeted by
    one of its checks is rewritten to ``key_prefix + key.path``. Input keys are
    "brought along": an input that refers to a key owned or check-targeted by
    any definition in ``assets_defs`` receives ``key_prefix`` too. An input that
    refers to one of ``source_assets`` receives ``source_key_prefix`` instead,
    when that prefix is given. All other inputs are left untouched.

    ``key_prefix`` is never applied to the source assets themselves; they are
    re-keyed with ``source_key_prefix`` only.

    Args:
        assets_defs: Definitions whose keys should be prefixed.
        key_prefix: A string or list of strings; validated with
            ``check.is_list(..., of_type=str)`` after a bare string is wrapped
            in a list.
        source_assets: Source assets accompanying ``assets_defs``.
        source_key_prefix: Optional string or list of strings for the source
            assets. A bare string is wrapped in a list; no further validation
            is performed here.

    Returns:
        A ``(assets, source_assets)`` tuple. The first item is a new list of
        re-keyed definitions. The second is the ``source_assets`` argument
        itself when ``source_key_prefix`` is falsy, otherwise a new list of
        re-keyed source assets.

    Example with a single asset:

        .. code-block:: python

            @asset
            def asset1():
                ...

            assets, _ = prefix_assets([asset1], "my_prefix", [], None)
            assert AssetKey(["my_prefix", "asset1"]) in assets[0].keys

    Example with dependencies within the list of assets:

        .. code-block:: python

            @asset
            def asset1():
                ...

            @asset
            def asset2(asset1):
                ...

            assets, _ = prefix_assets([asset1, asset2], "my_prefix", [], None)
            assert AssetKey(["my_prefix", "asset1"]) in assets[0].keys
            assert AssetKey(["my_prefix", "asset2"]) in assets[1].keys
            assert AssetKey(["my_prefix", "asset1"]) in assets[1].keys_by_input_name.values()
    """
    # Key sets computed across the whole batch, used to decide which inputs
    # must follow their upstream definition into the new prefix.
    own_keys = {key for definition in assets_defs for key in definition.keys}
    checked_keys = {
        check_key.asset_key for definition in assets_defs for check_key in definition.check_keys
    }
    source_keys = {source.key for source in source_assets}

    if isinstance(key_prefix, str):
        key_prefix = [key_prefix]
    key_prefix = check.is_list(key_prefix, of_type=str)

    if isinstance(source_key_prefix, str):
        source_key_prefix = [source_key_prefix]

    prefixed_defs: List[AssetsDefinition] = []
    for definition in assets_defs:
        targeted_keys = definition.keys | {
            check_key.asset_key for check_key in definition.check_keys
        }
        output_replacements = {
            key: AssetKey([*key_prefix, *key.path]) for key in targeted_keys
        }

        input_replacements = {}
        for dep_key in definition.keys_by_input_name.values():
            if dep_key in own_keys or dep_key in checked_keys:
                chosen_prefix = key_prefix
            elif source_key_prefix and dep_key in source_keys:
                chosen_prefix = source_key_prefix
            else:
                # Dependency lives outside this batch: leave its key alone.
                continue
            input_replacements[dep_key] = AssetKey([*chosen_prefix, *dep_key.path])

        prefixed_defs.append(
            definition.with_attributes(
                output_asset_key_replacements=output_replacements,
                input_asset_key_replacements=input_replacements,
            )
        )

    if not source_key_prefix:
        return prefixed_defs, source_assets

    prefixed_sources = [
        source.with_attributes(key=AssetKey([*source_key_prefix, *source.key.path]))
        for source in source_assets
    ]
    return prefixed_defs, prefixed_sources
Loading
Loading