Skip to content

Commit

Permalink
Add AssetSelection.materializable selection type (#25076)
Browse files Browse the repository at this point in the history
## Summary

Adds a way to drop all non-materializable assets from a selection, in
case a user has downstream non-materializable, external assets within a
selection they'd like to ignore

## Test Plan

Unit test

## Changelog

- Added AssetSelection.materializable(), which returns only assets that
are materializable in an existing selection
  • Loading branch information
benpankow authored Oct 8, 2024
1 parent 71b739d commit fa30e7a
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
key_prefix_from_coercible,
)
from dagster._core.definitions.assets import AssetsDefinition
from dagster._core.definitions.base_asset_graph import BaseAssetGraph
from dagster._core.definitions.base_asset_graph import BaseAssetGraph, BaseAssetNode
from dagster._core.definitions.resolved_asset_deps import resolve_similar_asset_names
from dagster._core.definitions.source_asset import SourceAsset
from dagster._core.errors import DagsterInvalidSubsetError
Expand Down Expand Up @@ -356,6 +356,13 @@ def roots(self) -> "RootsAssetSelection":
"""
return RootsAssetSelection(child=self)

@public
def materializable(self) -> "MaterializableAssetSelection":
"""Given an asset selection, returns a new asset selection that contains all of the assets
that are materializable. Removes any assets which are not materializable.
"""
return MaterializableAssetSelection(child=self)

@public
@deprecated(breaking_version="2.0", additional_warn_text="Use AssetSelection.roots instead.")
def sources(self) -> "RootsAssetSelection":
Expand Down Expand Up @@ -781,6 +788,18 @@ def resolve_inner(
return fetch_sources(asset_graph.asset_dep_graph, selection)


@whitelist_for_serdes
class MaterializableAssetSelection(ChainedAssetSelection):
def resolve_inner(
self, asset_graph: BaseAssetGraph, allow_missing: bool
) -> AbstractSet[AssetKey]:
return {
asset_key
for asset_key in self.child.resolve_inner(asset_graph, allow_missing=allow_missing)
if cast(BaseAssetNode, asset_graph.get(asset_key)).is_materializable
}


@whitelist_for_serdes
class DownstreamAssetSelection(ChainedAssetSelection):
depth: Optional[int]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
TimeWindowPartitionMapping,
asset_check,
multi_asset,
observable_source_asset,
)
from dagster._core.definitions import AssetSelection, asset
from dagster._core.definitions.asset_check_spec import AssetCheckKey
Expand Down Expand Up @@ -327,6 +328,49 @@ def c(b):
assert AssetSelection.assets("c").roots().resolve([a, b, c]) == {c.key}


def test_materializable() -> None:
source_upstream = SourceAsset("source_upstream")

@observable_source_asset
def obs_source_upstream() -> None:
pass

@asset
def b(source_upstream, obs_source_upstream):
pass

@asset
def c(b):
pass

assert AssetSelection.assets("source_upstream", "obs_source_upstream", "b", "c").resolve(
[
source_upstream,
obs_source_upstream,
b,
c,
]
) == {
source_upstream.key,
obs_source_upstream.key,
b.key,
c.key,
}
assert AssetSelection.assets(
"source_upstream", "obs_source_upstream", "b", "c"
).materializable().resolve(
[
source_upstream,
obs_source_upstream,
b,
c,
]
) == {
b.key,
c.key,
}


def test_sources():
@asset
def a():
Expand Down

0 comments on commit fa30e7a

Please sign in to comment.