Skip to content

Commit

Permalink
default to include
Browse files Browse the repository at this point in the history
  • Loading branch information
johannkm committed Sep 19, 2023
1 parent cc70d15 commit db9ad33
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -794,8 +794,6 @@ def build_asset_selection_job(
build_source_asset_observation_job,
)



if asset_selection is None and asset_check_selection is None:
# no selections, include everything
included_assets = list(assets)
Expand Down
98 changes: 69 additions & 29 deletions python_modules/dagster/dagster/_core/definitions/asset_selection.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@
import operator
from abc import ABC, abstractmethod
from functools import reduce
from typing import AbstractSet, Iterable, Optional, Sequence, Union, cast, Tuple
from typing import AbstractSet, Iterable, Optional, Sequence, Union, cast

from typing_extensions import TypeAlias

import dagster._check as check
from dagster._annotations import deprecated, public
from dagster._core.definitions.asset_checks import AssetChecksDefinition
from dagster._core.errors import DagsterInvalidSubsetError
from dagster._core.selector.subset_selector import (
fetch_connected,
Expand All @@ -16,6 +17,7 @@
parse_clause,
)

from .asset_check_spec import AssetCheckHandle
from .asset_graph import AssetGraph
from .assets import AssetsDefinition
from .events import (
Expand All @@ -24,7 +26,6 @@
CoercibleToAssetKeyPrefix,
key_prefix_from_coercible,
)
from .asset_check_spec import AssetCheckHandle
from .source_asset import SourceAsset

CoercibleToAssetSelection: TypeAlias = Union[
Expand Down Expand Up @@ -69,9 +70,21 @@ class AssetSelection(ABC):
@public
@staticmethod
def all() -> "AllAssetSelection":
"""Returns a selection that includes all assets and asset checks."""
return AllSelection()

@public
@staticmethod
def all_assets() -> "AllAssetSelection":
"""Returns a selection that includes all assets."""
return AllAssetSelection()

@public
@staticmethod
def all_asset_checks() -> "AllAssetSelection":
"""Returns a selection that includes all asset checks."""
return AllAssetCheckSelection()

@public
@staticmethod
def assets(*assets_defs: AssetsDefinition) -> "KeysAssetSelection":
Expand Down Expand Up @@ -236,25 +249,24 @@ def upstream_source_assets(self) -> "SourceAssetSelection":
"""
return SourceAssetSelection(self)

@public
@staticmethod
def all_asset_checks() -> "AllAssetSelection":
"""Returns a selection that includes all asset checks."""
return AllAssetCheckSelection()

@public
@staticmethod
def asset_checks_for_assets(*assets_defs: AssetsDefinition) -> "KeysAssetSelection":
"""Returns a selection that includes all of the provided assets."""
return AssetChecksForAssetKeys(*(key for assets_def in assets_defs for key in assets_def.keys))

return AssetChecksForAssetKeys(
[key for assets_def in assets_defs for key in assets_def.keys]
)

@public
@staticmethod
def asset_checks(*asset_checks: Tuple[CoercibleToAssetKey, str]) -> "KeysAssetSelection":
def asset_checks(*asset_checks: AssetChecksDefinition) -> "KeysAssetSelection":
"""Returns a selection that includes all of the provided assets."""
return AssetChecksForHandles(*(AssetCheckHandle(asset_key=AssetKey.from_coercible(key), check_name=check_name) for key, check_name in asset_checks))

return AssetChecksForHandles(
[
AssetCheckHandle(asset_key=AssetKey.from_coercible(spec.asset_key), name=spec.name)
for checks_def in asset_checks for spec in checks_def.specs
]
)

def __or__(self, other: "AssetSelection") -> "OrAssetSelection":
check.inst_param(other, "other", AssetSelection)
Expand Down Expand Up @@ -287,21 +299,20 @@ def resolve(
)
return resolved

def resolve_checks(
self, asset_graph: AssetGraph
) -> AbstractSet[AssetCheckHandle]:
return self.resolve_checks_inner(asset_graph)



@abstractmethod
def resolve_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetKey]:
raise NotImplementedError()


@abstractmethod
def resolve_checks_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetKey]:
raise NotImplementedError
def resolve_checks(self, asset_graph: AssetGraph) -> AbstractSet[AssetCheckHandle]:
"""We don't need this method currently, but it makes things consistent with resolve_inner."""
return self.resolve_checks_inner(asset_graph)

def resolve_checks_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetCheckHandle]:
"""By default, resolve to checks that target the selected assets. This is overriden for particular selections."""
asset_keys = self.resolve(asset_graph)
return {
handle for handle in asset_graph.asset_check_handles if handle.asset_key in asset_keys
}

