Skip to content

Commit

Permalink
asset check factory docs
Browse files Browse the repository at this point in the history
  • Loading branch information
johannkm committed Oct 31, 2023
1 parent bf62107 commit 3047099
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 1 deletion.
53 changes: 53 additions & 0 deletions docs/content/concepts/assets/asset-checks.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,59 @@ defs = Definitions(

There are a variety of types supported via the <PyObject object="MetadataValue" /> class. You can view the metadata on the **Checks** tab of the **Asset details** page.

### Asset check factories

If you want to define many checks that are similar, you can use the factory pattern. Here's an example factory that accepts a list of sql statements and turns them in to asset checks.

```python file=/concepts/assets/asset_checks/factory.py
from typing import Any, Mapping, Sequence

from dagster import AssetCheckResult, Definitions, asset, asset_check


@asset
def orders():
...


@asset
def items():
...


def make_checks(check_blobs: Sequence[Mapping[str, str]]):
checks = []
for check_blob in check_blobs:

@asset_check(name=check_blob["name"], asset=check_blob["asset"])
def _check(context):
db_connection = ...
rows = db_connection.execute(check_blob["sql"])
return AssetCheckResult(
passed=len(rows) == 0, metadata={"num_rows": len(rows)}
)

checks.append(_check)

return checks


check_blobs = [
{
"name": "orders_id_has_no_nulls",
"asset": "orders",
"sql": "select * from orders where order_id is null",
},
{
"name": "items_id_has_no_nulls",
"asset": "items",
"sql": "select * from items where item_id is null",
},
]

defs = Definitions(assets=[orders, items], asset_checks=make_checks(check_blobs))
```

---

## Executing checks
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
from typing import Any, Mapping, Sequence

from dagster import AssetCheckResult, Definitions, asset, asset_check


@asset
def orders():
...


@asset
def items():
...


def make_checks(check_blobs: Sequence[Mapping[str, str]]):
checks = []
for check_blob in check_blobs:

@asset_check(name=check_blob["name"], asset=check_blob["asset"])
def _check(context):
db_connection = ...
rows = db_connection.execute(check_blob["sql"])
return AssetCheckResult(
passed=len(rows) == 0, metadata={"num_rows": len(rows)}
)

checks.append(_check)

return checks


check_blobs = [
{
"name": "orders_id_has_no_nulls",
"asset": "orders",
"sql": "select * from orders where order_id is null",
},
{
"name": "items_id_has_no_nulls",
"asset": "items",
"sql": "select * from items where item_id is null",
},
]

defs = Definitions(assets=[orders, items], asset_checks=make_checks(check_blobs))
Original file line number Diff line number Diff line change
@@ -1,17 +1,34 @@
import pytest

from dagster._core.definitions.asset_check_spec import AssetCheckKey
from dagster._core.definitions.events import AssetKey
from docs_snippets.concepts.assets.asset_checks.asset_with_check import (
defs as asset_with_check_defs,
)
from docs_snippets.concepts.assets.asset_checks.factory import check_blobs, make_checks
from docs_snippets.concepts.assets.asset_checks.metadata import defs as metadata_defs
from docs_snippets.concepts.assets.asset_checks.orders_check import defs as orders_defs
from docs_snippets.concepts.assets.asset_checks.severity import defs as severity_defs


@pytest.mark.parametrize(
"defs", [orders_defs, asset_with_check_defs, severity_defs, metadata_defs]
"defs",
[orders_defs, asset_with_check_defs, severity_defs, metadata_defs],
)
def test_execute(defs):
job_def = defs.get_implicit_global_asset_job_def()
result = job_def.execute_in_process()
assert result.success


def test_factory():
assert [c.key for c in make_checks(check_blobs)] == [
AssetCheckKey(
AssetKey(["orders"]),
"orders_id_has_no_nulls",
),
AssetCheckKey(
AssetKey(["items"]),
"items_id_has_no_nulls",
),
]

0 comments on commit 3047099

Please sign in to comment.