Skip to content

Commit

Permalink
[io-managers-deps] Additional asset check tests (#26461)
Browse files Browse the repository at this point in the history
I used this PR to accumulate the upstack.
## Summary & Motivation
Add some additional asset check tests which outline the behavior when
args are set in various ways. I found this illuminating to build against
for the downstream PRs.
## How I Tested These Changes
Just tests
  • Loading branch information
dpeng817 authored Dec 19, 2024
1 parent f7869db commit cd526bd
Show file tree
Hide file tree
Showing 8 changed files with 583 additions and 65 deletions.
7 changes: 7 additions & 0 deletions python_modules/dagster/dagster/_core/definitions/asset_in.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,3 +77,10 @@ def __new__(
else resolve_dagster_type(dagster_type)
),
)

@classmethod
def from_coercible(cls, coercible: "CoercibleToAssetIn") -> "AssetIn":
return coercible if isinstance(coercible, AssetIn) else AssetIn(key=coercible)


CoercibleToAssetIn = Union[AssetIn, CoercibleToAssetKey]
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import AbstractSet, Any, Callable, Iterable, Mapping, Optional, Sequence, Set, Union
from typing import Any, Callable, Iterable, Mapping, Optional, Sequence, Set, Union

from typing_extensions import TypeAlias

Expand All @@ -8,8 +8,8 @@
from dagster._core.definitions.asset_check_result import AssetCheckResult
from dagster._core.definitions.asset_check_spec import AssetCheckSpec
from dagster._core.definitions.asset_checks import AssetChecksDefinition
from dagster._core.definitions.asset_dep import CoercibleToAssetDep
from dagster._core.definitions.asset_in import AssetIn
from dagster._core.definitions.asset_dep import AssetDep, CoercibleToAssetDep
from dagster._core.definitions.asset_in import AssetIn, CoercibleToAssetIn
from dagster._core.definitions.asset_key import AssetCheckKey
from dagster._core.definitions.assets import AssetsDefinition
from dagster._core.definitions.declarative_automation.automation_condition import (
Expand All @@ -20,9 +20,10 @@
DecoratorAssetsDefinitionBuilder,
DecoratorAssetsDefinitionBuilderArgs,
NamedIn,
build_named_ins,
build_and_validate_named_ins,
compute_required_resource_keys,
get_function_params_without_context_or_config_or_resources,
validate_named_ins_subset_of_deps,
)
from dagster._core.definitions.decorators.op_decorator import _Op
from dagster._core.definitions.events import AssetKey, CoercibleToAssetKey
Expand All @@ -44,7 +45,7 @@ def _build_asset_check_named_ins(
asset_key: AssetKey,
fn: Callable[..., Any],
additional_ins: Mapping[str, AssetIn],
additional_deps: Optional[AbstractSet[AssetKey]],
additional_deps: Mapping[AssetKey, AssetDep],
) -> Mapping[AssetKey, NamedIn]:
fn_params = get_function_params_without_context_or_config_or_resources(fn)

Expand All @@ -66,9 +67,9 @@ def _build_asset_check_named_ins(
f"'{in_name}' is specified in 'additional_ins' but isn't a parameter."
)

# if all the fn_params are in additional_ins, then we add the prmary asset as a dep
# if all the fn_params are in additional_ins, then we add the primary asset as a dep
if len(fn_params) == len(additional_ins):
all_deps = {*(additional_deps if additional_deps else set()), asset_key}
all_deps = {**additional_deps, **{asset_key: AssetDep(asset_key)}}
all_ins = additional_ins
# otherwise there should be one extra fn_param, which is the primary asset. Add that as an input
elif len(fn_params) == len(additional_ins) + 1:
Expand All @@ -87,10 +88,10 @@ def _build_asset_check_named_ins(
" the target asset or be specified in 'additional_ins'."
)

