Skip to content

Commit

Permalink
Use new loader code in checks
Browse files Browse the repository at this point in the history
  • Loading branch information
dpeng817 committed Dec 18, 2024
1 parent 29c4c40 commit 5be4d04
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 161 deletions.
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
from dagster._core.definitions.load_assets_from_modules import assets_from_modules
from dagster._core.definitions.load_assets_from_modules import load_assets_from_modules
from dagster._core.definitions.materialize import materialize
from docs_snippets.guides.dagster.asset_tutorial import cereal
from docs_snippets.intro_tutorial.test_util import patch_cereal_requests


@patch_cereal_requests
def test_cereal():
assets, source_assets, _ = assets_from_modules([cereal])
assets, source_assets, _ = load_assets_from_modules([cereal])
assert materialize([*assets, *source_assets])
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
from dagster._core.definitions.load_assets_from_modules import assets_from_modules
from dagster._core.definitions.load_assets_from_modules import load_assets_from_modules
from dagster._core.definitions.materialize import materialize
from docs_snippets.guides.dagster.asset_tutorial import serial_asset_graph
from docs_snippets.intro_tutorial.test_util import patch_cereal_requests


@patch_cereal_requests
def test_serial_asset_graph():
assets, source_assets, _ = assets_from_modules([serial_asset_graph])
assets, source_assets, _ = load_assets_from_modules([serial_asset_graph])
assert materialize([*assets, *source_assets])
4 changes: 2 additions & 2 deletions python_modules/dagster/dagster/_core/code_pointer.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,13 +184,13 @@ def describe(self) -> str:


def _load_target_from_module(module: ModuleType, fn_name: str, error_suffix: str) -> object:
from dagster._core.definitions.load_assets_from_modules import assets_from_modules
from dagster._core.definitions.load_assets_from_modules import load_assets_from_modules
from dagster._core.workspace.autodiscovery import LOAD_ALL_ASSETS

if fn_name == LOAD_ALL_ASSETS:
# LOAD_ALL_ASSETS is a special symbol that's returned when, instead of loading a particular
# attribute, we should load all the assets in the module.
module_assets, module_source_assets, _ = assets_from_modules([module])
module_assets, module_source_assets, _ = load_assets_from_modules([module])
return [*module_assets, *module_source_assets]
else:
if not hasattr(module, fn_name):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,53 +1,20 @@
import inspect
from importlib import import_module
from types import ModuleType
from typing import Iterable, Optional, Sequence, Set, cast
from typing import Iterable, Optional, Sequence

import dagster._check as check
from dagster._core.definitions.asset_checks import AssetChecksDefinition, has_only_asset_checks
from dagster._core.definitions.asset_checks import AssetChecksDefinition
from dagster._core.definitions.asset_key import (
CoercibleToAssetKeyPrefix,
check_opt_coercible_to_asset_key_prefix_param,
)
from dagster._core.definitions.assets import AssetsDefinition
from dagster._core.definitions.load_assets_from_modules import (
LoadedAssetsList,
find_modules_in_package,
find_objects_in_module_of_types,
prefix_assets,
)


def _checks_from_modules(modules: Iterable[ModuleType]) -> Sequence[AssetChecksDefinition]:
checks = []
ids: Set[int] = set()
for module in modules:
for c in find_objects_in_module_of_types(module, AssetsDefinition):
if has_only_asset_checks(c) and id(c) not in ids:
checks.append(cast(AssetChecksDefinition, c))
ids.add(id(c))
return checks


def _checks_with_attributes(
checks_defs: Sequence[AssetChecksDefinition],
asset_key_prefix: Optional[CoercibleToAssetKeyPrefix] = None,
) -> Sequence[AssetChecksDefinition]:
if asset_key_prefix:
modified_checks, _ = prefix_assets(checks_defs, asset_key_prefix, [], None)
return [
AssetChecksDefinition.create(
keys_by_input_name=c.keys_by_input_name,
node_def=c.op,
check_specs_by_output_name=c.check_specs_by_output_name,
resource_defs=c.resource_defs,
can_subset=c.can_subset,
)
for c in modified_checks
]
else:
return checks_defs


