Skip to content

Commit

Permalink
add check spec test
Browse files Browse the repository at this point in the history
  • Loading branch information
johannkm committed Sep 20, 2023
1 parent b00675a commit 0f22746
Showing 1 changed file with 31 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@
AssetSelection,
Definitions,
ExecuteInProcessResult,
Output,
asset,
asset_check,
define_asset_job,
)
from dagster._core.definitions.asset_check_spec import AssetCheckHandle
from dagster._core.definitions.asset_check_spec import AssetCheckHandle, AssetCheckSpec
from dagster._core.definitions.unresolved_asset_job_definition import UnresolvedAssetJobDefinition


Expand Down Expand Up @@ -158,3 +159,32 @@ def test_include_asset_after_excluding_checks():
AssetCheckHandle(asset1.key, "asset1_check1"),
AssetCheckHandle(asset1.key, "asset1_check2"),
}


@asset(check_specs=[AssetCheckSpec(asset="asset_with_checks", name="check1"), AssetCheckSpec(asset="asset_with_checks", name="check2")])
def asset_with_checks():
yield Output(1)
yield AssetCheckResult(success=True, check_name="check1")
yield AssetCheckResult(success=True, check_name="check2")


def execute_asset_job_2_in_process(asset_job: UnresolvedAssetJobDefinition) -> ExecuteInProcessResult:
assets = [asset_with_checks]
defs = Definitions(assets=assets, jobs=[asset_job])
job_def = defs.get_job_def(asset_job.name)
return job_def.execute_in_process()


def test_checks_on_asset():
job_def = define_asset_job(
"job2", selection=AssetSelection.all()
)
result = execute_asset_job_2_in_process(job_def)
assert result.success

assert len(result.get_asset_materialization_events()) == 1
check_evals = result.get_asset_check_evaluations()
assert {check_eval.asset_check_handle for check_eval in check_evals} == {
AssetCheckHandle(asset_with_checks.key, "check1"),
AssetCheckHandle(asset_with_checks.key, "check2"),
}

0 comments on commit 0f22746

Please sign in to comment.