diff --git a/docs/docs-beta/docs/guides/quality-testing/data-freshness-testing.md b/docs/docs-beta/docs/guides/quality-testing/data-freshness-testing.md
index a391dfc0baf26..377f1b50faaf4 100644
--- a/docs/docs-beta/docs/guides/quality-testing/data-freshness-testing.md
+++ b/docs/docs-beta/docs/guides/quality-testing/data-freshness-testing.md
@@ -1,5 +1,5 @@
 ---
-title: "Test for data freshness"
+title: "Check for data freshness"
 sidebar_position: 20
 ---
 Freshness checks provide a way to identify data assets that are overdue for an update.
@@ -16,28 +16,30 @@ To follow the steps in this guide, you'll need:
-## Test data freshness for materializable assets
+## Check data freshness for materializable assets
 The example below defines a freshness check on an asset that fails if the asset's latest materialization occurred more than one hour before the current time.
 Defining a schedule or sensor is required to ensure the freshness check executes. If the check only runs after the asset has been materialized, the check won't be able to detect the times materialization fails.
-
+
-## Test data freshness for external assets
+## Check data freshness for external assets
-To run freshness checks on external assets, the checks need to know when the external assets were last updated. Emitting these update timestamps in observation metadata allows Dagster to calculate whether the asset is overdue.
+To run freshness checks on external assets, the checks need to know when the external assets were last updated. Emitting these update timestamps as values for the [`dagster/last_updated_timestamp`](/todo) observation metadata key allows Dagster to calculate whether the asset is overdue.
 The example below defines a freshness check and adds a schedule to run the check periodically.
-
+
 ### Use anomaly detection to test data freshness (Dagster+ Pro)
-Instead of applying policies on an asset-by-asset basis, Dagster+ Pro users can take advantage of a time series anomaly detection model to determine if data is arriving later than expected.
+Instead of applying policies on an asset-by-asset basis, Dagster+ Pro users can take advantage of a time series anomaly detection model to determine if data is arriving later than expected. Note: If the asset hasn't been updated enough times, the check will pass with a message indicating that more data is needed to detect anomalies.
 ## Next steps
 - Explore more [asset checks](/todo)
+- Explore how to [raise alerts when assets are overdue](/todo) (Dagster+ Pro)
+- Explore more about [anomaly detection](/todo) (Dagster+ Pro)
diff --git a/examples/docs_beta_snippets/docs_beta_snippets/guides/data-assets/quality-testing/freshness-checks/anomaly-detection.py b/examples/docs_beta_snippets/docs_beta_snippets/guides/data-assets/quality-testing/freshness-checks/anomaly-detection.py
index fd95e7ce75f16..fc68610252236 100644
--- a/examples/docs_beta_snippets/docs_beta_snippets/guides/data-assets/quality-testing/freshness-checks/anomaly-detection.py
+++ b/examples/docs_beta_snippets/docs_beta_snippets/guides/data-assets/quality-testing/freshness-checks/anomaly-detection.py
@@ -1,7 +1,3 @@
 from dagster_cloud.anomaly_detection import build_anomaly_detection_freshness_checks
-hourly_sales = ...
-
-freshness_checks = build_anomaly_detection_freshness_checks(
-    assets=[hourly_sales], params=None
-)
+freshness_checks = build_anomaly_detection_freshness_checks(assets=..., params=None)
diff --git a/examples/docs_beta_snippets/docs_beta_snippets/guides/data-assets/quality-testing/freshness-checks/external-asset-freshness-check.py b/examples/docs_beta_snippets/docs_beta_snippets/guides/data-assets/quality-testing/freshness-checks/external-asset-freshness-check.py
index cfe78fe3a55e4..95ce0612143dd 100644
--- a/examples/docs_beta_snippets/docs_beta_snippets/guides/data-assets/quality-testing/freshness-checks/external-asset-freshness-check.py
+++ b/examples/docs_beta_snippets/docs_beta_snippets/guides/data-assets/quality-testing/freshness-checks/external-asset-freshness-check.py
@@ -5,29 +5,29 @@ import dagster as dg
-@dg.observable_source_asset(specs=[dg.AssetSpec("hourly_sales")])
+@dg.observable_source_asset
 def hourly_sales(snowflake: dg_snowflake.SnowflakeResource):
+    table_name = "hourly_sales"
     with snowflake.get_connection() as conn:
         freshness_results = dg_snowflake.fetch_last_updated_timestamps(
             snowflake_connection=conn.cursor(),
-            tables=["hourly_sales"],
+            tables=[table_name],
             schema="PUBLIC",
         )
-        for table_name, last_updated in freshness_results.items():
-            yield dg.ObserveResult(
-                asset_key=table_name,
-                metadata={
-                    "dagster/last_updated_timestamp": dg.MetadataValue.timestamp(
-                        last_updated
-                    )
-                },
-            )
+        return dg.ObserveResult(
+            asset_key=table_name,
+            metadata={
+                "dagster/last_updated_timestamp": dg.MetadataValue.timestamp(
+                    freshness_results[table_name]
+                )
+            },
+        )
 freshness_check_schedule = dg.ScheduleDefinition(
     job=dg.define_asset_job(
         "hourly_sales_observation_job",
-        selection=dg.AssetSelection.assets(hourly_sales),
+        selection=dg.AssetSelection.keys("hourly_sales"),
     ),
     # Runs every minute. Usually, a much less frequent cadence is necessary,
     # but a short cadence makes it easier to play around with this example.