def load_asset_checks_from_modules(
modules: Iterable[ModuleType],
asset_key_prefix: Optional[CoercibleToAssetKeyPrefix] = None,
Expand All @@ -68,7 +35,19 @@ def load_asset_checks_from_modules(
asset_key_prefix = check_opt_coercible_to_asset_key_prefix_param(
asset_key_prefix, "asset_key_prefix"
)
return _checks_with_attributes(_checks_from_modules(modules), asset_key_prefix=asset_key_prefix)
return (
LoadedAssetsList.from_modules(modules)
.to_post_load()
.with_attributes(
key_prefix=asset_key_prefix,
source_key_prefix=None,
group_name=None,
freshness_policy=None,
automation_condition=None,
backfill_policy=None,
)
.checks_defs
)


def load_asset_checks_from_current_module(
Expand All @@ -95,9 +74,7 @@ def load_asset_checks_from_current_module(
asset_key_prefix, "asset_key_prefix"
)

return _checks_with_attributes(
_checks_from_modules([module]), asset_key_prefix=asset_key_prefix
)
return load_asset_checks_from_modules([module], asset_key_prefix=asset_key_prefix)


def load_asset_checks_from_package_module(
Expand All @@ -120,9 +97,8 @@ def load_asset_checks_from_package_module(
asset_key_prefix, "asset_key_prefix"
)

return _checks_with_attributes(
_checks_from_modules(find_modules_in_package(package_module)),
asset_key_prefix=asset_key_prefix,
return load_asset_checks_from_modules(
find_modules_in_package(package_module), asset_key_prefix=asset_key_prefix
)


Expand All @@ -147,7 +123,6 @@ def load_asset_checks_from_package_name(
)

package_module = import_module(package_name)
return _checks_with_attributes(
_checks_from_modules(find_modules_in_package(package_module)),
asset_key_prefix=asset_key_prefix,
return load_asset_checks_from_modules(
find_modules_in_package(package_module), asset_key_prefix=asset_key_prefix
)
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,17 @@
Dict,
Iterable,
Iterator,
List,
Mapping,
Optional,
Sequence,
Tuple,
Type,
Union,
cast,
)

import dagster._check as check
from dagster._core.definitions.asset_checks import AssetChecksDefinition, has_only_asset_checks
from dagster._core.definitions.asset_key import (
AssetKey,
CoercibleToAssetKeyPrefix,
Expand Down Expand Up @@ -178,10 +179,14 @@ def key_iterator(
) -> Iterator[AssetKey]:
return (
iter(
*[asset.keys],
*[check_key.asset_key for check_key in asset.check_keys]
if included_targeted_keys
else [],
[
*asset.keys,
*(
[check_key.asset_key for check_key in asset.check_keys]
if included_targeted_keys
else []
),
]
)
if isinstance(asset, AssetsDefinition)
else iter([asset.key])
Expand Down Expand Up @@ -240,7 +245,7 @@ def load_assets_from_modules(
backfill_policy, "backfill_policy", BackfillPolicy
),
)
.loaded_objects
.assets_only
)


Expand Down Expand Up @@ -408,10 +413,11 @@ def replace_keys_in_asset(
asset: Union[AssetsDefinition, SourceAsset],
key_replacements: Mapping[AssetKey, AssetKey],
) -> Union[AssetsDefinition, SourceAsset]:
return (
updated_object = (
asset.with_attributes(
output_asset_key_replacements={
key: key_replacements.get(key, key) for key in asset.keys
key: key_replacements.get(key, key)
for key in key_iterator(asset, included_targeted_keys=True)
},
input_asset_key_replacements={
key: key_replacements.get(key, key) for key in asset.keys_by_input_name.values()
Expand All @@ -420,6 +426,16 @@ def replace_keys_in_asset(
if isinstance(asset, AssetsDefinition)
else asset.with_attributes(key=key_replacements.get(asset.key, asset.key))
)
if isinstance(asset, AssetChecksDefinition):
updated_object = cast(AssetsDefinition, updated_object)
updated_object = AssetChecksDefinition.create(
keys_by_input_name=updated_object.keys_by_input_name,
node_def=updated_object.op,
check_specs_by_output_name=updated_object.check_specs_by_output_name,
resource_defs=updated_object.resource_defs,
can_subset=updated_object.can_subset,
)
return updated_object


class ResolvedAssetObjectList:
Expand All @@ -437,10 +453,32 @@ def assets_defs(self) -> Sequence[AssetsDefinition]:
if isinstance(dagster_object, AssetsDefinition) and dagster_object.keys
]

@cached_property
def checks_defs(self) -> Sequence[AssetChecksDefinition]:
return [
cast(AssetChecksDefinition, asset)
for asset in self.loaded_objects
if isinstance(asset, AssetsDefinition) and has_only_asset_checks(asset)
]

@cached_property
def assets_and_checks_defs(self) -> Sequence[Union[AssetsDefinition, AssetChecksDefinition]]:
return [*self.assets_defs, *self.checks_defs]

@cached_property
def source_assets(self) -> Sequence[SourceAsset]:
return [asset for asset in self.loaded_objects if isinstance(asset, SourceAsset)]

@cached_property
def cacheable_assets(self) -> Sequence[CacheableAssetsDefinition]:
return [
asset for asset in self.loaded_objects if isinstance(asset, CacheableAssetsDefinition)
]

@cached_property
def assets_only(self) -> Sequence[LoadableAssetTypes]:
return [*self.source_assets, *self.assets_defs, *self.cacheable_assets]

def assets_with_loadable_prefix(
self, key_prefix: CoercibleToAssetKeyPrefix
) -> "ResolvedAssetObjectList":
Expand All @@ -451,7 +489,7 @@ def assets_with_loadable_prefix(
result_list = []
all_asset_keys = {
key
for asset_object in self.assets_defs
for asset_object in self.assets_and_checks_defs
for key in key_iterator(asset_object, included_targeted_keys=True)
}
key_replacements = {key: key.with_prefix(key_prefix) for key in all_asset_keys}
Expand Down Expand Up @@ -498,13 +536,20 @@ def with_attributes(
return_list = []
for asset in assets_list.loaded_objects:
if isinstance(asset, AssetsDefinition):
return_list.append(
asset.map_asset_specs(
_spec_mapper_disallow_group_override(group_name, automation_condition)
).with_attributes(
backfill_policy=backfill_policy, freshness_policy=freshness_policy
)
new_asset = asset.map_asset_specs(
_spec_mapper_disallow_group_override(group_name, automation_condition)
).with_attributes(
backfill_policy=backfill_policy, freshness_policy=freshness_policy
)
if isinstance(asset, AssetChecksDefinition):
new_asset = AssetChecksDefinition.create(
keys_by_input_name=new_asset.keys_by_input_name,
node_def=new_asset.op,
check_specs_by_output_name=new_asset.check_specs_by_output_name,
resource_defs=new_asset.resource_defs,
can_subset=new_asset.can_subset,
)
return_list.append(new_asset)
elif isinstance(asset, SourceAsset):
return_list.append(
asset.with_attributes(group_name=group_name if group_name else asset.group_name)
Expand All @@ -521,95 +566,3 @@ def with_attributes(
)
)
return ResolvedAssetObjectList(return_list)


# No longer used in load_assets_from_modules, next PR will fix for load_asset_checks_from_modules
def prefix_assets(
assets_defs: Sequence[AssetsDefinition],
key_prefix: CoercibleToAssetKeyPrefix,
source_assets: Sequence[SourceAsset],
source_key_prefix: Optional[CoercibleToAssetKeyPrefix],
) -> Tuple[Sequence[AssetsDefinition], Sequence[SourceAsset]]:
"""Given a list of assets, prefix the input and output asset keys and check specs with key_prefix.
The prefix is not added to source assets.
Input asset keys that reference other assets within assets_defs are "brought along" -
i.e. prefixed as well.
Example with a single asset:
.. code-block:: python
@asset
def asset1():
...
result = prefixed_asset_key_replacements([asset_1], "my_prefix")
assert result.assets[0].asset_key == AssetKey(["my_prefix", "asset1"])
Example with dependencies within the list of assets:
.. code-block:: python
@asset
def asset1():
...
@asset
def asset2(asset1):
...
result = prefixed_asset_key_replacements([asset1, asset2], "my_prefix")
assert result.assets[0].asset_key == AssetKey(["my_prefix", "asset1"])
assert result.assets[1].asset_key == AssetKey(["my_prefix", "asset2"])
assert result.assets[1].dependency_keys == {AssetKey(["my_prefix", "asset1"])}
"""
asset_keys = {asset_key for assets_def in assets_defs for asset_key in assets_def.keys}
check_target_keys = {
key.asset_key for assets_def in assets_defs for key in assets_def.check_keys
}
source_asset_keys = {source_asset.key for source_asset in source_assets}

if isinstance(key_prefix, str):
key_prefix = [key_prefix]
key_prefix = check.is_list(key_prefix, of_type=str)

if isinstance(source_key_prefix, str):
source_key_prefix = [source_key_prefix]

result_assets: List[AssetsDefinition] = []
for assets_def in assets_defs:
output_asset_key_replacements = {
asset_key: AssetKey([*key_prefix, *asset_key.path])
for asset_key in (
assets_def.keys | {check_key.asset_key for check_key in assets_def.check_keys}
)
}
input_asset_key_replacements = {}
for dep_asset_key in assets_def.keys_by_input_name.values():
if dep_asset_key in asset_keys or dep_asset_key in check_target_keys:
input_asset_key_replacements[dep_asset_key] = AssetKey(
[*key_prefix, *dep_asset_key.path]
)
elif source_key_prefix and dep_asset_key in source_asset_keys:
input_asset_key_replacements[dep_asset_key] = AssetKey(
[*source_key_prefix, *dep_asset_key.path]
)

result_assets.append(
assets_def.with_attributes(
output_asset_key_replacements=output_asset_key_replacements,
input_asset_key_replacements=input_asset_key_replacements,
)
)

if source_key_prefix:
result_source_assets = [
source_asset.with_attributes(key=AssetKey([*source_key_prefix, *source_asset.key.path]))
for source_asset in source_assets
]
else:
result_source_assets = source_assets

return result_assets, result_source_assets
Loading

0 comments on commit 5be4d04

Please sign in to comment.