-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
8990e57
commit b08df08
Showing
5 changed files
with
125 additions
and
1 deletion.
There are no files selected for viewing
41 changes: 40 additions & 1 deletion
41
docs/docs-beta/docs/guides/quality-testing/data-freshness-testing.md
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,43 @@ | ||
--- | ||
title: "Test for data freshness" | ||
sidebar_position: 20 | ||
--- | ||
--- | ||
Freshness checks provide a way to identify data assets that are overdue for an update. | ||
|
||
This guide covers how to construct freshness checks for materializable [assets](/todo) and [external assets](/todo). | ||
Check failure on line 7 in docs/docs-beta/docs/guides/quality-testing/data-freshness-testing.md GitHub Actions / runner / vale
Check failure on line 7 in docs/docs-beta/docs/guides/quality-testing/data-freshness-testing.md GitHub Actions / runner / vale
|
||
|
||
<details> | ||
<summary>Prerequisites</summary> | ||
|
||
To follow the steps in this guide, you'll need: | ||
|
||
- Familiarity with [assets](/todo) | ||
- Familiarity with [asset checks](/todo) | ||
|
||
</details> | ||
|
||
## Test data freshness for materializable assets | ||
Check failure on line 19 in docs/docs-beta/docs/guides/quality-testing/data-freshness-testing.md GitHub Actions / runner / vale
Check failure on line 19 in docs/docs-beta/docs/guides/quality-testing/data-freshness-testing.md GitHub Actions / runner / vale
|
||
|
||
The example below defines a freshness check on an asset that fails if the asset's latest materialization occurred more than one hour before the current time. | ||
|
||
Defining a schedule or sensor is required to ensure the freshness check executes. If the check only runs after the asset has been materialized, the check won't be able to detect the times materialization fails. | ||
|
||
<CodeExample filePath="guides/data-assets/quality-testing/freshness-checks/materializable-asset-freshness-check.py" language="python" title="Test data freshness for materializable assets" /> | ||
|
||
## Test data freshness for external assets | ||
|
||
To run freshness checks on external assets, the checks need to know when the external assets were last updated. Emitting these update timestamps in observation metadata allows Dagster to calculate whether the asset is overdue. | ||
|
||
The example below defines a freshness check and adds a schedule to run the check periodically. | ||
|
||
<CodeExample filePath="guides/data-assets/quality-testing/freshness-checks/external-asset-freshness-check.py" language="python" title="Test data freshness for external assets" /> | ||
|
||
### Use anomaly detection to test data freshness (Dagster+ Pro) | ||
|
||
Instead of applying policies on an asset-by-asset basis, Dagster+ Pro users can take advantage of a time series anomaly detection model to determine if data is arriving later than expected. | ||
|
||
<CodeExample filePath="guides/data-assets/quality-testing/freshness-checks/anomaly-detection.py" language="python" title="Use anomaly detection to detect overdue assets" /> | ||
|
||
## Next steps | ||
|
||
- Explore more [asset checks](/todo) |
7 changes: 7 additions & 0 deletions
7
...cs_beta_snippets/guides/data-assets/quality-testing/freshness-checks/anomaly-detection.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
from dagster_cloud.anomaly_detection import build_anomaly_detection_freshness_checks | ||
|
||
hourly_sales = ... | ||
|
||
freshness_checks = build_anomaly_detection_freshness_checks( | ||
assets=[hourly_sales], params=None | ||
) |
55 changes: 55 additions & 0 deletions
55
...ets/guides/data-assets/quality-testing/freshness-checks/external-asset-freshness-check.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
from datetime import timedelta | ||
|
||
import dagster_snowflake as dg_snowflake | ||
|
||
import dagster as dg | ||
|
||
|
||
@dg.observable_source_asset(specs=[dg.AssetSpec("hourly_sales")]) | ||
def hourly_sales(snowflake: dg_snowflake.SnowflakeResource): | ||
with snowflake.get_connection() as conn: | ||
freshness_results = dg_snowflake.fetch_last_updated_timestamps( | ||
snowflake_connection=conn.cursor(), | ||
tables=["hourly_sales"], | ||
schema="PUBLIC", | ||
) | ||
for table_name, last_updated in freshness_results.items(): | ||
yield dg.ObserveResult( | ||
asset_key=table_name, | ||
metadata={ | ||
"dagster/last_updated_timestamp": dg.MetadataValue.timestamp( | ||
last_updated | ||
) | ||
}, | ||
) | ||
|
||
|
||
freshness_check_schedule = dg.ScheduleDefinition( | ||
job=dg.define_asset_job( | ||
"hourly_sales_observation_job", | ||
selection=dg.AssetSelection.assets(hourly_sales), | ||
), | ||
# Runs every minute. Usually, a much less frequent cadence is necessary, | ||
# but a short cadence makes it easier to play around with this example. | ||
cron_schedule="* * * * *", | ||
) | ||
|
||
|
||
hourly_sales_freshness_check = dg.build_last_update_freshness_checks( | ||
assets=[hourly_sales], | ||
lower_bound_delta=timedelta(hours=1), | ||
) | ||
|
||
|
||
defs = dg.Definitions( | ||
assets=[hourly_sales], | ||
asset_checks=hourly_sales_freshness_check, | ||
schedules=[freshness_check_schedule], | ||
resources={ | ||
"snowflake": dg_snowflake.SnowflakeResource( | ||
user=dg.EnvVar("SNOWFLAKE_USER"), | ||
account=dg.EnvVar("SNOWFLAKE_ACCOUNT"), | ||
password=dg.EnvVar("SNOWFLAKE_PASSWORD"), | ||
) | ||
}, | ||
) |
22 changes: 22 additions & 0 deletions
22
...ides/data-assets/quality-testing/freshness-checks/materializable-asset-freshness-check.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
from datetime import timedelta | ||
|
||
import dagster as dg | ||
|
||
|
||
@dg.asset | ||
def hourly_sales(context: dg.AssetExecutionContext): | ||
context.log.info("Fetching and emitting hourly sales data") | ||
... | ||
|
||
|
||
hourly_sales_freshness_check = dg.build_last_update_freshness_checks( | ||
assets=[hourly_sales], lower_bound_delta=timedelta(hours=1) | ||
) | ||
freshness_checks_sensor = dg.build_sensor_for_freshness_checks( | ||
freshness_checks=hourly_sales_freshness_check | ||
) | ||
defs = dg.Definitions( | ||
assets=[hourly_sales], | ||
asset_checks=hourly_sales_freshness_check, | ||
sensors=[freshness_checks_sensor], | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters