diff --git a/python_modules/dagster/dagster/_core/definitions/asset_layer.py b/python_modules/dagster/dagster/_core/definitions/asset_layer.py index 6a3fbcc873207..aa0578ee7cb3f 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_layer.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_layer.py @@ -794,8 +794,6 @@ def build_asset_selection_job( build_source_asset_observation_job, ) - - if asset_selection is None and asset_check_selection is None: # no selections, include everything included_assets = list(assets) diff --git a/python_modules/dagster/dagster/_core/definitions/asset_selection.py b/python_modules/dagster/dagster/_core/definitions/asset_selection.py index 1e16c20ae3590..2a4c993b0a530 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_selection.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_selection.py @@ -2,12 +2,13 @@ import operator from abc import ABC, abstractmethod from functools import reduce -from typing import AbstractSet, Iterable, Optional, Sequence, Union, cast, Tuple +from typing import AbstractSet, Iterable, Optional, Sequence, Union, cast from typing_extensions import TypeAlias import dagster._check as check from dagster._annotations import deprecated, public +from dagster._core.definitions.asset_checks import AssetChecksDefinition from dagster._core.errors import DagsterInvalidSubsetError from dagster._core.selector.subset_selector import ( fetch_connected, @@ -16,6 +17,7 @@ parse_clause, ) +from .asset_check_spec import AssetCheckHandle from .asset_graph import AssetGraph from .assets import AssetsDefinition from .events import ( @@ -24,7 +26,6 @@ CoercibleToAssetKeyPrefix, key_prefix_from_coercible, ) -from .asset_check_spec import AssetCheckHandle from .source_asset import SourceAsset CoercibleToAssetSelection: TypeAlias = Union[ @@ -69,9 +70,21 @@ class AssetSelection(ABC): @public @staticmethod def all() -> "AllAssetSelection": + """Returns a selection that includes all assets and asset checks.""" + return AllSelection() + + @public + @staticmethod + def all_assets() -> "AllAssetSelection": """Returns a selection that includes all assets.""" return AllAssetSelection() + @public + @staticmethod + def all_asset_checks() -> "AllAssetSelection": + """Returns a selection that includes all asset checks.""" + return AllAssetCheckSelection() + @public @staticmethod def assets(*assets_defs: AssetsDefinition) -> "KeysAssetSelection": @@ -236,25 +249,24 @@ def upstream_source_assets(self) -> "SourceAssetSelection": """ return SourceAssetSelection(self) - @public - @staticmethod - def all_asset_checks() -> "AllAssetSelection": - """Returns a selection that includes all asset checks.""" - return AllAssetCheckSelection() - @public @staticmethod def asset_checks_for_assets(*assets_defs: AssetsDefinition) -> "KeysAssetSelection": """Returns a selection that includes all of the provided assets.""" - return AssetChecksForAssetKeys(*(key for assets_def in assets_defs for key in assets_def.keys)) - + return AssetChecksForAssetKeys( + [key for assets_def in assets_defs for key in assets_def.keys] + ) @public @staticmethod - def asset_checks(*asset_checks: Tuple[CoercibleToAssetKey, str]) -> "KeysAssetSelection": + def asset_checks(*asset_checks: AssetChecksDefinition) -> "KeysAssetSelection": """Returns a selection that includes all of the provided assets.""" - return AssetChecksForHandles(*(AssetCheckHandle(asset_key=AssetKey.from_coercible(key), check_name=check_name) for key, check_name in asset_checks)) - + return AssetChecksForHandles( + [ + AssetCheckHandle(asset_key=AssetKey.from_coercible(spec.asset_key), name=spec.name) + for checks_def in asset_checks for spec in checks_def.specs + ] + ) def __or__(self, other: "AssetSelection") -> "OrAssetSelection": check.inst_param(other, "other", AssetSelection) @@ -287,21 +299,20 @@ def resolve( ) return resolved - def resolve_checks( - self, asset_graph: AssetGraph - ) -> AbstractSet[AssetCheckHandle]: - return self.resolve_checks_inner(asset_graph) - - - @abstractmethod def resolve_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetKey]: raise NotImplementedError() - - @abstractmethod - def resolve_checks_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetKey]: - raise NotImplementedError + def resolve_checks(self, asset_graph: AssetGraph) -> AbstractSet[AssetCheckHandle]: + """We don't need this method currently, but it makes things consistent with resolve_inner.""" + return self.resolve_checks_inner(asset_graph) + + def resolve_checks_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetCheckHandle]: + """By default, resolve to checks that target the selected assets. This is overriden for particular selections.""" + asset_keys = self.resolve(asset_graph) + return { + handle for handle in asset_graph.asset_check_handles if handle.asset_key in asset_keys + } @staticmethod def _selection_from_string(string: str) -> "AssetSelection": @@ -358,13 +369,19 @@ def from_coercible(cls, selection: CoercibleToAssetSelection) -> "AssetSelection ) +class AllSelection(AssetSelection): + def resolve_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetKey]: + return asset_graph.materializable_asset_keys + + class AllAssetSelection(AssetSelection): def resolve_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetKey]: return asset_graph.materializable_asset_keys - + def resolve_checks_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetCheckHandle]: return set() + class AllAssetCheckSelection(AssetSelection): def resolve_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetKey]: return set() @@ -374,14 +391,17 @@ def resolve_checks_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetChec class AssetChecksForAssetKeys(AssetSelection): - def __init__(self, keys: AssetKey): + def __init__(self, keys: Sequence[AssetKey]): self._keys = keys def resolve_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetKey]: return set() def resolve_checks_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetCheckHandle]: - return {handle for handle in asset_graph.asset_check_handles if handle.asset_key in self._keys} + return { + handle for handle in asset_graph.asset_check_handles if handle.asset_key in self._keys + } + class AssetChecksForHandles(AssetSelection): def __init__(self, asset_check_handles: Sequence[AssetCheckHandle]): @@ -391,8 +411,13 @@ def resolve_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetKey]: return set() def resolve_checks_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetCheckHandle]: - return {handle for handle in asset_graph.asset_check_handles if handle in self._asset_check_handles} - + return { + handle + for handle in asset_graph.asset_check_handles + if handle in self._asset_check_handles + } + + class AndAssetSelection(AssetSelection): def __init__(self, left: AssetSelection, right: AssetSelection): self._left = left @@ -401,6 +426,11 @@ def __init__(self, left: AssetSelection, right: AssetSelection): def resolve_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetKey]: return self._left.resolve_inner(asset_graph) & self._right.resolve_inner(asset_graph) + def resolve_checks_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetCheckHandle]: + return self._left.resolve_checks_inner(asset_graph) & self._right.resolve_checks_inner( + asset_graph + ) + class SubAssetSelection(AssetSelection): def __init__(self, left: AssetSelection, right: AssetSelection): @@ -410,6 +440,11 @@ def __init__(self, left: AssetSelection, right: AssetSelection): def resolve_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetKey]: return self._left.resolve_inner(asset_graph) - self._right.resolve_inner(asset_graph) + def resolve_checks_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetCheckHandle]: + return self._left.resolve_checks_inner(asset_graph) - self._right.resolve_checks_inner( + asset_graph + ) + class SinkAssetSelection(AssetSelection): def __init__(self, child: AssetSelection): @@ -531,6 +566,11 @@ def __init__(self, left: AssetSelection, right: AssetSelection): def resolve_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetKey]: return self._left.resolve_inner(asset_graph) | self._right.resolve_inner(asset_graph) + def resolve_checks_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetCheckHandle]: + return self._left.resolve_checks_inner(asset_graph) | self._right.resolve_checks_inner( + asset_graph + ) + def _fetch_all_upstream( selection: AbstractSet[AssetKey], diff --git a/python_modules/dagster/dagster_tests/definitions_tests/test_asset_check_selection.py b/python_modules/dagster/dagster_tests/definitions_tests/test_asset_check_selection.py index 73c67a9c3d6f8..6ef9c797c680c 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/test_asset_check_selection.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/test_asset_check_selection.py @@ -12,13 +12,11 @@ @asset -def asset1(): - ... +def asset1(): ... @asset -def asset2(): - ... +def asset2(): ... @asset_check(asset=asset1) @@ -65,20 +63,20 @@ def test_job_with_all_checks_for_asset(): assert len(result.get_asset_materialization_events()) == 0 check_evals = result.get_asset_check_evaluations() - assert {check_eval.handle for check_eval in check_evals} == { + assert {check_eval.asset_check_handle for check_eval in check_evals} == { AssetCheckHandle(asset1.key, "asset1_check1"), AssetCheckHandle(asset1.key, "asset1_check2"), } def test_job_with_asset_and_all_its_checks(): - job_def = define_asset_job("job1", selection=AssetSelection.assets(asset1) & AssetSelection.asset_checks_for_assets(asset1)) + job_def = define_asset_job("job1", selection=AssetSelection.assets(asset1)) result = execute_asset_job_in_process(job_def) assert result.success assert len(result.get_asset_materialization_events()) == 1 check_evals = result.get_asset_check_evaluations() - assert {check_eval.handle for check_eval in check_evals} == { + assert {check_eval.asset_check_handle for check_eval in check_evals} == { AssetCheckHandle(asset1.key, "asset1_check1"), AssetCheckHandle(asset1.key, "asset1_check2"), } @@ -91,14 +89,14 @@ def test_job_with_single_check(): assert len(result.get_asset_materialization_events()) == 0 check_evals = result.get_asset_check_evaluations() - assert {check_eval.handle for check_eval in check_evals} == { + assert {check_eval.asset_check_handle for check_eval in check_evals} == { AssetCheckHandle(asset1.key, "asset1_check1"), } def test_job_with_all_assets_but_no_checks(): job_def = define_asset_job( - "job1", selection=AssetSelection.all_assets() - AssetSelection.all_asset_checks(asset1_check1) + "job1", selection=AssetSelection.all_assets() - AssetSelection.asset_checks(asset1_check1) ) result = execute_asset_job_in_process(job_def) assert result.success @@ -108,55 +106,71 @@ def test_job_with_all_assets_but_no_checks(): assert len(check_evals) == 0 -# def test_job_with_asset_without_its_checks(): -# job_def = define_asset_job( -# "job1", selection=AssetSelection.assets(asset1) - AssetSelection.all_asset_checks() -# ) -# result = execute_asset_job_in_process(job_def) -# assert result.success - -# assert len(result.get_asset_materialization_events()) == 1 -# check_evals = result.get_asset_check_evaluations() -# assert len(check_evals) == 0 - - -# def test_job_with_all_assets_and_all_checks(): -# job_def = define_asset_job("job1", selection=AssetSelection.all_assets()) -# result = execute_asset_job_in_process(job_def) -# assert result.success - -# assert len(result.get_asset_materialization_events()) == 2 -# check_evals = result.get_asset_check_evaluations() -# assert len(check_evals) == 3 - - -# def test_job_with_all_assets_and_all_but_one_check(): -# job_def = define_asset_job( -# "job1", selection=AssetSelection.all_assets() - AssetSelection.asset_checks(asset1_check1) -# ) -# result = execute_asset_job_in_process(job_def) -# assert result.success - -# assert len(result.get_asset_materialization_events()) == 2 -# check_evals = result.get_asset_check_evaluations() -# assert {check_eval.handle for check_eval in check_evals} == { -# AssetCheckHandle(asset1.key, "asset1_check2"), -# AssetCheckHandle(asset2.key, "asset2_check1"), -# } - - -# def test_include_asset_after_excluding_checks(): -# job_def = define_asset_job( -# "job1", -# selection=(AssetSelection.all_assets() - AssetSelection.all_asset_checks()) -# & AssetSelection.assets(asset1), -# ) -# result = execute_asset_job_in_process(job_def) -# assert result.success - -# assert len(result.get_asset_materialization_events()) == 2 -# check_evals = result.get_asset_check_evaluations() -# assert {check_eval.handle for check_eval in check_evals} == { -# AssetCheckHandle(asset1.key, "asset1_check1"), -# AssetCheckHandle(asset1.key, "asset1_check2"), -# } +def test_job_with_asset_without_its_checks(): + job_def = define_asset_job( + "job1", selection=AssetSelection.assets(asset1) - AssetSelection.all_asset_checks() + ) + result = execute_asset_job_in_process(job_def) + assert result.success + + assert len(result.get_asset_materialization_events()) == 1 + check_evals = result.get_asset_check_evaluations() + assert len(check_evals) == 0 + + +def test_job_with_all_assets_and_all_checks(): + job_def = define_asset_job("job1", selection=AssetSelection.all()) + result = execute_asset_job_in_process(job_def) + assert result.success + + assert len(result.get_asset_materialization_events()) == 2 + check_evals = result.get_asset_check_evaluations() + assert len(check_evals) == 3 + + +def test_job_with_all_assets_and_all_but_one_check(): + job_def = define_asset_job( + "job1", selection=AssetSelection.all() - AssetSelection.asset_checks(asset1_check1) + ) + result = execute_asset_job_in_process(job_def) + assert result.success + + assert len(result.get_asset_materialization_events()) == 2 + check_evals = result.get_asset_check_evaluations() + assert {check_eval.asset_check_handle for check_eval in check_evals} == { + AssetCheckHandle(asset1.key, "asset1_check2"), + AssetCheckHandle(asset2.key, "asset2_check1"), + } + + +def test_include_asset_after_excluding_checks(): + job_def = define_asset_job( + "job1", + selection=(AssetSelection.all() - AssetSelection.all_asset_checks()) + | AssetSelection.assets(asset1), + ) + result = execute_asset_job_in_process(job_def) + assert result.success + + assert len(result.get_asset_materialization_events()) == 2 + check_evals = result.get_asset_check_evaluations() + assert {check_eval.asset_check_handle for check_eval in check_evals} == { + AssetCheckHandle(asset1.key, "asset1_check1"), + AssetCheckHandle(asset1.key, "asset1_check2"), + } + + # AssetSelection.all_assets() is equivalent to AssetSelection.all() - AssetSelection.all_asset_checks() + job_def = define_asset_job( + "job1", + selection=(AssetSelection.all_assets()) + | AssetSelection.assets(asset1), + ) + result = execute_asset_job_in_process(job_def) + assert result.success + + assert len(result.get_asset_materialization_events()) == 2 + check_evals = result.get_asset_check_evaluations() + assert {check_eval.asset_check_handle for check_eval in check_evals} == { + AssetCheckHandle(asset1.key, "asset1_check1"), + AssetCheckHandle(asset1.key, "asset1_check2"), + }