Skip to content

Commit

Permalink
asset check factory docs
Browse files Browse the repository at this point in the history
  • Loading branch information
johannkm committed Oct 31, 2023
1 parent bf62107 commit 2195c3a
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 0 deletions.
44 changes: 44 additions & 0 deletions docs/content/concepts/assets/asset-checks.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,50 @@ defs = Definitions(

There are a variety of types supported via the <PyObject object="MetadataValue" /> class. You can view the metadata on the **Checks** tab of the **Asset details** page.

### Asset check factories

If you want to define many checks that are similar, you can use the factory pattern. Here's an example factory that accepts a list of sql statements and turns them in to asset checks.

```python file=/concepts/assets/asset_checks/factory.py
from typing import Any, List, Mapping

from dagster import AssetCheckResult, Definitions, asset_check


def make_checks(check_blobs: List[Mapping[str, Any]]):
checks = []
for check_blob in check_blobs:

@asset_check(name=check_blob["name"], asset=check_blob["asset"])
def _check(context):
db_connection = ...
rows = db_connection.execute(check_blob["sql"])
return AssetCheckResult(
passed=len(rows) == 0, metadata={"num_rows": len(rows)}
)

checks.append(_check)

return checks


check_blobs = [
{
"name": "orders_id_has_no_nulls",
"asset": "orders",
"sql": "select * from orders where order_id is null",
},
{
"name": "items_id_has_no_nulls",
"asset": "items",
"sql": "select * from items where item_id is null",
},
...,
]

defs = Definitions(asset_checks=make_checks(check_blobs))
```

---

## Executing checks
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
from typing import Any, List, Mapping

from dagster import AssetCheckResult, Definitions, asset_check


def make_checks(check_blobs: List[Mapping[str, Any]]):
checks = []
for check_blob in check_blobs:

@asset_check(name=check_blob["name"], asset=check_blob["asset"])
def _check(context):
db_connection = ...
rows = db_connection.execute(check_blob["sql"])
return AssetCheckResult(
passed=len(rows) == 0, metadata={"num_rows": len(rows)}
)

checks.append(_check)

return checks


check_blobs = [
{
"name": "orders_id_has_no_nulls",
"asset": "orders",
"sql": "select * from orders where order_id is null",
},
{
"name": "items_id_has_no_nulls",
"asset": "items",
"sql": "select * from items where item_id is null",
},
...,
]

defs = Definitions(asset_checks=make_checks(check_blobs))

0 comments on commit 2195c3a

Please sign in to comment.