Skip to content

Commit

Permalink
types
Browse files Browse the repository at this point in the history
  • Loading branch information
johannkm committed Sep 19, 2023
1 parent 4cd8a4c commit f0dc53c
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 12 deletions.
33 changes: 21 additions & 12 deletions python_modules/dagster/dagster/_core/definitions/asset_selection.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
)

from .asset_check_spec import AssetCheckHandle
from .asset_graph import AssetGraph
from .asset_graph import AssetGraph, InternalAssetGraph
from .assets import AssetsDefinition
from .events import (
AssetKey,
Expand Down Expand Up @@ -256,7 +256,7 @@ def upstream_source_assets(self) -> "SourceAssetSelection":

@public
@staticmethod
def asset_checks_for_assets(*assets_defs: AssetsDefinition) -> "KeysAssetSelection":
def asset_checks_for_assets(*assets_defs: AssetsDefinition) -> "AssetChecksForAssetKeys":
"""Returns a selection with the asset checks that target the provided assets."""
return AssetChecksForAssetKeys(
[key for assets_def in assets_defs for key in assets_def.keys]
Expand Down Expand Up @@ -309,11 +309,15 @@ def resolve(
def resolve_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetKey]:
raise NotImplementedError()

def resolve_checks(self, asset_graph: AssetGraph) -> AbstractSet[AssetCheckHandle]:
"""We don't need this method currently, but it makes things consistent with resolve_inner."""
def resolve_checks(self, asset_graph: InternalAssetGraph) -> AbstractSet[AssetCheckHandle]:
"""We don't need this method currently, but it makes things consistent with resolve_inner. Currently
we don't store checks in the ExternalAssetGraph, so we only support InternalAssetGraph.
"""
return self.resolve_checks_inner(asset_graph)

def resolve_checks_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetCheckHandle]:
def resolve_checks_inner(
self, asset_graph: InternalAssetGraph
) -> AbstractSet[AssetCheckHandle]:
"""By default, resolve to checks that target the selected assets. This is overriden for particular selections."""
asset_keys = self.resolve(asset_graph)
return {
Expand Down Expand Up @@ -380,12 +384,13 @@ def resolve_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetKey]:
return asset_graph.materializable_asset_keys



class AllAssetCheckSelection(AssetSelection):
def resolve_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetKey]:
return set()

def resolve_checks_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetCheckHandle]:
def resolve_checks_inner(
self, asset_graph: InternalAssetGraph
) -> AbstractSet[AssetCheckHandle]:
return asset_graph.asset_check_handles


Expand All @@ -396,7 +401,9 @@ def __init__(self, keys: Sequence[AssetKey]):
def resolve_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetKey]:
return set()

def resolve_checks_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetCheckHandle]:
def resolve_checks_inner(
self, asset_graph: InternalAssetGraph
) -> AbstractSet[AssetCheckHandle]:
return {
handle for handle in asset_graph.asset_check_handles if handle.asset_key in self._keys
}
Expand All @@ -409,7 +416,9 @@ def __init__(self, asset_check_handles: Sequence[AssetCheckHandle]):
def resolve_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetKey]:
return set()

def resolve_checks_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetCheckHandle]:
def resolve_checks_inner(
self, asset_graph: InternalAssetGraph
) -> AbstractSet[AssetCheckHandle]:
return {
handle
for handle in asset_graph.asset_check_handles
Expand All @@ -425,7 +434,7 @@ def __init__(self, left: AssetSelection, right: AssetSelection):
def resolve_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetKey]:
return self._left.resolve_inner(asset_graph) & self._right.resolve_inner(asset_graph)

def resolve_checks_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetCheckHandle]:
def resolve_checks_inner(self, asset_graph: InternalAssetGraph) -> AbstractSet[AssetCheckHandle]:
return self._left.resolve_checks_inner(asset_graph) & self._right.resolve_checks_inner(
asset_graph
)
Expand All @@ -439,7 +448,7 @@ def __init__(self, left: AssetSelection, right: AssetSelection):
def resolve_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetKey]:
return self._left.resolve_inner(asset_graph) - self._right.resolve_inner(asset_graph)

def resolve_checks_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetCheckHandle]:
def resolve_checks_inner(self, asset_graph: InternalAssetGraph) -> AbstractSet[AssetCheckHandle]:
return self._left.resolve_checks_inner(asset_graph) - self._right.resolve_checks_inner(
asset_graph
)
Expand Down Expand Up @@ -565,7 +574,7 @@ def __init__(self, left: AssetSelection, right: AssetSelection):
def resolve_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetKey]:
return self._left.resolve_inner(asset_graph) | self._right.resolve_inner(asset_graph)

def resolve_checks_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetCheckHandle]:
def resolve_checks_inner(self, asset_graph: InternalAssetGraph) -> AbstractSet[AssetCheckHandle]:
return self._left.resolve_checks_inner(asset_graph) | self._right.resolve_checks_inner(
asset_graph
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
)

import dagster._check as check
from dagster._core.definitions.asset_check_spec import AssetCheckHandle
from dagster._core.definitions.assets_job import ASSET_BASE_JOB_PREFIX
from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy
from dagster._core.host_representation.external import ExternalRepository
Expand Down Expand Up @@ -300,3 +301,7 @@ def split_asset_keys_by_repository(
asset_key
)
return list(asset_keys_by_repo.values())

@property
def asset_check_handles(self) -> AbstractSet[AssetCheckHandle]:
raise NotImplementedError("ExternalAssetGraph does not support asset checks")

0 comments on commit f0dc53c

Please sign in to comment.