diff --git a/python_modules/dagster/dagster/_core/definitions/asset_selection.py b/python_modules/dagster/dagster/_core/definitions/asset_selection.py index 71209031ee0d1..264a2ec3883bd 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_selection.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_selection.py @@ -18,7 +18,7 @@ ) from .asset_check_spec import AssetCheckHandle -from .asset_graph import AssetGraph +from .asset_graph import AssetGraph, InternalAssetGraph from .assets import AssetsDefinition from .events import ( AssetKey, @@ -256,7 +256,7 @@ def upstream_source_assets(self) -> "SourceAssetSelection": @public @staticmethod - def asset_checks_for_assets(*assets_defs: AssetsDefinition) -> "KeysAssetSelection": + def asset_checks_for_assets(*assets_defs: AssetsDefinition) -> "AssetChecksForAssetKeys": """Returns a selection with the asset checks that target the provided assets.""" return AssetChecksForAssetKeys( [key for assets_def in assets_defs for key in assets_def.keys] @@ -309,11 +309,15 @@ def resolve( def resolve_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetKey]: raise NotImplementedError() - def resolve_checks(self, asset_graph: AssetGraph) -> AbstractSet[AssetCheckHandle]: - """We don't need this method currently, but it makes things consistent with resolve_inner.""" + def resolve_checks(self, asset_graph: InternalAssetGraph) -> AbstractSet[AssetCheckHandle]: + """We don't need this method currently, but it makes things consistent with resolve_inner. Currently + we don't store checks in the ExternalAssetGraph, so we only support InternalAssetGraph. + """ return self.resolve_checks_inner(asset_graph) - def resolve_checks_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetCheckHandle]: + def resolve_checks_inner( + self, asset_graph: InternalAssetGraph + ) -> AbstractSet[AssetCheckHandle]: """By default, resolve to checks that target the selected assets. This is overriden for particular selections.""" asset_keys = self.resolve(asset_graph) return { @@ -380,12 +384,13 @@ def resolve_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetKey]: return asset_graph.materializable_asset_keys - class AllAssetCheckSelection(AssetSelection): def resolve_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetKey]: return set() - def resolve_checks_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetCheckHandle]: + def resolve_checks_inner( + self, asset_graph: InternalAssetGraph + ) -> AbstractSet[AssetCheckHandle]: return asset_graph.asset_check_handles @@ -396,7 +401,9 @@ def __init__(self, keys: Sequence[AssetKey]): def resolve_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetKey]: return set() - def resolve_checks_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetCheckHandle]: + def resolve_checks_inner( + self, asset_graph: InternalAssetGraph + ) -> AbstractSet[AssetCheckHandle]: return { handle for handle in asset_graph.asset_check_handles if handle.asset_key in self._keys } @@ -409,7 +416,9 @@ def __init__(self, asset_check_handles: Sequence[AssetCheckHandle]): def resolve_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetKey]: return set() - def resolve_checks_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetCheckHandle]: + def resolve_checks_inner( + self, asset_graph: InternalAssetGraph + ) -> AbstractSet[AssetCheckHandle]: return { handle for handle in asset_graph.asset_check_handles @@ -425,7 +434,7 @@ def __init__(self, left: AssetSelection, right: AssetSelection): def resolve_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetKey]: return self._left.resolve_inner(asset_graph) & self._right.resolve_inner(asset_graph) - def resolve_checks_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetCheckHandle]: + def resolve_checks_inner(self, asset_graph: InternalAssetGraph) -> AbstractSet[AssetCheckHandle]: return self._left.resolve_checks_inner(asset_graph) & self._right.resolve_checks_inner( asset_graph ) @@ -439,7 +448,7 @@ def __init__(self, left: AssetSelection, right: AssetSelection): def resolve_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetKey]: return self._left.resolve_inner(asset_graph) - self._right.resolve_inner(asset_graph) - def resolve_checks_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetCheckHandle]: + def resolve_checks_inner(self, asset_graph: InternalAssetGraph) -> AbstractSet[AssetCheckHandle]: return self._left.resolve_checks_inner(asset_graph) - self._right.resolve_checks_inner( asset_graph ) @@ -565,7 +574,7 @@ def __init__(self, left: AssetSelection, right: AssetSelection): def resolve_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetKey]: return self._left.resolve_inner(asset_graph) | self._right.resolve_inner(asset_graph) - def resolve_checks_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetCheckHandle]: + def resolve_checks_inner(self, asset_graph: InternalAssetGraph) -> AbstractSet[AssetCheckHandle]: return self._left.resolve_checks_inner(asset_graph) | self._right.resolve_checks_inner( asset_graph ) diff --git a/python_modules/dagster/dagster/_core/definitions/external_asset_graph.py b/python_modules/dagster/dagster/_core/definitions/external_asset_graph.py index e50ca007e508b..b112b466158db 100644 --- a/python_modules/dagster/dagster/_core/definitions/external_asset_graph.py +++ b/python_modules/dagster/dagster/_core/definitions/external_asset_graph.py @@ -13,6 +13,7 @@ ) import dagster._check as check +from dagster._core.definitions.asset_check_spec import AssetCheckHandle from dagster._core.definitions.assets_job import ASSET_BASE_JOB_PREFIX from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy from dagster._core.host_representation.external import ExternalRepository @@ -300,3 +301,7 @@ def split_asset_keys_by_repository( asset_key ) return list(asset_keys_by_repo.values()) + + @property + def asset_check_handles(self) -> AbstractSet[AssetCheckHandle]: + raise NotImplementedError("ExternalAssetGraph does not support asset checks")