Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

asset check factory docs #17515

Merged
merged 1 commit into from
Nov 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions docs/content/concepts/assets/asset-checks.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,59 @@ defs = Definitions(

There are a variety of types supported via the <PyObject object="MetadataValue" /> class. You can view the metadata on the **Checks** tab of the **Asset details** page.

### Asset check factories

If you want to define many checks that are similar, you can use the factory pattern. Here's an example factory that accepts a list of sql statements and turns them in to asset checks.

```python file=/concepts/assets/asset_checks/factory.py
from typing import Any, Mapping, Sequence

from dagster import AssetCheckResult, Definitions, asset, asset_check


@asset
def orders():
...


@asset
def items():
...


def make_checks(check_blobs: Sequence[Mapping[str, str]]):
checks = []
for check_blob in check_blobs:

@asset_check(name=check_blob["name"], asset=check_blob["asset"])
def _check(context):
db_connection = ...
rows = db_connection.execute(check_blob["sql"])
return AssetCheckResult(
passed=len(rows) == 0, metadata={"num_rows": len(rows)}
)

checks.append(_check)

return checks


check_blobs = [
{
"name": "orders_id_has_no_nulls",
"asset": "orders",
"sql": "select * from orders where order_id is null",
},
{
"name": "items_id_has_no_nulls",
"asset": "items",
"sql": "select * from items where item_id is null",
},
]

defs = Definitions(assets=[orders, items], asset_checks=make_checks(check_blobs))
```

---

## Executing checks
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
from typing import Any, Mapping, Sequence

from dagster import AssetCheckResult, Definitions, asset, asset_check


@asset
def orders():
...


@asset
def items():
...


def make_checks(check_blobs: Sequence[Mapping[str, str]]):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A return type would be helpful for understanding what this code is doing

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 I hadn't added AssetChecksDefinition to the top level imports yet, will fix

checks = []
for check_blob in check_blobs:

@asset_check(name=check_blob["name"], asset=check_blob["asset"])
def _check(context):
db_connection = ...
rows = db_connection.execute(check_blob["sql"])
return AssetCheckResult(
passed=len(rows) == 0, metadata={"num_rows": len(rows)}
)

checks.append(_check)

return checks


check_blobs = [
{
"name": "orders_id_has_no_nulls",
"asset": "orders",
"sql": "select * from orders where order_id is null",
},
{
"name": "items_id_has_no_nulls",
"asset": "items",
"sql": "select * from items where item_id is null",
},
]

defs = Definitions(assets=[orders, items], asset_checks=make_checks(check_blobs))
Original file line number Diff line number Diff line change
@@ -1,17 +1,34 @@
import pytest

from dagster._core.definitions.asset_check_spec import AssetCheckKey
from dagster._core.definitions.events import AssetKey
from docs_snippets.concepts.assets.asset_checks.asset_with_check import (
defs as asset_with_check_defs,
)
from docs_snippets.concepts.assets.asset_checks.factory import check_blobs, make_checks
from docs_snippets.concepts.assets.asset_checks.metadata import defs as metadata_defs
from docs_snippets.concepts.assets.asset_checks.orders_check import defs as orders_defs
from docs_snippets.concepts.assets.asset_checks.severity import defs as severity_defs


@pytest.mark.parametrize(
"defs", [orders_defs, asset_with_check_defs, severity_defs, metadata_defs]
"defs",
[orders_defs, asset_with_check_defs, severity_defs, metadata_defs],
)
def test_execute(defs):
job_def = defs.get_implicit_global_asset_job_def()
result = job_def.execute_in_process()
assert result.success


def test_factory():
assert [c.spec.key for c in make_checks(check_blobs)] == [
AssetCheckKey(
AssetKey(["orders"]),
"orders_id_has_no_nulls",
),
AssetCheckKey(
AssetKey(["items"]),
"items_id_has_no_nulls",
),
]
Loading