Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[module-loaders] [rfc] Include specs in asset module loaders #26524

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
from dagster._core.definitions.load_assets_from_modules import assets_from_modules
from dagster._core.definitions.load_assets_from_modules import load_assets_from_modules
from dagster._core.definitions.materialize import materialize
from docs_snippets.guides.dagster.asset_tutorial import cereal
from docs_snippets.intro_tutorial.test_util import patch_cereal_requests


@patch_cereal_requests
def test_cereal():
assets, source_assets, _ = assets_from_modules([cereal])
assets, source_assets, _ = load_assets_from_modules([cereal])
assert materialize([*assets, *source_assets])
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
from dagster._core.definitions.load_assets_from_modules import assets_from_modules
from dagster._core.definitions.load_assets_from_modules import load_assets_from_modules
from dagster._core.definitions.materialize import materialize
from docs_snippets.guides.dagster.asset_tutorial import serial_asset_graph
from docs_snippets.intro_tutorial.test_util import patch_cereal_requests


@patch_cereal_requests
def test_serial_asset_graph():
assets, source_assets, _ = assets_from_modules([serial_asset_graph])
assets, source_assets, _ = load_assets_from_modules([serial_asset_graph])
assert materialize([*assets, *source_assets])
4 changes: 2 additions & 2 deletions python_modules/dagster/dagster/_core/code_pointer.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,13 +184,13 @@ def describe(self) -> str:


def _load_target_from_module(module: ModuleType, fn_name: str, error_suffix: str) -> object:
from dagster._core.definitions.load_assets_from_modules import assets_from_modules
from dagster._core.definitions.load_assets_from_modules import load_assets_from_modules
from dagster._core.workspace.autodiscovery import LOAD_ALL_ASSETS

if fn_name == LOAD_ALL_ASSETS:
# LOAD_ALL_ASSETS is a special symbol that's returned when, instead of loading a particular
# attribute, we should load all the assets in the module.
module_assets, module_source_assets, _ = assets_from_modules([module])
module_assets, module_source_assets, _ = load_assets_from_modules([module])
return [*module_assets, *module_source_assets]
else:
if not hasattr(module, fn_name):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ def replace_attributes(
tags: Optional[Mapping[str, str]] = ...,
kinds: Optional[Set[str]] = ...,
partitions_def: Optional[PartitionsDefinition] = ...,
freshness_policy: Optional[FreshnessPolicy] = ...,
) -> "AssetSpec":
"""Returns a new AssetSpec with the specified attributes replaced."""
current_tags_without_kinds = {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,53 +1,20 @@
import inspect
from importlib import import_module
from types import ModuleType
from typing import Iterable, Optional, Sequence, Set, cast
from typing import Iterable, Optional, Sequence

import dagster._check as check
from dagster._core.definitions.asset_checks import AssetChecksDefinition, has_only_asset_checks
from dagster._core.definitions.asset_checks import AssetChecksDefinition
from dagster._core.definitions.asset_key import (
CoercibleToAssetKeyPrefix,
check_opt_coercible_to_asset_key_prefix_param,
)
from dagster._core.definitions.assets import AssetsDefinition
from dagster._core.definitions.load_assets_from_modules import (
LoadedAssetsList,
find_modules_in_package,
find_objects_in_module_of_types,
prefix_assets,
)


def _checks_from_modules(modules: Iterable[ModuleType]) -> Sequence[AssetChecksDefinition]:
checks = []
ids: Set[int] = set()
for module in modules:
for c in find_objects_in_module_of_types(module, AssetsDefinition):
if has_only_asset_checks(c) and id(c) not in ids:
checks.append(cast(AssetChecksDefinition, c))
ids.add(id(c))
return checks


def _checks_with_attributes(
checks_defs: Sequence[AssetChecksDefinition],
asset_key_prefix: Optional[CoercibleToAssetKeyPrefix] = None,
) -> Sequence[AssetChecksDefinition]:
if asset_key_prefix:
modified_checks, _ = prefix_assets(checks_defs, asset_key_prefix, [], None)
return [
AssetChecksDefinition.create(
keys_by_input_name=c.keys_by_input_name,
node_def=c.op,
check_specs_by_output_name=c.check_specs_by_output_name,
resource_defs=c.resource_defs,
can_subset=c.can_subset,
)
for c in modified_checks
]
else:
return checks_defs


def load_asset_checks_from_modules(
modules: Iterable[ModuleType],
asset_key_prefix: Optional[CoercibleToAssetKeyPrefix] = None,
Expand All @@ -68,7 +35,19 @@ def load_asset_checks_from_modules(
asset_key_prefix = check_opt_coercible_to_asset_key_prefix_param(
asset_key_prefix, "asset_key_prefix"
)
return _checks_with_attributes(_checks_from_modules(modules), asset_key_prefix=asset_key_prefix)
return (
LoadedAssetsList.from_modules(modules)
.to_post_load()
.with_attributes(
key_prefix=asset_key_prefix,
source_key_prefix=None,
group_name=None,
freshness_policy=None,
automation_condition=None,
backfill_policy=None,
)
.checks_defs
)


def load_asset_checks_from_current_module(
Expand All @@ -95,9 +74,7 @@ def load_asset_checks_from_current_module(
asset_key_prefix, "asset_key_prefix"
)

return _checks_with_attributes(
_checks_from_modules([module]), asset_key_prefix=asset_key_prefix
)
return load_asset_checks_from_modules([module], asset_key_prefix=asset_key_prefix)


def load_asset_checks_from_package_module(
Expand All @@ -120,9 +97,8 @@ def load_asset_checks_from_package_module(
asset_key_prefix, "asset_key_prefix"
)

return _checks_with_attributes(
_checks_from_modules(find_modules_in_package(package_module)),
asset_key_prefix=asset_key_prefix,
return load_asset_checks_from_modules(
find_modules_in_package(package_module), asset_key_prefix=asset_key_prefix
)


Expand All @@ -147,7 +123,6 @@ def load_asset_checks_from_package_name(
)

package_module = import_module(package_name)
return _checks_with_attributes(
_checks_from_modules(find_modules_in_package(package_module)),
asset_key_prefix=asset_key_prefix,
return load_asset_checks_from_modules(
find_modules_in_package(package_module), asset_key_prefix=asset_key_prefix
)
Loading
Loading