diff --git a/python_modules/dagster/dagster/_core/definitions/asset_graph.py b/python_modules/dagster/dagster/_core/definitions/asset_graph.py index 0fe26bd7c9376..9e6cdb66dd508 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_graph.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_graph.py @@ -33,6 +33,7 @@ ) from dagster._utils.cached_method import cached_method +from .asset_check_spec import AssetCheckHandle from .asset_checks import AssetChecksDefinition from .assets import AssetsDefinition from .backfill_policy import BackfillPolicy @@ -706,6 +707,17 @@ def __init__( self._source_assets = source_assets self._asset_checks = asset_checks + asset_check_handles = set() + for asset_check in asset_checks: + asset_check_handles.update([spec.handle for spec in asset_check.specs]) + for asset in assets: + asset_check_handles.update([spec.handle for spec in asset.check_specs]) + self._asset_check_handles = asset_check_handles + + @property + def asset_check_handles(self) -> AbstractSet[AssetCheckHandle]: + return self._asset_check_handles + @property def assets(self) -> Sequence[AssetsDefinition]: return self._assets diff --git a/python_modules/dagster/dagster/_core/definitions/asset_layer.py b/python_modules/dagster/dagster/_core/definitions/asset_layer.py index 9fededc756c99..6a3fbcc873207 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_layer.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_layer.py @@ -785,6 +785,7 @@ def build_asset_selection_job( tags: Optional[Mapping[str, Any]] = None, metadata: Optional[Mapping[str, RawMetadataValue]] = None, asset_selection: Optional[AbstractSet[AssetKey]] = None, + asset_check_selection: Optional[AbstractSet[AssetCheckHandle]] = None, asset_selection_data: Optional[AssetSelectionData] = None, hooks: Optional[AbstractSet[HookDefinition]] = None, ) -> "JobDefinition": @@ -793,9 +794,7 @@ def build_asset_selection_job( build_source_asset_observation_job, ) - asset_check_selection = ( - asset_selection_data.asset_check_selection if asset_selection_data else None - ) + if asset_selection is None and asset_check_selection is None: # no selections, include everything diff --git a/python_modules/dagster/dagster/_core/definitions/asset_selection.py b/python_modules/dagster/dagster/_core/definitions/asset_selection.py index 25d82de988fad..af709c3fbc4b1 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_selection.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_selection.py @@ -2,7 +2,7 @@ import operator from abc import ABC, abstractmethod from functools import reduce -from typing import AbstractSet, Iterable, Optional, Sequence, Union, cast +from typing import AbstractSet, Iterable, Optional, Sequence, Union, cast, Tuple from typing_extensions import TypeAlias @@ -24,6 +24,7 @@ CoercibleToAssetKeyPrefix, key_prefix_from_coercible, ) +from .asset_check_spec import AssetCheckHandle from .source_asset import SourceAsset CoercibleToAssetSelection: TypeAlias = Union[ @@ -235,6 +236,26 @@ def upstream_source_assets(self) -> "SourceAssetSelection": """ return SourceAssetSelection(self) + @public + @staticmethod + def all_asset_checks() -> "AllAssetSelection": + """Returns a selection that includes all asset checks.""" + return AllAssetCheckSelection() + + @public + @staticmethod + def asset_checks_for_assets(*assets_defs: AssetsDefinition) -> "KeysAssetSelection": + """Returns a selection that includes all of the provided assets.""" + return AssetChecksForAssetKeys(*(key for assets_def in assets_defs for key in assets_def.keys)) + + + @public + @staticmethod + def asset_checks(*asset_checks: Tuple[CoercibleToAssetKey, str]) -> "KeysAssetSelection": + """Returns a selection that includes all of the provided assets.""" + return AssetChecksForHandles(*(AssetCheckHandle(asset_key=AssetKey.from_coercible(key), check_name=check_name) for key, check_name in asset_checks)) + + def __or__(self, other: "AssetSelection") -> "OrAssetSelection": check.inst_param(other, "other", AssetSelection) return OrAssetSelection(self, other) @@ -266,10 +287,22 @@ def resolve( ) return resolved + def resolve_checks( + self, asset_graph: AssetGraph + ) -> AbstractSet[AssetCheckHandle]: + return self.resolve_checks_inner(asset_graph) + + + @abstractmethod def resolve_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetKey]: raise NotImplementedError() + + @abstractmethod + def resolve_checks_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetKey]: + raise NotImplementedError + @staticmethod def _selection_from_string(string: str) -> "AssetSelection": from dagster._core.definitions import AssetSelection @@ -328,8 +361,37 @@ def from_coercible(cls, selection: CoercibleToAssetSelection) -> "AssetSelection class AllAssetSelection(AssetSelection): def resolve_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetKey]: return asset_graph.materializable_asset_keys + + def resolve_checks_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetCheckHandle]: + return set() + +class AllAssetCheckSelection(AssetSelection): + def resolve_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetKey]: + return set() + def resolve_checks_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetCheckHandle]: + return asset_graph.asset_check_handles + + +class AssetChecksForAssetKeys(AssetSelection): + def __init__(self, keys: AssetKey): + self._keys = keys + + def resolve_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetKey]: + return set() + + def resolve_checks_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetCheckHandle]: + return {handle for handle in asset_graph.asset_check_handles if handle.asset_key in self._keys} + +class AssetChecksForHandles(AssetSelection): + def __init__(self, asset_check_handles: Sequence[AssetCheckHandle]): + self._asset_check_handles = asset_check_handles + + def resolve_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetKey]: + return set() + def resolve_checks_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetCheckHandle]: + return {handle for handle in asset_graph.asset_check_handles if handle in self._asset_check_handles} class AndAssetSelection(AssetSelection): def __init__(self, left: AssetSelection, right: AssetSelection): self._left = left diff --git a/python_modules/dagster/dagster/_core/definitions/job_definition.py b/python_modules/dagster/dagster/_core/definitions/job_definition.py index 82263b4c5740a..95542eaa256c3 100644 --- a/python_modules/dagster/dagster/_core/definitions/job_definition.py +++ b/python_modules/dagster/dagster/_core/definitions/job_definition.py @@ -836,6 +836,7 @@ def _get_job_def_for_asset_selection( description=self.description, tags=self.tags, asset_selection=asset_selection, + asset_check_selection=asset_check_selection, asset_selection_data=asset_selection_data, config=self.config_mapping or self.partitioned_config, asset_checks=self.asset_layer.asset_checks_defs, diff --git a/python_modules/dagster/dagster/_core/definitions/unresolved_asset_job_definition.py b/python_modules/dagster/dagster/_core/definitions/unresolved_asset_job_definition.py index e565873b90dff..8d1a587833f8a 100644 --- a/python_modules/dagster/dagster/_core/definitions/unresolved_asset_job_definition.py +++ b/python_modules/dagster/dagster/_core/definitions/unresolved_asset_job_definition.py @@ -180,6 +180,7 @@ def resolve( assets = asset_graph.assets source_assets = asset_graph.source_assets selected_asset_keys = self.selection.resolve(asset_graph) + selected_asset_checks = self.selection.resolve_checks(asset_graph) asset_keys_by_partitions_def = defaultdict(set) for asset_key in selected_asset_keys: @@ -224,6 +225,7 @@ def resolve( tags=self.tags, metadata=self.metadata, asset_selection=selected_asset_keys, + asset_check_selection=selected_asset_checks, partitions_def=self.partitions_def if self.partitions_def else inferred_partitions_def, executor_def=self.executor_def or default_executor_def, hooks=self.hooks, diff --git a/python_modules/dagster/dagster_tests/definitions_tests/test_asset_check_selection.py b/python_modules/dagster/dagster_tests/definitions_tests/test_asset_check_selection.py new file mode 100644 index 0000000000000..717bdacdc738b --- /dev/null +++ b/python_modules/dagster/dagster_tests/definitions_tests/test_asset_check_selection.py @@ -0,0 +1,162 @@ +from dagster import ( + AssetCheckResult, + AssetSelection, + Definitions, + ExecuteInProcessResult, + asset, + asset_check, + define_asset_job, +) +from dagster._core.definitions.asset_check_spec import AssetCheckHandle +from dagster._core.definitions.unresolved_asset_job_definition import UnresolvedAssetJobDefinition + + +@asset +def asset1(): + ... + + +@asset +def asset2(): + ... + + +@asset_check(asset=asset1) +def asset1_check1(): + return AssetCheckResult(success=True) + + +@asset_check(asset=asset1) +def asset1_check2(): + return AssetCheckResult(success=True) + + +@asset_check(asset=asset2) +def asset2_check1(): + return AssetCheckResult(success=True) + + +def execute_asset_job_in_process(asset_job: UnresolvedAssetJobDefinition) -> ExecuteInProcessResult: + assets = [asset1, asset2] + asset_checks = [asset1_check1, asset1_check2, asset2_check1] + defs = Definitions(assets=assets, jobs=[asset_job], asset_checks=asset_checks) + job_def = defs.get_job_def(asset_job.name) + return job_def.execute_in_process() + + +def test_job_with_all_checks_no_materializations(): + job_def = define_asset_job("job1", selection=AssetSelection.all_asset_checks()) + result = execute_asset_job_in_process(job_def) + assert result.success + + assert len(result.get_asset_materialization_events()) == 0 + check_evals = result.get_asset_check_evaluations() + assert {check_eval.asset_check_handle for check_eval in check_evals} == { + AssetCheckHandle(asset1.key, "asset1_check1"), + AssetCheckHandle(asset1.key, "asset1_check2"), + AssetCheckHandle(asset2.key, "asset2_check1"), + } + + +def test_job_with_all_checks_for_asset(): + job_def = define_asset_job("job1", selection=AssetSelection.checks_for_asset(asset1)) + result = execute_asset_job_in_process(job_def) + assert result.success + + assert len(result.get_asset_materialization_events()) == 0 + check_evals = result.get_asset_check_evaluations() + assert {check_eval.handle for check_eval in check_evals} == { + AssetCheckHandle(asset1.key, "asset1_check1"), + AssetCheckHandle(asset1.key, "asset1_check2"), + } + + +# def test_job_with_asset_and_all_its_checks(): +# job_def = define_asset_job("job1", selection=AssetSelection.assets(asset1)) +# result = execute_asset_job_in_process(job_def) +# assert result.success + +# assert len(result.get_asset_materialization_events()) == 1 +# check_evals = result.get_asset_check_evaluations() +# assert {check_eval.handle for check_eval in check_evals} == { +# AssetCheckHandle(asset1.key, "asset1_check1"), +# AssetCheckHandle(asset1.key, "asset1_check2"), +# } + + +# def test_job_with_single_check(): +# job_def = define_asset_job("job1", selection=AssetSelection.asset_checks(asset1_check1)) +# result = execute_asset_job_in_process(job_def) +# assert result.success + +# assert len(result.get_asset_materialization_events()) == 0 +# check_evals = result.get_asset_check_evaluations() +# assert {check_eval.handle for check_eval in check_evals} == { +# AssetCheckHandle(asset1.key, "asset1_check1"), +# } + + +# def test_job_with_all_assets_but_no_checks(): +# job_def = define_asset_job( +# "job1", selection=AssetSelection.all_assets() - AssetSelection.all_asset_checks(asset1_check1) +# ) +# result = execute_asset_job_in_process(job_def) +# assert result.success + +# assert len(result.get_asset_materialization_events()) == 2 +# check_evals = result.get_asset_check_evaluations() +# assert len(check_evals) == 0 + + +# def test_job_with_asset_without_its_checks(): +# job_def = define_asset_job( +# "job1", selection=AssetSelection.assets(asset1) - AssetSelection.all_asset_checks() +# ) +# result = execute_asset_job_in_process(job_def) +# assert result.success + +# assert len(result.get_asset_materialization_events()) == 1 +# check_evals = result.get_asset_check_evaluations() +# assert len(check_evals) == 0 + + +# def test_job_with_all_assets_and_all_checks(): +# job_def = define_asset_job("job1", selection=AssetSelection.all_assets()) +# result = execute_asset_job_in_process(job_def) +# assert result.success + +# assert len(result.get_asset_materialization_events()) == 2 +# check_evals = result.get_asset_check_evaluations() +# assert len(check_evals) == 3 + + +# def test_job_with_all_assets_and_all_but_one_check(): +# job_def = define_asset_job( +# "job1", selection=AssetSelection.all_assets() - AssetSelection.asset_checks(asset1_check1) +# ) +# result = execute_asset_job_in_process(job_def) +# assert result.success + +# assert len(result.get_asset_materialization_events()) == 2 +# check_evals = result.get_asset_check_evaluations() +# assert {check_eval.handle for check_eval in check_evals} == { +# AssetCheckHandle(asset1.key, "asset1_check2"), +# AssetCheckHandle(asset2.key, "asset2_check1"), +# } + + +# def test_include_asset_after_excluding_checks(): +# job_def = define_asset_job( +# "job1", +# selection=(AssetSelection.all_assets() - AssetSelection.all_asset_checks()) +# & AssetSelection.assets(asset1), +# ) +# result = execute_asset_job_in_process(job_def) +# assert result.success + +# assert len(result.get_asset_materialization_events()) == 2 +# check_evals = result.get_asset_check_evaluations() +# assert {check_eval.handle for check_eval in check_evals} == { +# AssetCheckHandle(asset1.key, "asset1_check1"), +# AssetCheckHandle(asset1.key, "asset1_check2"), +# }