Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[module-loaders] Delete extra_source_assets param from assets_from_modules #26494

Merged
merged 12 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
from dagster._core.definitions.load_assets_from_modules import assets_from_modules
from dagster._core.definitions.materialize import materialize
from dagster._core.definitions.module_loaders.load_assets_from_modules import (
load_assets_from_modules,
)
from docs_snippets.guides.dagster.asset_tutorial import cereal
from docs_snippets.intro_tutorial.test_util import patch_cereal_requests


@patch_cereal_requests
def test_cereal():
assets, source_assets, _ = assets_from_modules([cereal])
assert materialize([*assets, *source_assets])
assets = load_assets_from_modules([cereal])
assert materialize(assets)
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
from dagster._core.definitions.load_assets_from_modules import assets_from_modules
from dagster._core.definitions.materialize import materialize
from dagster._core.definitions.module_loaders.load_assets_from_modules import (
load_assets_from_modules,
)
from docs_snippets.guides.dagster.asset_tutorial import serial_asset_graph
from docs_snippets.intro_tutorial.test_util import patch_cereal_requests


@patch_cereal_requests
def test_serial_asset_graph():
assets, source_assets, _ = assets_from_modules([serial_asset_graph])
assert materialize([*assets, *source_assets])
assets = load_assets_from_modules([serial_asset_graph])
assert materialize(assets)
16 changes: 10 additions & 6 deletions python_modules/dagster-test/dagster_test/toys/repo.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import warnings
from typing import Sequence, cast

from dagster import ExperimentalWarning
from dagster._core.definitions.assets import AssetsDefinition
from dagster._time import get_current_timestamp

# squelch experimental warnings since we often include experimental things in toys for development
Expand Down Expand Up @@ -164,19 +166,21 @@ def column_schema_repository():
def table_metadata_repository():
from dagster_test.toys import table_metadata

return load_assets_from_modules([table_metadata])
return cast(Sequence[AssetsDefinition], load_assets_from_modules([table_metadata]))


@repository
def long_asset_keys_repository():
from dagster_test.toys import long_asset_keys

return load_assets_from_modules([long_asset_keys])
return cast(Sequence[AssetsDefinition], load_assets_from_modules([long_asset_keys]))


@repository # pyright: ignore[reportArgumentType]
@repository
def big_honkin_assets_repository():
return [load_assets_from_modules([big_honkin_asset_graph_module])]
return cast(
Sequence[AssetsDefinition], [load_assets_from_modules([big_honkin_asset_graph_module])]
)


@repository
Expand Down Expand Up @@ -208,11 +212,11 @@ def assets_with_sensors_repository():
def conditional_assets_repository():
from dagster_test.toys import conditional_assets

return load_assets_from_modules([conditional_assets])
return cast(Sequence[AssetsDefinition], load_assets_from_modules([conditional_assets]))


@repository
def data_versions_repository():
from dagster_test.toys import data_versions

