From fa30e7a8d4b8cd0d70049cbd8b50f0319b651155 Mon Sep 17 00:00:00 2001 From: Ben Pankow Date: Tue, 8 Oct 2024 13:30:21 -0700 Subject: [PATCH] Add AssetSelection.materializable selection type (#25076) ## Summary Adds a way to drop all non-materializable assets from a selection, in case a user has downstream non-materializable, external assets within a selection they'd like to ignore ## Test Plan Unit test ## Changelog - Added AssetSelection.materializable(), which returns only assets that are materializable in an existing selection --- .../_core/definitions/asset_selection.py | 21 ++++++++- .../asset_defs_tests/test_asset_selection.py | 44 +++++++++++++++++++ 2 files changed, 64 insertions(+), 1 deletion(-) diff --git a/python_modules/dagster/dagster/_core/definitions/asset_selection.py b/python_modules/dagster/dagster/_core/definitions/asset_selection.py index 25de912ee5d01..569448b6b06d0 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_selection.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_selection.py @@ -17,7 +17,7 @@ key_prefix_from_coercible, ) from dagster._core.definitions.assets import AssetsDefinition -from dagster._core.definitions.base_asset_graph import BaseAssetGraph +from dagster._core.definitions.base_asset_graph import BaseAssetGraph, BaseAssetNode from dagster._core.definitions.resolved_asset_deps import resolve_similar_asset_names from dagster._core.definitions.source_asset import SourceAsset from dagster._core.errors import DagsterInvalidSubsetError @@ -356,6 +356,13 @@ def roots(self) -> "RootsAssetSelection": """ return RootsAssetSelection(child=self) + @public + def materializable(self) -> "MaterializableAssetSelection": + """Given an asset selection, returns a new asset selection that contains all of the assets + that are materializable. Removes any assets which are not materializable. + """ + return MaterializableAssetSelection(child=self) + @public @deprecated(breaking_version="2.0", additional_warn_text="Use AssetSelection.roots instead.") def sources(self) -> "RootsAssetSelection": @@ -781,6 +788,18 @@ def resolve_inner( return fetch_sources(asset_graph.asset_dep_graph, selection) +@whitelist_for_serdes +class MaterializableAssetSelection(ChainedAssetSelection): + def resolve_inner( + self, asset_graph: BaseAssetGraph, allow_missing: bool + ) -> AbstractSet[AssetKey]: + return { + asset_key + for asset_key in self.child.resolve_inner(asset_graph, allow_missing=allow_missing) + if cast(BaseAssetNode, asset_graph.get(asset_key)).is_materializable + } + + @whitelist_for_serdes class DownstreamAssetSelection(ChainedAssetSelection): depth: Optional[int] diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_selection.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_selection.py index 2898d5f2da39e..d6c51e23faf00 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_selection.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_selection.py @@ -18,6 +18,7 @@ TimeWindowPartitionMapping, asset_check, multi_asset, + observable_source_asset, ) from dagster._core.definitions import AssetSelection, asset from dagster._core.definitions.asset_check_spec import AssetCheckKey @@ -327,6 +328,49 @@ def c(b): assert AssetSelection.assets("c").roots().resolve([a, b, c]) == {c.key} +def test_materializable() -> None: + source_upstream = SourceAsset("source_upstream") + + @observable_source_asset + def obs_source_upstream() -> None: + pass + + @asset + def b(source_upstream, obs_source_upstream): + pass + + @asset + def c(b): + pass + + assert AssetSelection.assets("source_upstream", "obs_source_upstream", "b", "c").resolve( + [ + source_upstream, + obs_source_upstream, + b, + c, + ] + ) == { + source_upstream.key, + obs_source_upstream.key, + b.key, + c.key, + } + assert AssetSelection.assets( + "source_upstream", "obs_source_upstream", "b", "c" + ).materializable().resolve( + [ + source_upstream, + obs_source_upstream, + b, + c, + ] + ) == { + b.key, + c.key, + } + + def test_sources(): @asset def a():