From cd526bd7986e552cc594d202b5b2cc20c7437912 Mon Sep 17 00:00:00 2001 From: Christopher DeCarolis Date: Wed, 18 Dec 2024 17:53:04 -0800 Subject: [PATCH] [io-managers-deps] Additional asset check tests (#26461) I used this PR to accumulate the upstack. ## Summary & Motivation Add some additional asset check tests which outline the behavior when args are set in various ways. I found this illuminating to build against for the downstream PRs. ## How I Tested These Changes Just tests --- .../dagster/_core/definitions/asset_in.py | 7 + .../decorators/asset_check_decorator.py | 45 ++- .../definitions/decorators/asset_decorator.py | 6 +- .../decorator_assets_definition_builder.py | 91 +++-- .../asset_defs_tests/test_asset_deps.py | 153 ++++++++- .../asset_defs_tests/test_assets.py | 4 +- .../test_asset_check_decorator.py | 320 +++++++++++++++++- ..._asset_check_decorator_secondary_assets.py | 22 +- 8 files changed, 583 insertions(+), 65 deletions(-) diff --git a/python_modules/dagster/dagster/_core/definitions/asset_in.py b/python_modules/dagster/dagster/_core/definitions/asset_in.py index 79b06ec0647f2..448ec1d3476ea 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_in.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_in.py @@ -77,3 +77,10 @@ def __new__( else resolve_dagster_type(dagster_type) ), ) + + @classmethod + def from_coercible(cls, coercible: "CoercibleToAssetIn") -> "AssetIn": + return coercible if isinstance(coercible, AssetIn) else AssetIn(key=coercible) + + +CoercibleToAssetIn = Union[AssetIn, CoercibleToAssetKey] diff --git a/python_modules/dagster/dagster/_core/definitions/decorators/asset_check_decorator.py b/python_modules/dagster/dagster/_core/definitions/decorators/asset_check_decorator.py index c52ee4735a1df..a04248af80be0 100644 --- a/python_modules/dagster/dagster/_core/definitions/decorators/asset_check_decorator.py +++ b/python_modules/dagster/dagster/_core/definitions/decorators/asset_check_decorator.py @@ -1,4 +1,4 @@ -from typing import AbstractSet, Any, Callable, Iterable, Mapping, Optional, Sequence, Set, Union +from typing import Any, Callable, Iterable, Mapping, Optional, Sequence, Set, Union from typing_extensions import TypeAlias @@ -8,8 +8,8 @@ from dagster._core.definitions.asset_check_result import AssetCheckResult from dagster._core.definitions.asset_check_spec import AssetCheckSpec from dagster._core.definitions.asset_checks import AssetChecksDefinition -from dagster._core.definitions.asset_dep import CoercibleToAssetDep -from dagster._core.definitions.asset_in import AssetIn +from dagster._core.definitions.asset_dep import AssetDep, CoercibleToAssetDep +from dagster._core.definitions.asset_in import AssetIn, CoercibleToAssetIn from dagster._core.definitions.asset_key import AssetCheckKey from dagster._core.definitions.assets import AssetsDefinition from dagster._core.definitions.declarative_automation.automation_condition import ( @@ -20,9 +20,10 @@ DecoratorAssetsDefinitionBuilder, DecoratorAssetsDefinitionBuilderArgs, NamedIn, - build_named_ins, + build_and_validate_named_ins, compute_required_resource_keys, get_function_params_without_context_or_config_or_resources, + validate_named_ins_subset_of_deps, ) from dagster._core.definitions.decorators.op_decorator import _Op from dagster._core.definitions.events import AssetKey, CoercibleToAssetKey @@ -44,7 +45,7 @@ def _build_asset_check_named_ins( asset_key: AssetKey, fn: Callable[..., Any], additional_ins: Mapping[str, AssetIn], - additional_deps: Optional[AbstractSet[AssetKey]], + additional_deps: Mapping[AssetKey, AssetDep], ) -> Mapping[AssetKey, NamedIn]: fn_params = get_function_params_without_context_or_config_or_resources(fn) @@ -66,9 +67,9 @@ def _build_asset_check_named_ins( f"'{in_name}' is specified in 'additional_ins' but isn't a parameter." ) - # if all the fn_params are in additional_ins, then we add the prmary asset as a dep + # if all the fn_params are in additional_ins, then we add the primary asset as a dep if len(fn_params) == len(additional_ins): - all_deps = {*(additional_deps if additional_deps else set()), asset_key} + all_deps = {**additional_deps, **{asset_key: AssetDep(asset_key)}} all_ins = additional_ins # otherwise there should be one extra fn_param, which is the primary asset. Add that as an input elif len(fn_params) == len(additional_ins) + 1: @@ -87,10 +88,10 @@ def _build_asset_check_named_ins( " the target asset or be specified in 'additional_ins'." ) - return build_named_ins( + return build_and_validate_named_ins( fn=fn, asset_ins=all_ins, - deps=all_deps, + deps=all_deps.values(), ) @@ -189,7 +190,11 @@ def inner(fn: AssetCheckFunction) -> AssetChecksDefinition: resolved_name = name or fn.__name__ asset_key = AssetKey.from_coercible_or_definition(asset) - additional_dep_keys = set([dep.asset_key for dep in make_asset_deps(additional_deps) or []]) + additional_dep_keys = ( + {dep.asset_key: dep for dep in make_asset_deps(additional_deps) or []} + if additional_deps + else {} + ) named_in_by_asset_key = _build_asset_check_named_ins( resolved_name, asset_key, @@ -283,6 +288,7 @@ def multi_asset_check( required_resource_keys: Optional[Set[str]] = None, retry_policy: Optional[RetryPolicy] = None, config_schema: Optional[UserConfigSchema] = None, + ins: Optional[Mapping[str, CoercibleToAssetIn]] = None, ) -> Callable[[Callable[..., Any]], AssetChecksDefinition]: """Defines a set of asset checks that can be executed together with the same op. @@ -306,6 +312,8 @@ def multi_asset_check( retry_policy (Optional[RetryPolicy]): The retry policy for the op that executes the checks. can_subset (bool): Whether the op can emit results for a subset of the asset checks keys, based on the context.selected_asset_check_keys argument. Defaults to False. + ins (Optional[Mapping[str, Union[AssetKey, AssetIn]]]): A mapping from input name to AssetIn depended upon by + a given asset check. If an AssetKey is provided, it will be converted to an AssetIn with the same key. Examples: @@ -345,12 +353,21 @@ def inner(fn: MultiAssetCheckFunction) -> AssetChecksDefinition: outs = { spec.get_python_identifier(): Out(None, is_required=not can_subset) for spec in specs } - named_ins_by_asset_key = build_named_ins( + all_deps_by_key = { + **{spec.asset_key: AssetDep(spec.asset_key) for spec in specs}, + **{dep.asset_key: dep for spec in specs for dep in (spec.additional_deps or [])}, + } + + named_ins_by_asset_key = build_and_validate_named_ins( fn=fn, - asset_ins={}, - deps={spec.asset_key for spec in specs} - | {dep.asset_key for spec in specs for dep in spec.additional_deps or []}, + asset_ins={ + inp_name: AssetIn.from_coercible(coercible) for inp_name, coercible in ins.items() + } + if ins + else {}, + deps=all_deps_by_key.values(), ) + validate_named_ins_subset_of_deps(named_ins_by_asset_key, all_deps_by_key) with disable_dagster_warnings(): op_def = _Op( diff --git a/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py b/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py index 3aaa35ba4a0c3..4f53f0668b03b 100644 --- a/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py +++ b/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py @@ -40,7 +40,7 @@ from dagster._core.definitions.decorators.decorator_assets_definition_builder import ( DecoratorAssetsDefinitionBuilder, DecoratorAssetsDefinitionBuilderArgs, - build_named_ins, + build_and_validate_named_ins, build_named_outs, create_check_specs_by_output_name, validate_and_assign_output_names_to_check_specs, @@ -911,7 +911,7 @@ def graph_asset_no_defaults( kinds: Optional[AbstractSet[str]], ) -> AssetsDefinition: ins = ins or {} - named_ins = build_named_ins(compose_fn, ins or {}, set()) + named_ins = build_and_validate_named_ins(compose_fn, ins or {}, set()) out_asset_key, _asset_name = resolve_asset_key_and_name_for_decorator( key=key, key_prefix=key_prefix, @@ -1030,7 +1030,7 @@ def inner(fn: Callable[..., Any]) -> AssetsDefinition: if asset_in.partition_mapping } - named_ins = build_named_ins(fn, ins or {}, set()) + named_ins = build_and_validate_named_ins(fn, ins or {}, set()) keys_by_input_name = { input_name: asset_key for asset_key, (input_name, _) in named_ins.items() } diff --git a/python_modules/dagster/dagster/_core/definitions/decorators/decorator_assets_definition_builder.py b/python_modules/dagster/dagster/_core/definitions/decorators/decorator_assets_definition_builder.py index 0eab760bc93e7..f96462413a596 100644 --- a/python_modules/dagster/dagster/_core/definitions/decorators/decorator_assets_definition_builder.py +++ b/python_modules/dagster/dagster/_core/definitions/decorators/decorator_assets_definition_builder.py @@ -75,13 +75,39 @@ def get_function_params_without_context_or_config_or_resources( return new_input_args -def build_named_ins( +def validate_can_coexist(asset_in: AssetIn, asset_dep: AssetDep) -> None: + """Validates that the asset_in and asset_dep can coexist peacefully on the same asset key. + If both asset_in and asset_dep are set on the same asset key, expect that _no_ properties + are set on AssetIn except for the key itself. + """ + if ( + asset_in.metadata + or asset_in.key_prefix + or asset_in.dagster_type != NoValueSentinel + or asset_in.partition_mapping is not None + ): + raise DagsterInvalidDefinitionError( + f"Asset key '{asset_dep.asset_key.to_user_string()}' is used as both an input (via AssetIn) and a dependency (via AssetDep). If an asset key is used as an input and also set as a dependency, the input should only define the relationship between the asset key and the input name, or optionally set the input_manager_key. Any other properties should either not be set, or should be set on the dependency." + ) + + +def build_and_validate_named_ins( fn: Callable[..., Any], asset_ins: Mapping[str, AssetIn], - deps: Optional[AbstractSet[AssetKey]], + deps: Optional[Iterable[AssetDep]], ) -> Mapping[AssetKey, "NamedIn"]: """Creates a mapping from AssetKey to (name of input, In object).""" - deps = check.opt_set_param(deps, "deps", AssetKey) + deps_by_key = {dep.asset_key: dep for dep in deps} if deps else {} + ins_by_asset_key = { + asset_in.key if asset_in.key else AssetKey.from_coercible(input_name): asset_in + for input_name, asset_in in asset_ins.items() + } + shared_keys_between_ins_and_deps = set(ins_by_asset_key.keys()) & set(deps_by_key.keys()) + if shared_keys_between_ins_and_deps: + for shared_key in shared_keys_between_ins_and_deps: + validate_can_coexist(ins_by_asset_key[shared_key], deps_by_key[shared_key]) + + deps = check.opt_iterable_param(deps, "deps", AssetDep) new_input_args = get_function_params_without_context_or_config_or_resources(fn) @@ -126,16 +152,12 @@ def build_named_ins( In(metadata=metadata, input_manager_key=input_manager_key, dagster_type=dagster_type), ) - for asset_key in deps: - if asset_key in named_ins_by_asset_key: - raise DagsterInvalidDefinitionError( - f"deps value {asset_key} also declared as input/AssetIn" + for dep in deps: + if dep.asset_key not in named_ins_by_asset_key: + named_ins_by_asset_key[dep.asset_key] = NamedIn( + stringify_asset_key_to_input_name(dep.asset_key), + In(cast(type, Nothing)), ) - # mypy doesn't realize that Nothing is a valid type here - named_ins_by_asset_key[asset_key] = NamedIn( - stringify_asset_key_to_input_name(asset_key), - In(cast(type, Nothing)), - ) return named_ins_by_asset_key @@ -348,25 +370,23 @@ def from_multi_asset_specs( ), ) - upstream_keys = set() + upstream_deps = {} for spec in asset_specs: for dep in spec.deps: if dep.asset_key not in named_outs_by_asset_key: - upstream_keys.add(dep.asset_key) + upstream_deps[dep.asset_key] = dep if dep.asset_key in named_outs_by_asset_key and dep.partition_mapping is not None: # self-dependent asset also needs to be considered an upstream_key - upstream_keys.add(dep.asset_key) + upstream_deps[dep.asset_key] = dep # get which asset keys have inputs set - loaded_upstreams = build_named_ins(fn, asset_in_map, deps=set()) - unexpected_upstreams = {key for key in loaded_upstreams.keys() if key not in upstream_keys} - if unexpected_upstreams: - raise DagsterInvalidDefinitionError( - f"Asset inputs {unexpected_upstreams} do not have dependencies on the passed" - " AssetSpec(s). Set the deps on the appropriate AssetSpec(s)." - ) - remaining_upstream_keys = {key for key in upstream_keys if key not in loaded_upstreams} - named_ins_by_asset_key = build_named_ins(fn, asset_in_map, deps=remaining_upstream_keys) + named_ins_by_asset_key = build_and_validate_named_ins( + fn, asset_in_map, deps=upstream_deps.values() + ) + # We expect that asset_ins are a subset of asset_deps. The reason we do not check this in + # `build_and_validate_named_ins` is because in other decorator pathways, we allow for argument-based + # dependencies which are not specified in deps (such as the asset decorator). + validate_named_ins_subset_of_deps(named_ins_by_asset_key, upstream_deps) internal_deps = { spec.key: {dep.asset_key for dep in spec.deps} @@ -401,10 +421,10 @@ def from_asset_outs_in_asset_centric_decorator( check.param_invariant( not passed_args.specs, "args", "This codepath for non-spec based create" ) - named_ins_by_asset_key = build_named_ins( + named_ins_by_asset_key = build_and_validate_named_ins( fn, asset_in_map, - deps=({dep.asset_key for dep in upstream_asset_deps} if upstream_asset_deps else set()), + deps=upstream_asset_deps or set(), ) named_outs_by_asset_key = build_named_outs(asset_out_map) @@ -653,3 +673,22 @@ def _validate_check_specs_target_relevant_asset_keys( f"Invalid asset key {spec.asset_key} in check spec {spec.name}. Must be one of" f" {valid_asset_keys}" ) + + +def validate_named_ins_subset_of_deps( + named_ins_per_key: Mapping[AssetKey, NamedIn], + asset_deps_by_key: Mapping[AssetKey, AssetDep], +) -> None: + """Validates that the asset_ins are a subset of the asset_deps. This is a common validation + that we need to do in multiple places, so we've factored it out into a helper function. + """ + asset_dep_keys = set(asset_deps_by_key.keys()) + asset_in_keys = set(named_ins_per_key.keys()) + + if asset_in_keys - asset_dep_keys: + invalid_asset_in_keys = asset_in_keys - asset_dep_keys + raise DagsterInvalidDefinitionError( + f"Invalid asset dependencies: `{invalid_asset_in_keys}` specified as AssetIns, but" + " are not specified as `AssetDep` objects on any constituent AssetSpec objects. Asset inputs must be associated with an" + " output produced by the asset." + ) diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_deps.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_deps.py index 8b9f9056b95cd..a3cfde7ba0541 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_deps.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_deps.py @@ -4,6 +4,7 @@ AssetOut, FilesystemIOManager, IOManager, + Nothing, SourceAsset, TimeWindowPartitionMapping, asset, @@ -12,7 +13,9 @@ ) from dagster._check import ParameterCheckError from dagster._core.definitions.asset_dep import AssetDep +from dagster._core.definitions.asset_in import AssetIn from dagster._core.definitions.asset_spec import AssetSpec +from dagster._core.definitions.partition_mapping import IdentityPartitionMapping from dagster._core.errors import DagsterInvalidDefinitionError, DagsterInvariantViolationError from dagster._core.types.dagster_type import DagsterTypeKind @@ -522,20 +525,158 @@ def my_asset(): def test_dep_via_deps_and_fn(): + """Test combining deps and ins in the same @asset-decorated function.""" + @asset def the_upstream_asset(): return 1 - with pytest.raises( - DagsterInvalidDefinitionError, - match=r"deps value .* also declared as input/AssetIn", - ): + # When deps and ins are both set, expect that deps is only used for the asset key and potentially input name. + for param_dict in [ + {"partition_mapping": IdentityPartitionMapping()}, + {"metadata": {"foo": "bar"}}, + {"key_prefix": "prefix"}, + {"dagster_type": Nothing}, + ]: + with pytest.raises(DagsterInvalidDefinitionError): + + @asset( + deps=[AssetDep(the_upstream_asset)], + ins={"the_upstream_asset": AssetIn(**param_dict)}, + ) + def _(the_upstream_asset): + return None + + # We allow the asset key to be set via deps and ins as long as no additional information is set. + @asset(deps=[the_upstream_asset]) + def depends_on_upstream_asset_implicit_remap(the_upstream_asset): + assert the_upstream_asset == 1 + + @asset( + deps=[AssetDep(the_upstream_asset)], ins={"remapped": AssetIn(key=the_upstream_asset.key)} + ) + def depends_on_upstream_asset_explicit_remap(remapped): + assert remapped == 1 + + res = materialize( + [ + the_upstream_asset, + depends_on_upstream_asset_implicit_remap, + depends_on_upstream_asset_explicit_remap, + ], + ) + assert res.success + + @asset + def upstream2(): + return 2 + + # As an unfortunate consequence of the many iterations of dependency specification and the fact that they were all additive with each other, + # we have to support the case where deps are specified separately in both the function signature and the decorator. + # This is not recommended, but it is supported. + @asset(deps=[the_upstream_asset]) + def some_explicit_and_implicit_deps(the_upstream_asset, upstream2): + assert the_upstream_asset == 1 + assert upstream2 == 2 + + @asset(deps=[the_upstream_asset], ins={"remapped": AssetIn(key=upstream2.key)}) + def deps_disjoint_between_args(the_upstream_asset, remapped): + assert the_upstream_asset == 1 + assert remapped == 2 + + res = materialize( + [ + the_upstream_asset, + upstream2, + some_explicit_and_implicit_deps, + deps_disjoint_between_args, + ], + ) + assert res.success + - @asset(deps=[the_upstream_asset]) - def depends_on_upstream_asset(the_upstream_asset): +def test_multi_asset_specs_deps_and_fn(): + @asset + def the_upstream_asset(): + return 1 + + # When deps and ins are both set, expect that deps is only used for the asset key and potentially input name. + for param_dict in [ + {"partition_mapping": IdentityPartitionMapping()}, + {"metadata": {"foo": "bar"}}, + {"key_prefix": "prefix"}, + {"dagster_type": Nothing}, + ]: + with pytest.raises(DagsterInvalidDefinitionError): + + @multi_asset( + specs=[AssetSpec("the_asset", deps=[AssetDep(the_upstream_asset)])], + ins={"the_upstream_asset": AssetIn(**param_dict)}, + ) + def _(the_upstream_asset): + return None + + # We allow the asset key to be set via deps and ins as long as no additional information is set. + @multi_asset(specs=[AssetSpec("the_asset", deps=[the_upstream_asset])]) + def depends_on_upstream_asset_implicit_remap(the_upstream_asset): + assert the_upstream_asset == 1 + + @multi_asset( + specs=[AssetSpec("other_asset", deps=[AssetDep(the_upstream_asset)])], + ins={"remapped": AssetIn(key=the_upstream_asset.key)}, + ) + def depends_on_upstream_asset_explicit_remap(remapped): + assert remapped == 1 + + res = materialize( + [ + the_upstream_asset, + depends_on_upstream_asset_implicit_remap, + depends_on_upstream_asset_explicit_remap, + ], + ) + assert res.success + + # We do not allow you to set a dependency purely via input if you're opting in to the spec pattern. + with pytest.raises(DagsterInvalidDefinitionError): + + @multi_asset( + specs=[AssetSpec("the_asset")], + ) + def _(the_upstream_asset): return None +def test_allow_remapping_io_manager_key() -> None: + @asset + def the_upstream_asset(): + return 1 + + @asset( + deps=[the_upstream_asset], + ins={"the_upstream_asset": AssetIn(input_manager_key="custom_io")}, + ) + def depends_on_upstream_asset(the_upstream_asset): + assert the_upstream_asset == 1 + + calls = [] + + class MyIOManager(IOManager): + def handle_output(self, context, obj): + raise Exception("Should not be called") + + def load_input(self, context): + calls.append("load_input") + return 1 + + res = materialize( + [the_upstream_asset, depends_on_upstream_asset], + resources={"custom_io": MyIOManager()}, + ) + assert res.success + assert calls == ["load_input"] + + def test_duplicate_deps(): @asset def the_upstream_asset(): diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets.py index 1f29844b42a83..5fbc36e955cbb 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets.py @@ -1976,7 +1976,7 @@ def also_input(table_A): ... with pytest.raises( DagsterInvalidDefinitionError, - match="do not have dependencies on the passed AssetSpec", + match="specified as AssetIns", ): @multi_asset(specs=[table_b, table_c]) @@ -1984,7 +1984,7 @@ def rogue_input(table_X): ... with pytest.raises( DagsterInvalidDefinitionError, - match="do not have dependencies on the passed AssetSpec", + match="specified as AssetIns", ): @multi_asset(specs=[table_b_no_dep, table_c_no_dep]) diff --git a/python_modules/dagster/dagster_tests/definitions_tests/decorators_tests/test_asset_check_decorator.py b/python_modules/dagster/dagster_tests/definitions_tests/decorators_tests/test_asset_check_decorator.py index 09b0151444fa4..3299b270c437c 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/decorators_tests/test_asset_check_decorator.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/decorators_tests/test_asset_check_decorator.py @@ -28,6 +28,7 @@ ) from dagster._core.definitions.asset_check_spec import AssetCheckKey from dagster._core.definitions.asset_checks import AssetChecksDefinition +from dagster._core.definitions.asset_in import AssetIn from dagster._core.errors import ( DagsterInvalidDefinitionError, DagsterInvalidSubsetError, @@ -118,19 +119,105 @@ def my_check() -> AssetCheckResult: assert spec.asset_key == AssetKey(["prefix", "asset1"]) +def test_asset_check_input() -> None: + @asset + def asset1() -> int: + return 5 + + @asset_check(asset=asset1) + def my_check1(asset1: int) -> AssetCheckResult: + return AssetCheckResult(passed=asset1 == 5) + + @asset_check(asset=asset1) + def my_check2(random_name: int) -> AssetCheckResult: + return AssetCheckResult(passed=random_name == 5) + + @asset_check(asset=asset1) + def my_check3(context, random_name: int) -> AssetCheckResult: + return AssetCheckResult(passed=random_name == 5) + + @asset_check(asset=asset1) + def my_check4(context, asset1: int) -> AssetCheckResult: + return AssetCheckResult(passed=asset1 == 5) + + result = execute_assets_and_checks( + assets=[asset1], asset_checks=[my_check1, my_check2, my_check3, my_check4] + ) + + assert result.success + assert len(result.get_asset_check_evaluations()) == 4 + assert all(check.passed for check in result.get_asset_check_evaluations()) + + +def test_asset_check_additional_ins() -> None: + @asset + def asset1() -> int: + return 5 + + @asset + def asset2() -> int: + return 4 + + @asset_check(asset=asset1, additional_ins={"asset2": AssetIn(key=asset2.key)}) + def my_check(asset1: int, asset2: int) -> AssetCheckResult: + return AssetCheckResult(passed=asset1 == 5 and asset2 == 4) + + @asset_check(asset=asset1, additional_ins={"asset2": AssetIn(key=asset2.key)}) + def my_check2(asset2: int) -> AssetCheckResult: + return AssetCheckResult(passed=asset2 == 4) + + @asset_check(asset=asset1, additional_ins={"asset2": AssetIn(key=asset2.key)}) + def my_check3(context, asset2: int) -> AssetCheckResult: + return AssetCheckResult(passed=asset2 == 4) + + # Error bc asset2 is in additional_ins but not in the function signature + with pytest.raises(DagsterInvalidDefinitionError): + + @asset_check(asset=asset1, additional_ins={"asset2": AssetIn(key=asset2.key)}) + def my_check4(): + return AssetCheckResult(passed=True) + + # Error bc asset1 is in both additional_ins and the function signature + with pytest.raises(DagsterInvalidDefinitionError): + + @asset_check(asset=asset1, additional_ins={"asset1": AssetIn(key=asset1.key)}) + def my_check5(asset1: int) -> AssetCheckResult: + return AssetCheckResult(passed=asset1 == 5) + + # Error bc asset2 is in the function signature but not additional_ins + with pytest.raises(DagsterInvalidDefinitionError): + + @asset_check(asset=asset1, additional_ins={"asset2": AssetIn(key=asset2.key)}) + def my_check6(asset1: int) -> AssetCheckResult: + return AssetCheckResult(passed=asset1 == 5) + + result = execute_assets_and_checks( + assets=[asset1, asset2], asset_checks=[my_check, my_check2, my_check3] + ) + + assert result.success + assert len(result.get_asset_check_evaluations()) == 3 + assert all(check.passed for check in result.get_asset_check_evaluations()) + + def test_asset_check_input_with_prefix() -> None: @asset(key_prefix="prefix") - def asset1() -> None: ... + def asset1() -> int: + return 5 @asset_check(asset=asset1) - def my_check(asset1) -> AssetCheckResult: - return AssetCheckResult(passed=True) + def my_check(unrelated_name: int) -> AssetCheckResult: + return AssetCheckResult(passed=unrelated_name == 5) spec = my_check.get_spec_for_check_key( AssetCheckKey(AssetKey(["prefix", "asset1"]), "my_check") ) assert spec.asset_key == AssetKey(["prefix", "asset1"]) + result = execute_assets_and_checks(assets=[asset1], asset_checks=[my_check]) + assert result.success + assert len(result.get_asset_check_evaluations()) == 1 + def test_execute_asset_and_check() -> None: @asset @@ -862,6 +949,26 @@ def check1(context) -> AssetCheckResult: assert result.passed +def test_direct_invocation_with_input() -> None: + @asset_check(asset="asset1") + def check1(asset1) -> AssetCheckResult: + return AssetCheckResult(passed=True) + + result = check1(5) + assert isinstance(result, AssetCheckResult) + assert result.passed + + +def test_direct_invocation_with_context_and_input() -> None: + @asset_check(asset="asset1") + def check1(context, asset1) -> AssetCheckResult: + return AssetCheckResult(passed=True) + + result = check1(build_op_context(), 5) + assert isinstance(result, AssetCheckResult) + assert result.passed + + def test_multi_check_direct_invocation() -> None: @multi_asset_check( specs=[ @@ -881,3 +988,210 @@ def checks() -> Iterable[AssetCheckResult]: assert results[0].passed assert not results[1].passed assert results[2].passed + + +def test_direct_invocation_with_inputs() -> None: + @asset + def asset1() -> int: + return 4 + + @asset + def asset2() -> int: + return 5 + + @multi_asset_check( + specs=[ + AssetCheckSpec("check1", asset=asset1), + AssetCheckSpec("check2", asset=asset2), + ] + ) + def multi_check(asset1: int, asset2: int) -> Iterable[AssetCheckResult]: + yield AssetCheckResult(passed=asset1 == 4, asset_key="asset1", check_name="check1") + yield AssetCheckResult(passed=asset2 == 5, asset_key="asset2", check_name="check2") + + result = list(multi_check(4, 5)) # type: ignore + assert len(result) == 2 + assert all(isinstance(r, AssetCheckResult) for r in result) + assert all(r.passed for r in result) + + +def test_direct_invocation_remapped_inputs() -> None: + @asset + def asset1() -> int: + return 4 + + @asset + def asset2() -> int: + return 5 + + @multi_asset_check( + specs=[ + AssetCheckSpec("check1", asset=asset1), + AssetCheckSpec("check2", asset=asset2), + ], + ins={"remapped": asset1.key}, + ) + def multi_check(remapped: int, asset2: int) -> Iterable[AssetCheckResult]: + yield AssetCheckResult(passed=remapped == 4, asset_key="asset1", check_name="check1") + yield AssetCheckResult(passed=asset2 == 5, asset_key="asset2", check_name="check2") + + result = list(multi_check(4, 5)) # type: ignore + assert len(result) == 2 + assert all(isinstance(r, AssetCheckResult) for r in result) + assert all(r.passed for r in result) + + +def test_multi_check_asset_with_inferred_inputs() -> None: + """Test automatic inference of asset inputs in a multi-check.""" + + @asset + def asset1() -> int: + return 4 + + @asset + def asset2() -> int: + return 5 + + @multi_asset_check( + specs=[ + AssetCheckSpec("check1", asset=asset1), + AssetCheckSpec("check2", asset=asset2), + ] + ) + def multi_check(asset1: int, asset2: int) -> Iterable[AssetCheckResult]: + yield AssetCheckResult(passed=asset1 == 4, asset_key="asset1", check_name="check1") + yield AssetCheckResult(passed=asset2 == 5, asset_key="asset2", check_name="check2") + + result = execute_assets_and_checks(assets=[asset1, asset2], asset_checks=[multi_check]) + assert result.success + + check_evals = result.get_asset_check_evaluations() + assert len(check_evals) == 2 + assert all(check_eval.passed for check_eval in check_evals) + + +def test_multi_check_input_remapping() -> None: + """Test remapping an asset input to a different name in a multi-check.""" + + @asset + def asset1() -> int: + return 4 + + @asset + def asset2() -> int: + return 5 + + @multi_asset_check( + specs=[ + AssetCheckSpec("check1", asset=asset1), + AssetCheckSpec("check2", asset=asset2), + ], + ins={"remapped": asset1.key}, + ) + def multi_check(remapped: int, asset2: int) -> Iterable[AssetCheckResult]: + yield AssetCheckResult(passed=remapped == 4, asset_key="asset1", check_name="check1") + yield AssetCheckResult(passed=asset2 == 5, asset_key="asset2", check_name="check2") + + result = execute_assets_and_checks(assets=[asset1, asset2], asset_checks=[multi_check]) + assert result.success + + check_evals = result.get_asset_check_evaluations() + assert len(check_evals) == 2 + assert all(check_eval.passed for check_eval in check_evals) + + +def test_multi_check_input_remapping_with_context() -> None: + """Test remapping an asset input to a different name in a multi-check.""" + + @asset + def asset1() -> int: + return 4 + + @asset + def asset2() -> int: + return 5 + + @multi_asset_check( + specs=[ + AssetCheckSpec("check1", asset=asset1), + AssetCheckSpec("check2", asset=asset2), + ], + ins={"remapped": asset1.key}, + ) + def multi_check(context, remapped: int, asset2: int) -> Iterable[AssetCheckResult]: + assert isinstance(context, AssetCheckExecutionContext) + yield AssetCheckResult(passed=remapped == 4, asset_key="asset1", check_name="check1") + yield AssetCheckResult(passed=asset2 == 5, asset_key="asset2", check_name="check2") + + result = execute_assets_and_checks(assets=[asset1, asset2], asset_checks=[multi_check]) + assert result.success + + check_evals = result.get_asset_check_evaluations() + assert len(check_evals) == 2 + assert all(check_eval.passed for check_eval in check_evals) + + +def test_input_manager_overrides_multi_asset_check_decorator() -> None: + """Test overriding input manager key for a particular asset in a multi-check, ensure that it is correctly mapped.""" + + @asset + def asset1() -> int: + return 4 + + @asset + def asset2() -> int: + return 5 + + @multi_asset_check( + specs=[ + AssetCheckSpec("check1", asset=asset1), + AssetCheckSpec("check2", asset=asset2), + ], + ins={"asset1": AssetIn(key="asset1", input_manager_key="override_manager")}, + ) + def my_check(asset1: int, asset2: int) -> Iterable[AssetCheckResult]: + yield AssetCheckResult(passed=asset1 == 4, asset_key="asset1", check_name="check1") + yield AssetCheckResult(passed=asset2 == 5, asset_key="asset2", check_name="check2") + + called = [] + + class MyIOManager(IOManager): + def load_input(self, context) -> int: + called.append(context.asset_key) + return 4 + + def handle_output(self, context, obj) -> None: + raise NotImplementedError() + + result = execute_assets_and_checks( + assets=[asset1, asset2], + asset_checks=[my_check], + resources={"override_manager": MyIOManager()}, + ) + + assert result.success + assert called == [AssetKey("asset1")] + assert all(check_eval.passed for check_eval in result.get_asset_check_evaluations()) + + +def test_nonsense_input_name() -> None: + """Test a nonsensical input name in a multi-check.""" + + @asset + def asset1() -> int: + return 4 + + @asset + def asset2() -> int: + return 5 + + with pytest.raises(DagsterInvalidDefinitionError): + + @multi_asset_check( + specs=[ + AssetCheckSpec("check1", asset=asset1), + AssetCheckSpec("check2", asset=asset2), + ], + ) + def my_check(nonsense: int, asset2: int): + pass diff --git a/python_modules/dagster/dagster_tests/definitions_tests/decorators_tests/test_asset_check_decorator_secondary_assets.py b/python_modules/dagster/dagster_tests/definitions_tests/decorators_tests/test_asset_check_decorator_secondary_assets.py index d06db18106ede..f171e92961452 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/decorators_tests/test_asset_check_decorator_secondary_assets.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/decorators_tests/test_asset_check_decorator_secondary_assets.py @@ -98,18 +98,18 @@ def check1(asset_1): def test_additional_ins_and_deps_overlap(): - with pytest.raises( - DagsterInvalidDefinitionError, - match=re.escape("deps value AssetKey(['asset2']) also declared as input/AssetIn"), - ): + @asset_check( + asset=asset1, + additional_ins={"asset_2": AssetIn("asset2")}, + additional_deps=[asset2], + ) + def check1(asset_2) -> AssetCheckResult: + return AssetCheckResult(passed=asset_2 == 5) - @asset_check( # pyright: ignore[reportArgumentType] - asset=asset1, - additional_ins={"asset_2": AssetIn("asset2")}, - additional_deps=[asset2], - ) - def check1(asset_2): - pass + result = execute_assets_and_checks(assets=[asset1, asset2], asset_checks=[check1]) + assert result.success + assert len(result.get_asset_check_evaluations()) == 1 + assert all(e.passed for e in result.get_asset_check_evaluations()) def test_additional_ins_must_correspond_to_params():