return load_assets_from_modules([data_versions])
return cast(Sequence[AssetsDefinition], load_assets_from_modules([data_versions]))
24 changes: 12 additions & 12 deletions python_modules/dagster/dagster/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,18 +250,6 @@
InputMapping as InputMapping,
)
from dagster._core.definitions.job_definition import JobDefinition as JobDefinition
from dagster._core.definitions.load_asset_checks_from_modules import (
load_asset_checks_from_current_module as load_asset_checks_from_current_module,
load_asset_checks_from_modules as load_asset_checks_from_modules,
load_asset_checks_from_package_module as load_asset_checks_from_package_module,
load_asset_checks_from_package_name as load_asset_checks_from_package_name,
)
from dagster._core.definitions.load_assets_from_modules import (
load_assets_from_current_module as load_assets_from_current_module,
load_assets_from_modules as load_assets_from_modules,
load_assets_from_package_module as load_assets_from_package_module,
load_assets_from_package_name as load_assets_from_package_name,
)
from dagster._core.definitions.logger_definition import (
LoggerDefinition as LoggerDefinition,
build_init_logger_context as build_init_logger_context,
Expand Down Expand Up @@ -309,6 +297,18 @@
TableRecord as TableRecord,
TableSchema as TableSchema,
)
from dagster._core.definitions.module_loaders.load_asset_checks_from_modules import (
load_asset_checks_from_current_module as load_asset_checks_from_current_module,
load_asset_checks_from_modules as load_asset_checks_from_modules,
load_asset_checks_from_package_module as load_asset_checks_from_package_module,
load_asset_checks_from_package_name as load_asset_checks_from_package_name,
)
from dagster._core.definitions.module_loaders.load_assets_from_modules import (
load_assets_from_current_module as load_assets_from_current_module,
load_assets_from_modules as load_assets_from_modules,
load_assets_from_package_module as load_assets_from_package_module,
load_assets_from_package_name as load_assets_from_package_name,
)
from dagster._core.definitions.multi_asset_sensor_definition import (
MultiAssetSensorDefinition as MultiAssetSensorDefinition,
MultiAssetSensorEvaluationContext as MultiAssetSensorEvaluationContext,
Expand Down
7 changes: 4 additions & 3 deletions python_modules/dagster/dagster/_core/code_pointer.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,14 +184,15 @@ def describe(self) -> str:


def _load_target_from_module(module: ModuleType, fn_name: str, error_suffix: str) -> object:
from dagster._core.definitions.load_assets_from_modules import assets_from_modules
from dagster._core.definitions.module_loaders.load_assets_from_modules import (
load_assets_from_modules,
)
from dagster._core.workspace.autodiscovery import LOAD_ALL_ASSETS

if fn_name == LOAD_ALL_ASSETS:
# LOAD_ALL_ASSETS is a special symbol that's returned when, instead of loading a particular
# attribute, we should load all the assets in the module.
module_assets, module_source_assets, _ = assets_from_modules([module])
return [*module_assets, *module_source_assets]
return load_assets_from_modules([module])
else:
if not hasattr(module, fn_name):
raise DagsterInvariantViolationError(f"{fn_name} not found {error_suffix}")
Expand Down
10 changes: 5 additions & 5 deletions python_modules/dagster/dagster/_core/definitions/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,16 +148,16 @@
)
from dagster._core.definitions.graph_definition import GraphDefinition as GraphDefinition
from dagster._core.definitions.job_definition import JobDefinition as JobDefinition
from dagster._core.definitions.load_assets_from_modules import (
from dagster._core.definitions.materialize import (
materialize as materialize,
materialize_to_memory as materialize_to_memory,
)
from dagster._core.definitions.module_loaders.load_assets_from_modules import (
load_assets_from_current_module as load_assets_from_current_module,
load_assets_from_modules as load_assets_from_modules,
load_assets_from_package_module as load_assets_from_package_module,
load_assets_from_package_name as load_assets_from_package_name,
)
from dagster._core.definitions.materialize import (
materialize as materialize,
materialize_to_memory as materialize_to_memory,
)
from dagster._core.definitions.op_definition import OpDefinition as OpDefinition
from dagster._core.definitions.partition import (
DynamicPartitionsDefinition as DynamicPartitionsDefinition,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,3 +116,6 @@ def get_python_identifier(self) -> str:
@property
def key(self) -> AssetCheckKey:
    """The ``AssetCheckKey`` built from this spec's ``asset_key`` and ``name``."""
    target_asset_key = self.asset_key
    check_name = self.name
    return AssetCheckKey(target_asset_key, check_name)

def replace_key(self, key: AssetCheckKey) -> "AssetCheckSpec":
    """Return a copy of this spec with ``asset_key`` and ``name`` taken from ``key``.

    All other fields are carried over unchanged by ``_replace``.
    """
    new_asset_key, new_name = key.asset_key, key.name
    return self._replace(asset_key=new_asset_key, name=new_name)
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ def normalize_assets(
# AssetKey not subject to any further manipulation.
resolved_deps = ResolvedAssetDependencies(assets_defs, [])

input_asset_key_replacements = [
asset_key_replacements = [
{
raw_key: normalized_key
for input_name, raw_key in ad.keys_by_input_name.items()
Expand All @@ -218,8 +218,8 @@ def normalize_assets(

# Only update the assets defs if we're actually replacing input asset keys
assets_defs = [
ad.with_attributes(input_asset_key_replacements=reps) if reps else ad
for ad, reps in zip(assets_defs, input_asset_key_replacements)
ad.with_attributes(asset_key_replacements=reps) if reps else ad
for ad, reps in zip(assets_defs, asset_key_replacements)
]

# Create unexecutable external assets definitions for any referenced keys for which no
Expand Down
6 changes: 6 additions & 0 deletions python_modules/dagster/dagster/_core/definitions/asset_key.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,12 @@ def from_db_string(db_string: str) -> Optional["AssetCheckKey"]:
def to_db_string(self) -> str:
    """Serialize this key to a JSON string with ``asset_key`` and ``check_name`` entries."""
    payload = {
        "asset_key": self.asset_key.to_string(),
        "check_name": self.name,
    }
    return seven.json.dumps(payload)

def with_asset_key_prefix(self, prefix: CoercibleToAssetKeyPrefix) -> "AssetCheckKey":
    """Return a check key with the same name whose asset key has ``prefix`` applied.

    The new asset key is ``self.asset_key.with_prefix(prefix)``.
    """
    prefixed_asset_key = self.asset_key.with_prefix(prefix)
    return AssetCheckKey(prefixed_asset_key, self.name)

def replace_asset_key(self, asset_key: AssetKey) -> "AssetCheckKey":
    """Return a check key for ``asset_key`` that keeps this key's check name."""
    check_name = self.name
    return AssetCheckKey(asset_key, check_name)


# Union alias covering both key types: AssetKey and AssetCheckKey.
EntityKey = Union[AssetKey, AssetCheckKey]
# Type variable constrained to AssetKey, AssetCheckKey, or the EntityKey union.
T_EntityKey = TypeVar("T_EntityKey", AssetKey, AssetCheckKey, EntityKey)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ def replace_attributes(
tags: Optional[Mapping[str, str]] = ...,
kinds: Optional[Set[str]] = ...,
partitions_def: Optional[PartitionsDefinition] = ...,
freshness_policy: Optional[FreshnessPolicy] = ...,
) -> "AssetSpec":
"""Returns a new AssetSpec with the specified attributes replaced."""
current_tags_without_kinds = {
Expand Down
78 changes: 50 additions & 28 deletions python_modules/dagster/dagster/_core/definitions/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
from dagster._utils.warnings import ExperimentalWarning, disable_dagster_warnings

if TYPE_CHECKING:
from dagster._core.definitions.asset_checks import AssetChecksDefinition
from dagster._core.definitions.graph_definition import GraphDefinition

ASSET_SUBSET_INPUT_PREFIX = "__subset_input__"
Expand Down Expand Up @@ -1180,11 +1181,34 @@ def get_op_def_for_asset_key(self, key: AssetKey) -> Optional[OpDefinition]:
output_name = self.get_output_name_for_asset_key(key)
return self.node_def.resolve_output_to_origin_op_def(output_name)

def coerce_to_checks_def(self) -> "AssetChecksDefinition":
    """Build an ``AssetChecksDefinition`` from this definition.

    The result is created from this definition's ``keys_by_input_name``, ``op``,
    ``check_specs_by_output_name``, ``resource_defs`` and ``can_subset``.

    Raises:
        DagsterInvalidDefinitionError: If ``has_only_asset_checks(self)`` is false, or if
            ``self.check_keys`` is empty.
    """
    # Function-scope import: at module level AssetChecksDefinition is only imported
    # under TYPE_CHECKING.
    from dagster._core.definitions.asset_checks import (
        AssetChecksDefinition,
        has_only_asset_checks,
    )

    # Guard 1: the definition must consist solely of asset checks.
    if not has_only_asset_checks(self):
        raise DagsterInvalidDefinitionError(
            "Cannot coerce an AssetsDefinition to an AssetChecksDefinition if it contains "
            "non-check assets."
        )

    # Guard 2: there must be at least one check to carry over.
    if not self.check_keys:
        raise DagsterInvalidDefinitionError(
            "Cannot coerce an AssetsDefinition to an AssetChecksDefinition if it contains no "
            "checks."
        )

    create_kwargs = dict(
        keys_by_input_name=self.keys_by_input_name,
        node_def=self.op,
        check_specs_by_output_name=self.check_specs_by_output_name,
        resource_defs=self.resource_defs,
        can_subset=self.can_subset,
    )
    return AssetChecksDefinition.create(**create_kwargs)

def with_attributes(
self,
*,
output_asset_key_replacements: Mapping[AssetKey, AssetKey] = {},
input_asset_key_replacements: Mapping[AssetKey, AssetKey] = {},
asset_key_replacements: Mapping[AssetKey, AssetKey] = {},
group_names_by_key: Mapping[AssetKey, str] = {},
tags_by_key: Mapping[AssetKey, Mapping[str, str]] = {},
freshness_policy: Optional[
Expand Down Expand Up @@ -1229,16 +1253,13 @@ def update_replace_dict_and_conflicts(
default_value=DEFAULT_GROUP_NAME,
)

if key in output_asset_key_replacements:
replace_dict["key"] = output_asset_key_replacements[key]
if key in asset_key_replacements:
replace_dict["key"] = asset_key_replacements[key]

if input_asset_key_replacements or output_asset_key_replacements:
if asset_key_replacements:
new_deps = []
for dep in spec.deps:
replacement_key = input_asset_key_replacements.get(
dep.asset_key,
output_asset_key_replacements.get(dep.asset_key),
)
replacement_key = asset_key_replacements.get(dep.asset_key, dep.asset_key)
if replacement_key is not None:
new_deps.append(dep._replace(asset_key=replacement_key))
else:
Expand All @@ -1255,33 +1276,31 @@ def update_replace_dict_and_conflicts(
)

check_specs_by_output_name = {
output_name: check_spec._replace(
asset_key=output_asset_key_replacements.get(
check_spec.asset_key, check_spec.asset_key
output_name: check_spec.replace_key(
key=check_spec.key.replace_asset_key(
asset_key_replacements.get(check_spec.asset_key, check_spec.asset_key)
)
)
for output_name, check_spec in self.node_check_specs_by_output_name.items()
}

selected_asset_check_keys = {
check_key._replace(
asset_key=output_asset_key_replacements.get(
check_key.asset_key, check_key.asset_key
)
check_key.replace_asset_key(
asset_key_replacements.get(check_key.asset_key, check_key.asset_key)
)
for check_key in self.check_keys
}

replaced_attributes = dict(
keys_by_input_name={
input_name: input_asset_key_replacements.get(key, key)
input_name: asset_key_replacements.get(key, key)
for input_name, key in self.node_keys_by_input_name.items()
},
keys_by_output_name={
output_name: output_asset_key_replacements.get(key, key)
output_name: asset_key_replacements.get(key, key)
for output_name, key in self.node_keys_by_output_name.items()
},
selected_asset_keys={output_asset_key_replacements.get(key, key) for key in self.keys},
selected_asset_keys={asset_key_replacements.get(key, key) for key in self.keys},
backfill_policy=backfill_policy if backfill_policy else self.backfill_policy,
is_subset=self.is_subset,
check_specs_by_output_name=check_specs_by_output_name,
Expand Down Expand Up @@ -1903,15 +1922,18 @@ def replace_specs_on_asset(
from dagster._builtins import Nothing
from dagster._core.definitions.input import In

new_deps = set().union(*(spec.deps for spec in replaced_specs))
previous_deps = set().union(*(spec.deps for spec in assets_def.specs))
added_deps = new_deps - previous_deps
removed_deps = previous_deps - new_deps
remaining_original_deps = previous_deps - removed_deps
new_deps_by_key = {dep.asset_key: dep for spec in replaced_specs for dep in spec.deps}
previous_deps_by_key = {dep.asset_key: dep for spec in assets_def.specs for dep in spec.deps}
added_dep_keys = set(new_deps_by_key.keys()) - set(previous_deps_by_key.keys())
removed_dep_keys = set(previous_deps_by_key.keys()) - set(new_deps_by_key.keys())
remaining_original_deps_by_key = {
key: previous_deps_by_key[key]
for key in set(previous_deps_by_key.keys()) - removed_dep_keys
}
original_key_to_input_mapping = reverse_dict(assets_def.node_keys_by_input_name)

# If there are no changes to the dependency structure, we don't need to make any changes to the underlying node.
if not assets_def.is_executable or (not added_deps and not removed_deps):
if not assets_def.is_executable or (not added_dep_keys and not removed_dep_keys):
return assets_def.__class__.dagster_internal_init(
**{**assets_def.get_attributes_dict(), "specs": replaced_specs}
)
Expand All @@ -1925,15 +1947,15 @@ def replace_specs_on_asset(
"Can only add additional deps to an op-backed asset.",
)
# for each deleted dep, we need to make sure it is not an argument-based dep. Argument-based deps cannot be removed.
for dep in removed_deps:
for dep_key in removed_dep_keys:
dep = previous_deps_by_key[dep_key]
input_name = original_key_to_input_mapping[dep.asset_key]
input_def = assets_def.node_def.input_def_named(input_name)
check.invariant(
input_def.dagster_type.is_nothing,
f"Attempted to remove argument-backed dependency {dep.asset_key} (mapped to argument {input_name}) from the asset. Only non-argument dependencies can be changed or removed using map_asset_specs.",
)

remaining_original_deps_by_key = {dep.asset_key: dep for dep in remaining_original_deps}
remaining_ins = {
input_name: the_in
for input_name, the_in in assets_def.node_def.input_dict.items()
Expand All @@ -1943,7 +1965,7 @@ def replace_specs_on_asset(
remaining_ins,
{
stringify_asset_key_to_input_name(dep.asset_key): In(dagster_type=Nothing)
for dep in new_deps
for dep in new_deps_by_key.values()
},
)

Expand Down
Loading
Loading