@staticmethod
def _selection_from_string(string: str) -> "AssetSelection":
Expand Down Expand Up @@ -358,13 +369,19 @@ def from_coercible(cls, selection: CoercibleToAssetSelection) -> "AssetSelection
)


class AllSelection(AssetSelection):
def resolve_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetKey]:
return asset_graph.materializable_asset_keys


class AllAssetSelection(AssetSelection):
def resolve_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetKey]:
return asset_graph.materializable_asset_keys

def resolve_checks_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetCheckHandle]:
return set()


class AllAssetCheckSelection(AssetSelection):
def resolve_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetKey]:
return set()
Expand All @@ -374,14 +391,17 @@ def resolve_checks_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetChec


class AssetChecksForAssetKeys(AssetSelection):
def __init__(self, keys: AssetKey):
def __init__(self, keys: Sequence[AssetKey]):
self._keys = keys

def resolve_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetKey]:
return set()

def resolve_checks_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetCheckHandle]:
return {handle for handle in asset_graph.asset_check_handles if handle.asset_key in self._keys}
return {
handle for handle in asset_graph.asset_check_handles if handle.asset_key in self._keys
}


class AssetChecksForHandles(AssetSelection):
def __init__(self, asset_check_handles: Sequence[AssetCheckHandle]):
Expand All @@ -391,8 +411,13 @@ def resolve_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetKey]:
return set()

def resolve_checks_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetCheckHandle]:
return {handle for handle in asset_graph.asset_check_handles if handle in self._asset_check_handles}

return {
handle
for handle in asset_graph.asset_check_handles
if handle in self._asset_check_handles
}


class AndAssetSelection(AssetSelection):
def __init__(self, left: AssetSelection, right: AssetSelection):
self._left = left
Expand All @@ -401,6 +426,11 @@ def __init__(self, left: AssetSelection, right: AssetSelection):
def resolve_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetKey]:
return self._left.resolve_inner(asset_graph) & self._right.resolve_inner(asset_graph)

def resolve_checks_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetCheckHandle]:
return self._left.resolve_checks_inner(asset_graph) & self._right.resolve_checks_inner(
asset_graph
)


class SubAssetSelection(AssetSelection):
def __init__(self, left: AssetSelection, right: AssetSelection):
Expand All @@ -410,6 +440,11 @@ def __init__(self, left: AssetSelection, right: AssetSelection):
def resolve_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetKey]:
return self._left.resolve_inner(asset_graph) - self._right.resolve_inner(asset_graph)

def resolve_checks_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetCheckHandle]:
return self._left.resolve_checks_inner(asset_graph) - self._right.resolve_checks_inner(
asset_graph
)


class SinkAssetSelection(AssetSelection):
def __init__(self, child: AssetSelection):
Expand Down Expand Up @@ -531,6 +566,11 @@ def __init__(self, left: AssetSelection, right: AssetSelection):
def resolve_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetKey]:
return self._left.resolve_inner(asset_graph) | self._right.resolve_inner(asset_graph)

def resolve_checks_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetCheckHandle]:
return self._left.resolve_checks_inner(asset_graph) | self._right.resolve_checks_inner(
asset_graph
)


def _fetch_all_upstream(
selection: AbstractSet[AssetKey],
Expand Down
Loading

0 comments on commit db9ad33

Please sign in to comment.