Skip to content

Commit

Permalink
checks on AssetSelection
Browse files Browse the repository at this point in the history
  • Loading branch information
johannkm committed Sep 19, 2023
1 parent 373510e commit cc70d15
Show file tree
Hide file tree
Showing 6 changed files with 243 additions and 4 deletions.
12 changes: 12 additions & 0 deletions python_modules/dagster/dagster/_core/definitions/asset_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
)
from dagster._utils.cached_method import cached_method

from .asset_check_spec import AssetCheckHandle
from .asset_checks import AssetChecksDefinition
from .assets import AssetsDefinition
from .backfill_policy import BackfillPolicy
Expand Down Expand Up @@ -706,6 +707,17 @@ def __init__(
self._source_assets = source_assets
self._asset_checks = asset_checks

asset_check_handles = set()
for asset_check in asset_checks:
asset_check_handles.update([spec.handle for spec in asset_check.specs])
for asset in assets:
asset_check_handles.update([spec.handle for spec in asset.check_specs])
self._asset_check_handles = asset_check_handles

@property
def asset_check_handles(self) -> AbstractSet[AssetCheckHandle]:
return self._asset_check_handles

@property
def assets(self) -> Sequence[AssetsDefinition]:
return self._assets
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -785,6 +785,7 @@ def build_asset_selection_job(
tags: Optional[Mapping[str, Any]] = None,
metadata: Optional[Mapping[str, RawMetadataValue]] = None,
asset_selection: Optional[AbstractSet[AssetKey]] = None,
asset_check_selection: Optional[AbstractSet[AssetCheckHandle]] = None,
asset_selection_data: Optional[AssetSelectionData] = None,
hooks: Optional[AbstractSet[HookDefinition]] = None,
) -> "JobDefinition":
Expand All @@ -793,9 +794,7 @@ def build_asset_selection_job(
build_source_asset_observation_job,
)

asset_check_selection = (
asset_selection_data.asset_check_selection if asset_selection_data else None
)


if asset_selection is None and asset_check_selection is None:
# no selections, include everything
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import operator
from abc import ABC, abstractmethod
from functools import reduce
from typing import AbstractSet, Iterable, Optional, Sequence, Union, cast
from typing import AbstractSet, Iterable, Optional, Sequence, Union, cast, Tuple

from typing_extensions import TypeAlias

Expand All @@ -24,6 +24,7 @@
CoercibleToAssetKeyPrefix,
key_prefix_from_coercible,
)
from .asset_check_spec import AssetCheckHandle
from .source_asset import SourceAsset

