diff --git a/python_modules/dagster-test/dagster_test/toys/asset_checks.py b/python_modules/dagster-test/dagster_test/toys/asset_checks.py index e258f12b3e90c..054212885192b 100644 --- a/python_modules/dagster-test/dagster_test/toys/asset_checks.py +++ b/python_modules/dagster-test/dagster_test/toys/asset_checks.py @@ -413,7 +413,8 @@ def downstream_asset(): just_checks_job = define_asset_job( - name="just_checks_job", selection=AssetSelection.all_asset_checks() + name="just_checks_job", + selection=AssetSelection.checks_for_assets(checked_asset), ) diff --git a/python_modules/dagster/dagster/_core/definitions/asset_layer.py b/python_modules/dagster/dagster/_core/definitions/asset_layer.py index 0e1fedc447218..77a0d2ad5ccc8 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_layer.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_layer.py @@ -545,9 +545,19 @@ def partitions_fn(context: "OutputContext") -> AbstractSet[str]: source_assets_by_key = {source_asset.key: source_asset for source_asset in source_assets} assets_defs_by_node_handle: Dict[NodeHandle, "AssetsDefinition"] = { - node_handle: assets_defs_by_key[asset_key] - for asset_key, node_handles in dep_node_handles_by_asset_key.items() - for node_handle in node_handles + # nodes for assets + **{ + node_handle: assets_defs_by_key[asset_key] + for asset_key, node_handles in dep_node_handles_by_asset_key.items() + for node_handle in node_handles + }, + # nodes for asset checks. Required for AssetsDefs that have selected checks + # but not assets + **{ + node_handle: assets_def + for node_handle, assets_def in assets_defs_by_outer_node_handle.items() + if assets_def.check_keys + }, } return AssetLayer( @@ -806,33 +816,32 @@ def build_asset_selection_job( # no selections, include everything included_assets = list(assets) excluded_assets = [] - included_source_assets = [] - included_checks = list(asset_checks) + included_source_assets = list(source_assets) + included_checks_defs = list(asset_checks) else: - if asset_selection is not None: - (included_assets, excluded_assets) = _subset_assets_defs(assets, asset_selection) - included_source_assets = _subset_source_assets(source_assets, asset_selection) - else: - # if checks were specified, then exclude all assets - included_assets = [] - excluded_assets = list(assets) - included_source_assets = [] - - if asset_check_selection is not None: - # NOTE: This filters to a checks def if any of the included specs are in the selection. - # This needs to change to fully subsetting checks in multi assets. - included_checks = [ + # Filter to assets that match either selected assets or include a selected check. + # E.g. a multi asset can be included even if it's not in asset_selection, if it has a selected check + # defined with check_specs + (included_assets, excluded_assets) = _subset_assets_defs( + assets, asset_selection or set(), asset_check_selection + ) + included_source_assets = _subset_source_assets(source_assets, asset_selection or set()) + + if asset_check_selection is None: + # If assets were selected and checks are None, then include all checks on the selected assets. + # Note: once we start explicitly passing in asset checks instead of None from the front end, + # we can remove this logic. + included_checks_defs = [ asset_check for asset_check in asset_checks - if [spec for spec in asset_check.specs if spec.key in asset_check_selection] + if asset_check.asset_key in check.not_none(asset_selection) ] else: - # If assets were selected and checks weren't, then include all checks on the selected assets. - # Note: a future diff needs to add support for selecting assets, and not their checks. - included_checks = [ + # Otherwise, filter to explicitly selected checks defs + included_checks_defs = [ asset_check for asset_check in asset_checks - if asset_check.asset_key in check.not_none(asset_selection) + if [spec for spec in asset_check.specs if spec.key in asset_check_selection] ] if partitions_def: @@ -844,11 +853,11 @@ def build_asset_selection_job( f"{partitions_def}.", ) - if len(included_assets) or len(included_checks) > 0: + if len(included_assets) or len(included_checks_defs) > 0: asset_job = build_assets_job( name=name, assets=included_assets, - asset_checks=included_checks, + asset_checks=included_checks_defs, config=config, source_assets=[*source_assets, *excluded_assets], resource_defs=resource_defs, @@ -880,6 +889,7 @@ def build_asset_selection_job( def _subset_assets_defs( assets: Iterable["AssetsDefinition"], selected_asset_keys: AbstractSet[AssetKey], + selected_asset_check_keys: Optional[AbstractSet[AssetCheckKey]], ) -> Tuple[Sequence["AssetsDefinition"], Sequence["AssetsDefinition"],]: """Given a list of asset key selection queries, generate a set of AssetsDefinition objects representing the included/excluded definitions. @@ -890,18 +900,33 @@ def _subset_assets_defs( for asset in set(assets): # intersection selected_subset = selected_asset_keys & asset.keys + + # if specific checks were selected, only include those + if selected_asset_check_keys is not None: + selected_check_subset = selected_asset_check_keys & asset.check_keys + # if no checks were selected, filter to checks that target selected assets + else: + selected_check_subset = { + handle for handle in asset.check_keys if handle.asset_key in selected_subset + } + # all assets in this def are selected - if selected_subset == asset.keys: + if selected_subset == asset.keys and selected_check_subset == asset.check_keys: included_assets.add(asset) # no assets in this def are selected - elif len(selected_subset) == 0: + elif len(selected_subset) == 0 and len(selected_check_subset) == 0: excluded_assets.add(asset) elif asset.can_subset: # subset of the asset that we want - subset_asset = asset.subset_for(selected_asset_keys) + subset_asset = asset.subset_for(selected_asset_keys, selected_check_subset) included_assets.add(subset_asset) # subset of the asset that we don't want - excluded_assets.add(asset.subset_for(asset.keys - subset_asset.keys)) + excluded_assets.add( + asset.subset_for( + selected_asset_keys=asset.keys - subset_asset.keys, + selected_asset_check_keys=(asset.check_keys - subset_asset.check_keys), + ) + ) else: raise DagsterInvalidSubsetError( f"When building job, the AssetsDefinition '{asset.node_def.name}' " diff --git a/python_modules/dagster/dagster/_core/definitions/assets.py b/python_modules/dagster/dagster/_core/definitions/assets.py index 390b47bf7a8fa..bf7a35c0694b7 100644 --- a/python_modules/dagster/dagster/_core/definitions/assets.py +++ b/python_modules/dagster/dagster/_core/definitions/assets.py @@ -89,6 +89,7 @@ class AssetsDefinition(ResourceAddable, RequiresResources, IHasInternalInit): _backfill_policy: Optional[BackfillPolicy] _code_versions_by_key: Mapping[AssetKey, Optional[str]] _descriptions_by_key: Mapping[AssetKey, str] + _selected_asset_check_keys: AbstractSet[AssetCheckKey] def __init__( self, @@ -109,6 +110,7 @@ def __init__( backfill_policy: Optional[BackfillPolicy] = None, descriptions_by_key: Optional[Mapping[AssetKey, str]] = None, check_specs_by_output_name: Optional[Mapping[str, AssetCheckSpec]] = None, + selected_asset_check_keys: Optional[AbstractSet[AssetCheckKey]] = None, # if adding new fields, make sure to handle them in the with_attributes, from_graph, and # get_attributes_dict methods ): @@ -254,18 +256,22 @@ def __init__( backfill_policy, "backfill_policy", BackfillPolicy ) - if selected_asset_keys is None: + if selected_asset_check_keys is None: self._check_specs_by_output_name = check_specs_by_output_name or {} else: self._check_specs_by_output_name = { output_name: check_spec for output_name, check_spec in (check_specs_by_output_name or {}).items() - if check_spec.asset_key in selected_asset_keys + if check_spec.key in selected_asset_check_keys } self._check_specs_by_handle = { spec.key: spec for spec in self._check_specs_by_output_name.values() } + if selected_asset_check_keys is not None: + self._selected_asset_check_keys = selected_asset_check_keys + else: + self._selected_asset_check_keys = self._check_specs_by_handle.keys() if self._partitions_def is None: # check if backfill policy is BackfillPolicyType.SINGLE_RUN if asset is not partitioned @@ -305,6 +311,7 @@ def dagster_internal_init( backfill_policy: Optional[BackfillPolicy], descriptions_by_key: Optional[Mapping[AssetKey, str]], check_specs_by_output_name: Optional[Mapping[str, AssetCheckSpec]], + selected_asset_check_keys: Optional[AbstractSet[AssetCheckKey]], ) -> "AssetsDefinition": return AssetsDefinition( keys_by_input_name=keys_by_input_name, @@ -323,6 +330,7 @@ def dagster_internal_init( backfill_policy=backfill_policy, descriptions_by_key=descriptions_by_key, check_specs_by_output_name=check_specs_by_output_name, + selected_asset_check_keys=selected_asset_check_keys, ) def __call__(self, *args: object, **kwargs: object) -> object: @@ -675,6 +683,7 @@ def _from_node( can_subset=can_subset, selected_asset_keys=None, # node has no subselection info check_specs_by_output_name=check_specs_by_output_name, + selected_asset_check_keys=None, ) @public @@ -858,6 +867,16 @@ def check_specs(self) -> Iterable[AssetCheckSpec]: """ return self._check_specs_by_output_name.values() + @property + def check_keys(self) -> AbstractSet[AssetCheckKey]: + """Returns the selected asset checks associated by this AssetsDefinition. + + Returns: + AbstractSet[Tuple[AssetKey, str]]: The selected asset checks. An asset check is + identified by the asset key and the name of the check. + """ + return self._selected_asset_check_keys + def is_asset_executable(self, asset_key: AssetKey) -> bool: """Returns True if the asset key is materializable by this AssetsDefinition. @@ -1090,12 +1109,17 @@ def _subset_graph_backed_asset( return get_graph_subset(self.node_def, op_selection) - def subset_for(self, selected_asset_keys: AbstractSet[AssetKey]) -> "AssetsDefinition": - """Create a subset of this AssetsDefinition that will only materialize the assets in the - selected set. + def subset_for( + self, + selected_asset_keys: AbstractSet[AssetKey], + selected_asset_check_keys: Optional[AbstractSet[AssetCheckKey]], + ) -> "AssetsDefinition": + """Create a subset of this AssetsDefinition that will only materialize the assets and checks + in the selected set. Args: selected_asset_keys (AbstractSet[AssetKey]): The total set of asset keys + selected_asset_check_keys (AbstractSet[AssetCheckKey]): The selected asset checks """ from dagster._core.definitions.graph_definition import GraphDefinition @@ -1106,10 +1130,23 @@ def subset_for(self, selected_asset_keys: AbstractSet[AssetKey]) -> "AssetsDefin # Set of assets within selected_asset_keys which are outputted by this AssetDefinition asset_subselection = selected_asset_keys & self.keys + if selected_asset_check_keys is None: + # filter to checks that target selected asset keys + asset_check_subselection = { + key for key in self.check_keys if key.asset_key in asset_subselection + } + else: + asset_check_subselection = selected_asset_check_keys & self.check_keys + # Early escape if all assets in AssetsDefinition are selected - if asset_subselection == self.keys: + if asset_subselection == self.keys and asset_check_subselection == self.check_keys: return self elif isinstance(self.node_def, GraphDefinition): # Node is graph-backed asset + check.invariant( + selected_asset_check_keys == self.check_keys, + "Subsetting graph-backed assets with checks is not yet supported", + ) + subsetted_node = self._subset_graph_backed_asset( asset_subselection, ) @@ -1155,7 +1192,10 @@ def subset_for(self, selected_asset_keys: AbstractSet[AssetKey]) -> "AssetsDefin return self.__class__(**merge_dicts(self.get_attributes_dict(), replaced_attributes)) else: # multi_asset subsetting - replaced_attributes = dict(selected_asset_keys=asset_subselection) + replaced_attributes = { + "selected_asset_keys": asset_subselection, + "selected_asset_check_keys": asset_check_subselection, + } return self.__class__(**merge_dicts(self.get_attributes_dict(), replaced_attributes)) @public @@ -1280,6 +1320,7 @@ def get_attributes_dict(self) -> Dict[str, Any]: backfill_policy=self._backfill_policy, descriptions_by_key=self._descriptions_by_key, check_specs_by_output_name=self._check_specs_by_output_name, + selected_asset_check_keys=self._selected_asset_check_keys, ) diff --git a/python_modules/dagster/dagster/_core/definitions/assets_job.py b/python_modules/dagster/dagster/_core/definitions/assets_job.py index cf186f3d2f6a9..d0ce5cacebaf1 100644 --- a/python_modules/dagster/dagster/_core/definitions/assets_job.py +++ b/python_modules/dagster/dagster/_core/definitions/assets_job.py @@ -581,7 +581,7 @@ def _dfs(key, cur_color): ret.append(assets_def) else: for asset_keys in color_mapping.values(): - ret.append(assets_def.subset_for(asset_keys)) + ret.append(assets_def.subset_for(asset_keys, selected_asset_check_keys=None)) return ret diff --git a/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py b/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py index d509b8b60983e..c7be7afc269b1 100644 --- a/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py +++ b/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py @@ -476,6 +476,7 @@ def __call__(self, fn: Callable) -> AssetsDefinition: metadata_by_key={out_asset_key: self.metadata} if self.metadata else None, descriptions_by_key=None, # not supported for now check_specs_by_output_name=check_specs_by_output_name, + selected_asset_check_keys=None, # no subselection in decorator ) @@ -848,6 +849,7 @@ def inner(fn: Callable[..., Any]) -> AssetsDefinition: descriptions_by_key=None, # not supported for now metadata_by_key=metadata_by_key, check_specs_by_output_name=check_specs_by_output_name, + selected_asset_check_keys=None, # no subselection in decorator ) return inner diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index 663eaf4bf3b3f..1dcb2834e521e 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -16,7 +16,8 @@ import dagster._check as check from dagster._annotations import deprecated, experimental, public -from dagster._core.definitions.asset_check_spec import AssetCheckSpec +from dagster._core.definitions.asset_check_spec import AssetCheckKey, AssetCheckSpec +from dagster._core.definitions.asset_checks import AssetChecksDefinition from dagster._core.definitions.assets import AssetsDefinition from dagster._core.definitions.data_version import ( DataProvenance, @@ -561,6 +562,45 @@ def selected_asset_keys(self) -> AbstractSet[AssetKey]: return set() return self.assets_def.keys + @public + @property + def has_asset_checks_def(self) -> bool: + """Return a boolean indicating the presence of a backing AssetChecksDefinition + for the current execution. + + Returns: + bool: True if there is a backing AssetChecksDefinition for the current execution, otherwise False. + """ + return self.job_def.asset_layer.asset_checks_def_for_node(self.node_handle) is not None + + @public + @property + def asset_checks_def(self) -> AssetChecksDefinition: + """The backing AssetChecksDefinition for what is currently executing, errors if not + available. + + Returns: + AssetChecksDefinition. + """ + asset_checks_def = self.job_def.asset_layer.asset_checks_def_for_node(self.node_handle) + if asset_checks_def is None: + raise DagsterInvalidPropertyError( + f"Op '{self.op.name}' does not have an asset checks definition." + ) + + return asset_checks_def + + @public + @property + def selected_asset_check_keys(self) -> AbstractSet[AssetCheckKey]: + if self.has_assets_def: + return self.assets_def.check_keys + + if self.has_asset_checks_def: + check.failed("Subset selection is not yet supported within an AssetChecksDefinition") + + return set() + @public @property def selected_output_names(self) -> AbstractSet[str]: diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets.py index 533e87bf3c808..67741cce0af04 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets.py @@ -111,7 +111,9 @@ def test_subset_for(subset, expected_keys, expected_inputs, expected_outputs): def abc_(context, in1, in2, in3): pass - subbed = abc_.subset_for({AssetKey(key) for key in subset.split(",")}) + subbed = abc_.subset_for( + {AssetKey(key) for key in subset.split(",")}, selected_asset_check_keys=None + ) assert subbed.keys == ( {AssetKey(key) for key in expected_keys.split(",")} if expected_keys else set() @@ -293,7 +295,7 @@ def ma_op(): can_subset=True, ) - subset = ma.subset_for({AssetKey("b")}) + subset = ma.subset_for({AssetKey("b")}, selected_asset_check_keys=None) assert subset.group_names_by_key[AssetKey("b")] == "bar" @@ -349,7 +351,8 @@ def abc_(context, in1, in2, in3): } subbed_1 = replaced_1.subset_for( - {AssetKey(["foo", "bar_in1"]), AssetKey("in3"), AssetKey(["foo", "foo_a"]), AssetKey("b")} + {AssetKey(["foo", "bar_in1"]), AssetKey("in3"), AssetKey(["foo", "foo_a"]), AssetKey("b")}, + selected_asset_check_keys=None, ) assert subbed_1.keys == {AssetKey(["foo", "foo_a"]), AssetKey("b")} @@ -387,7 +390,8 @@ def abc_(context, in1, in2, in3): AssetKey(["again", "foo", "bar_in1"]), AssetKey(["again", "foo", "foo_a"]), AssetKey(["c"]), - } + }, + selected_asset_check_keys=None, ) assert subbed_2.keys == {AssetKey(["again", "foo", "foo_a"])} @@ -398,7 +402,7 @@ def abc_(context, start): pass with pytest.raises(CheckError, match="can_subset=False"): - abc_.subset_for({AssetKey("start"), AssetKey("a")}) + abc_.subset_for({AssetKey("start"), AssetKey("a")}, selected_asset_check_keys=None) def test_fail_for_non_topological_order(): diff --git a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/base_scenario.py b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/base_scenario.py index f1b6df35879dc..6976860747c09 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/base_scenario.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/base_scenario.py @@ -513,8 +513,12 @@ def do_run( elif not selected_keys: assets_in_run.extend(a.to_source_assets()) else: - assets_in_run.append(a.subset_for(asset_keys_set)) - assets_in_run.extend(a.subset_for(a.keys - selected_keys).to_source_assets()) + assets_in_run.append(a.subset_for(asset_keys_set, selected_asset_check_keys=None)) + assets_in_run.extend( + a.subset_for( + a.keys - selected_keys, selected_asset_check_keys=None + ).to_source_assets() + ) materialize_to_memory( instance=instance, partition_key=partition_key, diff --git a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/test_asset_daemon_perf.py b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/test_asset_daemon_perf.py index de2316f15f662..94a7917831870 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/test_asset_daemon_perf.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/test_asset_daemon_perf.py @@ -176,7 +176,9 @@ def create(self: "InstanceSnapshot") -> None: ) selected_keys = selection.resolve(asset_graph) - sampled_multi_asset = multi_asset.subset_for(selected_keys) + sampled_multi_asset = multi_asset.subset_for( + selected_keys, selected_asset_check_keys=None + ) sampled_roots = [ra for ra in roots if ra.key in selected_keys] to_materialize = [*sampled_roots, sampled_multi_asset] else: diff --git a/python_modules/dagster/dagster_tests/definitions_tests/decorators_tests/test_asset_decorator_with_check_specs.py b/python_modules/dagster/dagster_tests/definitions_tests/decorators_tests/test_asset_decorator_with_check_specs.py index d8545bc103f50..074600b44f7a3 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/decorators_tests/test_asset_decorator_with_check_specs.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/decorators_tests/test_asset_decorator_with_check_specs.py @@ -20,6 +20,9 @@ multi_asset, op, ) +from dagster._core.definitions.asset_check_spec import AssetCheckKey +from dagster._core.definitions.asset_selection import AssetChecksForHandles, AssetSelection +from dagster._core.definitions.asset_spec import AssetSpec from dagster._core.errors import ( DagsterInvalidDefinitionError, DagsterInvariantViolationError, @@ -546,3 +549,136 @@ def asset1(): assert check_eval.asset_key == AssetKey("asset_one") assert check_eval.check_name == "check1" assert check_eval.metadata == {"foo": MetadataValue.text("bar")} + + +def test_can_subset_no_selection() -> None: + @multi_asset( + can_subset=True, + specs=[AssetSpec("asset1"), AssetSpec("asset2")], + check_specs=[ + AssetCheckSpec("check1", asset="asset1"), + AssetCheckSpec("check2", asset="asset2"), + ], + ) + def foo(context: AssetExecutionContext): + assert context.selected_asset_keys == {AssetKey("asset1"), AssetKey("asset2")} + assert context.selected_asset_check_keys == { + AssetCheckKey(AssetKey("asset1"), "check1"), + AssetCheckKey(AssetKey("asset2"), "check2"), + } + + yield Output(value=None, output_name="asset1") + yield AssetCheckResult(asset_key="asset1", check_name="check1", success=True) + + result = materialize([foo]) + + assert len(result.get_asset_materialization_events()) == 1 + + [check_eval] = result.get_asset_check_evaluations() + + assert check_eval.asset_key == AssetKey("asset1") + assert check_eval.check_name == "check1" + + +def test_can_subset() -> None: + @multi_asset( + can_subset=True, + specs=[AssetSpec("asset1"), AssetSpec("asset2")], + check_specs=[ + AssetCheckSpec("check1", asset="asset1"), + AssetCheckSpec("check2", asset="asset2"), + ], + ) + def foo(context: AssetExecutionContext): + assert context.selected_asset_keys == {AssetKey("asset1")} + assert context.selected_asset_check_keys == {AssetCheckKey(AssetKey("asset1"), "check1")} + + yield Output(value=None, output_name="asset1") + yield AssetCheckResult(asset_key="asset1", check_name="check1", success=True) + + result = materialize([foo], selection=["asset1"]) + + assert len(result.get_asset_materialization_events()) == 1 + + [check_eval] = result.get_asset_check_evaluations() + + assert check_eval.asset_key == AssetKey("asset1") + assert check_eval.check_name == "check1" + + +def test_can_subset_result_for_unselected_check() -> None: + @multi_asset( + can_subset=True, + specs=[AssetSpec("asset1"), AssetSpec("asset2")], + check_specs=[ + AssetCheckSpec("check1", asset="asset1"), + AssetCheckSpec("check2", asset="asset2"), + ], + ) + def foo(context: AssetExecutionContext): + assert context.selected_asset_keys == {AssetKey("asset1")} + assert context.selected_asset_check_keys == {AssetCheckKey(AssetKey("asset1"), "check1")} + + yield Output(value=None, output_name="asset1") + yield AssetCheckResult(asset_key="asset1", check_name="check1", success=True) + yield AssetCheckResult(asset_key="asset2", check_name="check2", success=True) + + with pytest.raises(DagsterInvariantViolationError): + materialize([foo], selection=["asset1"]) + + +def test_can_subset_select_only_asset() -> None: + @multi_asset( + can_subset=True, + specs=[AssetSpec("asset1"), AssetSpec("asset2")], + check_specs=[ + AssetCheckSpec("check1", asset="asset1"), + AssetCheckSpec("check2", asset="asset2"), + ], + ) + def foo(context: AssetExecutionContext): + assert context.selected_asset_keys == {AssetKey("asset1")} + assert context.selected_asset_check_keys == set() + + yield Output(value=None, output_name="asset1") + + result = materialize( + [foo], + selection=AssetSelection.keys(AssetKey("asset1")) - AssetSelection.checks_for_assets(foo), + ) + + assert len(result.get_asset_materialization_events()) == 1 + + check_evals = result.get_asset_check_evaluations() + + assert len(check_evals) == 0 + + +def test_can_subset_select_only_check() -> None: + @multi_asset( + can_subset=True, + specs=[AssetSpec("asset1"), AssetSpec("asset2")], + check_specs=[ + AssetCheckSpec("check1", asset="asset1"), + AssetCheckSpec("check2", asset="asset2"), + ], + ) + def foo(context: AssetExecutionContext): + assert context.selected_asset_keys == set() + assert context.selected_asset_check_keys == {AssetCheckKey(AssetKey("asset1"), "check1")} + + yield AssetCheckResult(asset_key="asset1", check_name="check1", success=True) + + result = materialize( + [foo], + selection=AssetChecksForHandles( + [AssetCheckKey(asset_key=AssetKey("asset1"), name="check1")] + ), + ) + + assert len(result.get_asset_materialization_events()) == 0 + + [check_eval] = result.get_asset_check_evaluations() + + assert check_eval.asset_key == AssetKey("asset1") + assert check_eval.check_name == "check1"