return build_named_ins(
return build_and_validate_named_ins(
fn=fn,
asset_ins=all_ins,
deps=all_deps,
deps=all_deps.values(),
)


Expand Down Expand Up @@ -189,7 +190,11 @@ def inner(fn: AssetCheckFunction) -> AssetChecksDefinition:
resolved_name = name or fn.__name__
asset_key = AssetKey.from_coercible_or_definition(asset)

additional_dep_keys = set([dep.asset_key for dep in make_asset_deps(additional_deps) or []])
additional_dep_keys = (
{dep.asset_key: dep for dep in make_asset_deps(additional_deps) or []}
if additional_deps
else {}
)
named_in_by_asset_key = _build_asset_check_named_ins(
resolved_name,
asset_key,
Expand Down Expand Up @@ -283,6 +288,7 @@ def multi_asset_check(
required_resource_keys: Optional[Set[str]] = None,
retry_policy: Optional[RetryPolicy] = None,
config_schema: Optional[UserConfigSchema] = None,
ins: Optional[Mapping[str, CoercibleToAssetIn]] = None,
) -> Callable[[Callable[..., Any]], AssetChecksDefinition]:
"""Defines a set of asset checks that can be executed together with the same op.
Expand All @@ -306,6 +312,8 @@ def multi_asset_check(
retry_policy (Optional[RetryPolicy]): The retry policy for the op that executes the checks.
can_subset (bool): Whether the op can emit results for a subset of the asset checks
keys, based on the context.selected_asset_check_keys argument. Defaults to False.
ins (Optional[Mapping[str, Union[AssetKey, AssetIn]]]): A mapping from input name to AssetIn depended upon by
a given asset check. If an AssetKey is provided, it will be converted to an AssetIn with the same key.
Examples:
Expand Down Expand Up @@ -345,12 +353,21 @@ def inner(fn: MultiAssetCheckFunction) -> AssetChecksDefinition:
outs = {
spec.get_python_identifier(): Out(None, is_required=not can_subset) for spec in specs
}
named_ins_by_asset_key = build_named_ins(
all_deps_by_key = {
**{spec.asset_key: AssetDep(spec.asset_key) for spec in specs},
**{dep.asset_key: dep for spec in specs for dep in (spec.additional_deps or [])},
}

named_ins_by_asset_key = build_and_validate_named_ins(
fn=fn,
asset_ins={},
deps={spec.asset_key for spec in specs}
| {dep.asset_key for spec in specs for dep in spec.additional_deps or []},
asset_ins={
inp_name: AssetIn.from_coercible(coercible) for inp_name, coercible in ins.items()
}
if ins
else {},
deps=all_deps_by_key.values(),
)
validate_named_ins_subset_of_deps(named_ins_by_asset_key, all_deps_by_key)

with disable_dagster_warnings():
op_def = _Op(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
from dagster._core.definitions.decorators.decorator_assets_definition_builder import (
DecoratorAssetsDefinitionBuilder,
DecoratorAssetsDefinitionBuilderArgs,
build_named_ins,
build_and_validate_named_ins,
build_named_outs,
create_check_specs_by_output_name,
validate_and_assign_output_names_to_check_specs,
Expand Down Expand Up @@ -911,7 +911,7 @@ def graph_asset_no_defaults(
kinds: Optional[AbstractSet[str]],
) -> AssetsDefinition:
ins = ins or {}
named_ins = build_named_ins(compose_fn, ins or {}, set())
named_ins = build_and_validate_named_ins(compose_fn, ins or {}, set())
out_asset_key, _asset_name = resolve_asset_key_and_name_for_decorator(
key=key,
key_prefix=key_prefix,
Expand Down Expand Up @@ -1030,7 +1030,7 @@ def inner(fn: Callable[..., Any]) -> AssetsDefinition:
if asset_in.partition_mapping
}

named_ins = build_named_ins(fn, ins or {}, set())
named_ins = build_and_validate_named_ins(fn, ins or {}, set())
keys_by_input_name = {
input_name: asset_key for asset_key, (input_name, _) in named_ins.items()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,39 @@ def get_function_params_without_context_or_config_or_resources(
return new_input_args


def build_named_ins(
def validate_can_coexist(asset_in: AssetIn, asset_dep: AssetDep) -> None:
"""Validates that the asset_in and asset_dep can coexist peacefully on the same asset key.
If both asset_in and asset_dep are set on the same asset key, expect that _no_ properties
are set on AssetIn except for the key itself.
"""
if (
asset_in.metadata
or asset_in.key_prefix
or asset_in.dagster_type != NoValueSentinel
or asset_in.partition_mapping is not None
):
raise DagsterInvalidDefinitionError(
f"Asset key '{asset_dep.asset_key.to_user_string()}' is used as both an input (via AssetIn) and a dependency (via AssetDep). If an asset key is used as an input and also set as a dependency, the input should only define the relationship between the asset key and the input name, or optionally set the input_manager_key. Any other properties should either not be set, or should be set on the dependency."
)


def build_and_validate_named_ins(
fn: Callable[..., Any],
asset_ins: Mapping[str, AssetIn],
deps: Optional[AbstractSet[AssetKey]],
deps: Optional[Iterable[AssetDep]],
) -> Mapping[AssetKey, "NamedIn"]:
"""Creates a mapping from AssetKey to (name of input, In object)."""
deps = check.opt_set_param(deps, "deps", AssetKey)
deps_by_key = {dep.asset_key: dep for dep in deps} if deps else {}
ins_by_asset_key = {
asset_in.key if asset_in.key else AssetKey.from_coercible(input_name): asset_in
for input_name, asset_in in asset_ins.items()
}
shared_keys_between_ins_and_deps = set(ins_by_asset_key.keys()) & set(deps_by_key.keys())
if shared_keys_between_ins_and_deps:
for shared_key in shared_keys_between_ins_and_deps:
validate_can_coexist(ins_by_asset_key[shared_key], deps_by_key[shared_key])

deps = check.opt_iterable_param(deps, "deps", AssetDep)

new_input_args = get_function_params_without_context_or_config_or_resources(fn)

Expand Down Expand Up @@ -126,16 +152,12 @@ def build_named_ins(
In(metadata=metadata, input_manager_key=input_manager_key, dagster_type=dagster_type),
)

for asset_key in deps:
if asset_key in named_ins_by_asset_key:
raise DagsterInvalidDefinitionError(
f"deps value {asset_key} also declared as input/AssetIn"
for dep in deps:
if dep.asset_key not in named_ins_by_asset_key:
named_ins_by_asset_key[dep.asset_key] = NamedIn(
stringify_asset_key_to_input_name(dep.asset_key),
In(cast(type, Nothing)),
)
# mypy doesn't realize that Nothing is a valid type here
named_ins_by_asset_key[asset_key] = NamedIn(
stringify_asset_key_to_input_name(asset_key),
In(cast(type, Nothing)),
)

return named_ins_by_asset_key

Expand Down Expand Up @@ -348,25 +370,23 @@ def from_multi_asset_specs(
),
)

upstream_keys = set()
upstream_deps = {}
for spec in asset_specs:
for dep in spec.deps:
if dep.asset_key not in named_outs_by_asset_key:
upstream_keys.add(dep.asset_key)
upstream_deps[dep.asset_key] = dep
if dep.asset_key in named_outs_by_asset_key and dep.partition_mapping is not None:
# self-dependent asset also needs to be considered an upstream_key
upstream_keys.add(dep.asset_key)
upstream_deps[dep.asset_key] = dep

# get which asset keys have inputs set
loaded_upstreams = build_named_ins(fn, asset_in_map, deps=set())
unexpected_upstreams = {key for key in loaded_upstreams.keys() if key not in upstream_keys}
if unexpected_upstreams:
raise DagsterInvalidDefinitionError(
f"Asset inputs {unexpected_upstreams} do not have dependencies on the passed"
" AssetSpec(s). Set the deps on the appropriate AssetSpec(s)."
)
remaining_upstream_keys = {key for key in upstream_keys if key not in loaded_upstreams}
named_ins_by_asset_key = build_named_ins(fn, asset_in_map, deps=remaining_upstream_keys)
named_ins_by_asset_key = build_and_validate_named_ins(
fn, asset_in_map, deps=upstream_deps.values()
)
# We expect that asset_ins are a subset of asset_deps. The reason we do not check this in
# `build_and_validate_named_ins` is because in other decorator pathways, we allow for argument-based
# dependencies which are not specified in deps (such as the asset decorator).
validate_named_ins_subset_of_deps(named_ins_by_asset_key, upstream_deps)

internal_deps = {
spec.key: {dep.asset_key for dep in spec.deps}
Expand Down Expand Up @@ -401,10 +421,10 @@ def from_asset_outs_in_asset_centric_decorator(
check.param_invariant(
not passed_args.specs, "args", "This codepath for non-spec based create"
)
named_ins_by_asset_key = build_named_ins(
named_ins_by_asset_key = build_and_validate_named_ins(
fn,
asset_in_map,
deps=({dep.asset_key for dep in upstream_asset_deps} if upstream_asset_deps else set()),
deps=upstream_asset_deps or set(),
)
named_outs_by_asset_key = build_named_outs(asset_out_map)

Expand Down Expand Up @@ -653,3 +673,22 @@ def _validate_check_specs_target_relevant_asset_keys(
f"Invalid asset key {spec.asset_key} in check spec {spec.name}. Must be one of"
f" {valid_asset_keys}"
)


def validate_named_ins_subset_of_deps(
named_ins_per_key: Mapping[AssetKey, NamedIn],
asset_deps_by_key: Mapping[AssetKey, AssetDep],
) -> None:
"""Validates that the asset_ins are a subset of the asset_deps. This is a common validation
that we need to do in multiple places, so we've factored it out into a helper function.
"""
asset_dep_keys = set(asset_deps_by_key.keys())
asset_in_keys = set(named_ins_per_key.keys())

if asset_in_keys - asset_dep_keys:
invalid_asset_in_keys = asset_in_keys - asset_dep_keys
raise DagsterInvalidDefinitionError(
f"Invalid asset dependencies: `{invalid_asset_in_keys}` specified as AssetIns, but"
" are not specified as `AssetDep` objects on any constituent AssetSpec objects. Asset inputs must be associated with an"
" output produced by the asset."
)
Loading

0 comments on commit cd526bd

Please sign in to comment.