CoercibleToAssetSelection: TypeAlias = Union[
Expand Down Expand Up @@ -235,6 +236,26 @@ def upstream_source_assets(self) -> "SourceAssetSelection":
"""
return SourceAssetSelection(self)

@public
@staticmethod
def all_asset_checks() -> "AllAssetSelection":
"""Returns a selection that includes all asset checks."""
return AllAssetCheckSelection()

@public
@staticmethod
def asset_checks_for_assets(*assets_defs: AssetsDefinition) -> "KeysAssetSelection":
"""Returns a selection that includes all of the provided assets."""
return AssetChecksForAssetKeys(*(key for assets_def in assets_defs for key in assets_def.keys))


@public
@staticmethod
def asset_checks(*asset_checks: Tuple[CoercibleToAssetKey, str]) -> "KeysAssetSelection":
"""Returns a selection that includes all of the provided assets."""
return AssetChecksForHandles(*(AssetCheckHandle(asset_key=AssetKey.from_coercible(key), check_name=check_name) for key, check_name in asset_checks))


def __or__(self, other: "AssetSelection") -> "OrAssetSelection":
check.inst_param(other, "other", AssetSelection)
return OrAssetSelection(self, other)
Expand Down Expand Up @@ -266,10 +287,22 @@ def resolve(
)
return resolved

def resolve_checks(
self, asset_graph: AssetGraph
) -> AbstractSet[AssetCheckHandle]:
return self.resolve_checks_inner(asset_graph)



@abstractmethod
def resolve_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetKey]:
raise NotImplementedError()


@abstractmethod
def resolve_checks_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetKey]:
raise NotImplementedError

@staticmethod
def _selection_from_string(string: str) -> "AssetSelection":
from dagster._core.definitions import AssetSelection
Expand Down Expand Up @@ -328,8 +361,38 @@ def from_coercible(cls, selection: CoercibleToAssetSelection) -> "AssetSelection
class AllAssetSelection(AssetSelection):
def resolve_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetKey]:
return asset_graph.materializable_asset_keys

def resolve_checks_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetCheckHandle]:
return set()

class AllAssetCheckSelection(AssetSelection):
def resolve_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetKey]:
return set()

def resolve_checks_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetCheckHandle]:
return asset_graph.asset_check_handles


class AssetChecksForAssetKeys(AssetSelection):
def __init__(self, keys: AssetKey):
self._keys = keys

def resolve_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetKey]:
return set()

def resolve_checks_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetCheckHandle]:
return {handle for handle in asset_graph.asset_check_handles if handle.asset_key in self._keys}

class AssetChecksForHandles(AssetSelection):
def __init__(self, asset_check_handles: Sequence[AssetCheckHandle]):
self._asset_check_handles = asset_check_handles

def resolve_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetKey]:
return set()

def resolve_checks_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetCheckHandle]:
return {handle for handle in asset_graph.asset_check_handles if handle in self._asset_check_handles}

class AndAssetSelection(AssetSelection):
def __init__(self, left: AssetSelection, right: AssetSelection):
self._left = left
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -836,6 +836,7 @@ def _get_job_def_for_asset_selection(
description=self.description,
tags=self.tags,
asset_selection=asset_selection,
asset_check_selection=asset_check_selection,
asset_selection_data=asset_selection_data,
config=self.config_mapping or self.partitioned_config,
asset_checks=self.asset_layer.asset_checks_defs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ def resolve(
assets = asset_graph.assets
source_assets = asset_graph.source_assets
selected_asset_keys = self.selection.resolve(asset_graph)
selected_asset_checks = self.selection.resolve_checks(asset_graph)

asset_keys_by_partitions_def = defaultdict(set)
for asset_key in selected_asset_keys:
Expand Down Expand Up @@ -224,6 +225,7 @@ def resolve(
tags=self.tags,
metadata=self.metadata,
asset_selection=selected_asset_keys,
asset_check_selection=selected_asset_checks,
partitions_def=self.partitions_def if self.partitions_def else inferred_partitions_def,
executor_def=self.executor_def or default_executor_def,
hooks=self.hooks,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
from dagster import (
AssetCheckResult,
AssetSelection,
Definitions,
ExecuteInProcessResult,
asset,
asset_check,
define_asset_job,
)
from dagster._core.definitions.asset_check_spec import AssetCheckHandle
from dagster._core.definitions.unresolved_asset_job_definition import UnresolvedAssetJobDefinition


@asset
def asset1():
...


@asset
def asset2():
...


@asset_check(asset=asset1)
def asset1_check1():
return AssetCheckResult(success=True)


@asset_check(asset=asset1)
def asset1_check2():
return AssetCheckResult(success=True)


@asset_check(asset=asset2)
def asset2_check1():
return AssetCheckResult(success=True)


def execute_asset_job_in_process(asset_job: UnresolvedAssetJobDefinition) -> ExecuteInProcessResult:
assets = [asset1, asset2]
asset_checks = [asset1_check1, asset1_check2, asset2_check1]
defs = Definitions(assets=assets, jobs=[asset_job], asset_checks=asset_checks)
job_def = defs.get_job_def(asset_job.name)
return job_def.execute_in_process()


def test_job_with_all_checks_no_materializations():
job_def = define_asset_job("job1", selection=AssetSelection.all_asset_checks())
result = execute_asset_job_in_process(job_def)
assert result.success

assert len(result.get_asset_materialization_events()) == 0
check_evals = result.get_asset_check_evaluations()
assert {check_eval.asset_check_handle for check_eval in check_evals} == {
AssetCheckHandle(asset1.key, "asset1_check1"),
AssetCheckHandle(asset1.key, "asset1_check2"),
AssetCheckHandle(asset2.key, "asset2_check1"),
}


def test_job_with_all_checks_for_asset():
job_def = define_asset_job("job1", selection=AssetSelection.asset_checks_for_assets(asset1))
result = execute_asset_job_in_process(job_def)
assert result.success

assert len(result.get_asset_materialization_events()) == 0
check_evals = result.get_asset_check_evaluations()
assert {check_eval.handle for check_eval in check_evals} == {
AssetCheckHandle(asset1.key, "asset1_check1"),
AssetCheckHandle(asset1.key, "asset1_check2"),
}


def test_job_with_asset_and_all_its_checks():
job_def = define_asset_job("job1", selection=AssetSelection.assets(asset1) & AssetSelection.asset_checks_for_assets(asset1))
result = execute_asset_job_in_process(job_def)
assert result.success

assert len(result.get_asset_materialization_events()) == 1
check_evals = result.get_asset_check_evaluations()
assert {check_eval.handle for check_eval in check_evals} == {
AssetCheckHandle(asset1.key, "asset1_check1"),
AssetCheckHandle(asset1.key, "asset1_check2"),
}


def test_job_with_single_check():
job_def = define_asset_job("job1", selection=AssetSelection.asset_checks(asset1_check1))
result = execute_asset_job_in_process(job_def)
assert result.success

assert len(result.get_asset_materialization_events()) == 0
check_evals = result.get_asset_check_evaluations()
assert {check_eval.handle for check_eval in check_evals} == {
AssetCheckHandle(asset1.key, "asset1_check1"),
}


def test_job_with_all_assets_but_no_checks():
job_def = define_asset_job(
"job1", selection=AssetSelection.all_assets() - AssetSelection.all_asset_checks(asset1_check1)
)
result = execute_asset_job_in_process(job_def)
assert result.success

assert len(result.get_asset_materialization_events()) == 2
check_evals = result.get_asset_check_evaluations()
assert len(check_evals) == 0


# def test_job_with_asset_without_its_checks():
# job_def = define_asset_job(
# "job1", selection=AssetSelection.assets(asset1) - AssetSelection.all_asset_checks()
# )
# result = execute_asset_job_in_process(job_def)
# assert result.success

# assert len(result.get_asset_materialization_events()) == 1
# check_evals = result.get_asset_check_evaluations()
# assert len(check_evals) == 0


# def test_job_with_all_assets_and_all_checks():
# job_def = define_asset_job("job1", selection=AssetSelection.all_assets())
# result = execute_asset_job_in_process(job_def)
# assert result.success

# assert len(result.get_asset_materialization_events()) == 2
# check_evals = result.get_asset_check_evaluations()
# assert len(check_evals) == 3


# def test_job_with_all_assets_and_all_but_one_check():
# job_def = define_asset_job(
# "job1", selection=AssetSelection.all_assets() - AssetSelection.asset_checks(asset1_check1)
# )
# result = execute_asset_job_in_process(job_def)
# assert result.success

# assert len(result.get_asset_materialization_events()) == 2
# check_evals = result.get_asset_check_evaluations()
# assert {check_eval.handle for check_eval in check_evals} == {
# AssetCheckHandle(asset1.key, "asset1_check2"),
# AssetCheckHandle(asset2.key, "asset2_check1"),
# }


# def test_include_asset_after_excluding_checks():
# job_def = define_asset_job(
# "job1",
# selection=(AssetSelection.all_assets() - AssetSelection.all_asset_checks())
# & AssetSelection.assets(asset1),
# )
# result = execute_asset_job_in_process(job_def)
# assert result.success

# assert len(result.get_asset_materialization_events()) == 2
# check_evals = result.get_asset_check_evaluations()
# assert {check_eval.handle for check_eval in check_evals} == {
# AssetCheckHandle(asset1.key, "asset1_check1"),
# AssetCheckHandle(asset1.key, "asset1_check2"),
# }

0 comments on commit cc70d15

Please sign in to comment.