diff --git a/docs/docs-beta/docs/guides/quality-testing/data-freshness-testing.md b/docs/docs-beta/docs/guides/quality-testing/data-freshness-testing.md index 83f664b46ec6d..dda493c7d79a3 100644 --- a/docs/docs-beta/docs/guides/quality-testing/data-freshness-testing.md +++ b/docs/docs-beta/docs/guides/quality-testing/data-freshness-testing.md @@ -1,4 +1,49 @@ --- -title: "Test for data freshness" +title: "Check for data freshness" sidebar_position: 20 ---- \ No newline at end of file +--- +Freshness checks provide a way to identify data assets that are overdue for an update. + +This guide covers how to construct freshness checks for materializable [assets](/todo) and [external assets](/todo). + +
+ Prerequisites + +To follow the steps in this guide, you'll need: + +- Familiarity with [assets](/todo) +- Familiarity with [asset checks](/todo) + +
+ +## Check data freshness for materializable assets + +The example below defines a freshness check on an asset that fails if the asset's latest materialization occurred more than one hour before the current time. + +Defining a schedule or sensor is required to ensure the freshness check executes. If the check only runs after the asset has been materialized, the check won't be able to detect the times materialization fails. + + + +## Check data freshness for external assets + +To run freshness checks on external assets, the checks need to know when the external assets were last updated. Emitting these update timestamps as values for the [`dagster/last_updated_timestamp`](/todo) observation metadata key allows Dagster to calculate whether the asset is overdue. + +The example below defines a freshness check and adds a schedule to run the check periodically. + + + +### Use anomaly detection to test data freshness (Dagster+ Pro) + +Instead of applying policies on an asset-by-asset basis, Dagster+ Pro users can take advantage of a time series anomaly detection model to determine if data is arriving later than expected. + + + +:::note +If the asset hasn't been updated enough times, the check will pass with a message indicating that more data is needed to detect anomalies. +::: + +## Next steps + +- Explore more [asset checks](/todo) +- Explore how to [raise alerts when assets are overdue](/todo) (Dagster+ Pro) +- Explore more about [anomaly detection](/todo) (Dagster+ Pro) diff --git a/docs/vale/styles/config/vocabularies/Dagster/accept.txt b/docs/vale/styles/config/vocabularies/Dagster/accept.txt index cd07400a1dd9e..b4408f9fadde6 100644 --- a/docs/vale/styles/config/vocabularies/Dagster/accept.txt +++ b/docs/vale/styles/config/vocabularies/Dagster/accept.txt @@ -6,6 +6,7 @@ Declarative Automation dagster-.* gRPC [Mm]aterializations +[Mm]aterializable [Mm]emoization REST API [Ss]ubprocess diff --git a/examples/docs_beta_snippets/docs_beta_snippets/guides/data-assets/quality-testing/freshness-checks/anomaly-detection.py b/examples/docs_beta_snippets/docs_beta_snippets/guides/data-assets/quality-testing/freshness-checks/anomaly-detection.py new file mode 100644 index 0000000000000..32cc086dca836 --- /dev/null +++ b/examples/docs_beta_snippets/docs_beta_snippets/guides/data-assets/quality-testing/freshness-checks/anomaly-detection.py @@ -0,0 +1,12 @@ +from dagster_cloud.anomaly_detection import build_anomaly_detection_freshness_checks + +import dagster as dg + + +@dg.observable_source_asset +def hourly_sales(): ... + + +freshness_checks = build_anomaly_detection_freshness_checks( + assets=[hourly_sales], params=None +) diff --git a/examples/docs_beta_snippets/docs_beta_snippets/guides/data-assets/quality-testing/freshness-checks/external-asset-freshness-check.py b/examples/docs_beta_snippets/docs_beta_snippets/guides/data-assets/quality-testing/freshness-checks/external-asset-freshness-check.py new file mode 100644 index 0000000000000..95ce0612143dd --- /dev/null +++ b/examples/docs_beta_snippets/docs_beta_snippets/guides/data-assets/quality-testing/freshness-checks/external-asset-freshness-check.py @@ -0,0 +1,55 @@ +from datetime import timedelta + +import dagster_snowflake as dg_snowflake + +import dagster as dg + + +@dg.observable_source_asset +def hourly_sales(snowflake: dg_snowflake.SnowflakeResource): + table_name = "hourly_sales" + with snowflake.get_connection() as conn: + freshness_results = dg_snowflake.fetch_last_updated_timestamps( + snowflake_connection=conn.cursor(), + tables=[table_name], + schema="PUBLIC", + ) + return dg.ObserveResult( + asset_key=table_name, + metadata={ + "dagster/last_updated_timestamp": dg.MetadataValue.timestamp( + freshness_results[table_name] + ) + }, + ) + + +freshness_check_schedule = dg.ScheduleDefinition( + job=dg.define_asset_job( + "hourly_sales_observation_job", + selection=dg.AssetSelection.keys("hourly_sales"), + ), + # Runs every minute. Usually, a much less frequent cadence is necessary, + # but a short cadence makes it easier to play around with this example. + cron_schedule="* * * * *", +) + + +hourly_sales_freshness_check = dg.build_last_update_freshness_checks( + assets=[hourly_sales], + lower_bound_delta=timedelta(hours=1), +) + + +defs = dg.Definitions( + assets=[hourly_sales], + asset_checks=hourly_sales_freshness_check, + schedules=[freshness_check_schedule], + resources={ + "snowflake": dg_snowflake.SnowflakeResource( + user=dg.EnvVar("SNOWFLAKE_USER"), + account=dg.EnvVar("SNOWFLAKE_ACCOUNT"), + password=dg.EnvVar("SNOWFLAKE_PASSWORD"), + ) + }, +) diff --git a/examples/docs_beta_snippets/docs_beta_snippets/guides/data-assets/quality-testing/freshness-checks/materializable-asset-freshness-check.py b/examples/docs_beta_snippets/docs_beta_snippets/guides/data-assets/quality-testing/freshness-checks/materializable-asset-freshness-check.py new file mode 100644 index 0000000000000..ade52d46aaaaf --- /dev/null +++ b/examples/docs_beta_snippets/docs_beta_snippets/guides/data-assets/quality-testing/freshness-checks/materializable-asset-freshness-check.py @@ -0,0 +1,22 @@ +from datetime import timedelta + +import dagster as dg + + +@dg.asset +def hourly_sales(context: dg.AssetExecutionContext): + context.log.info("Fetching and emitting hourly sales data") + ... + + +hourly_sales_freshness_check = dg.build_last_update_freshness_checks( + assets=[hourly_sales], lower_bound_delta=timedelta(hours=1) +) +freshness_checks_sensor = dg.build_sensor_for_freshness_checks( + freshness_checks=hourly_sales_freshness_check +) +defs = dg.Definitions( + assets=[hourly_sales], + asset_checks=hourly_sales_freshness_check, + sensors=[freshness_checks_sensor], +) diff --git a/examples/docs_beta_snippets/setup.py b/examples/docs_beta_snippets/setup.py index d175f986e381b..f3ea0f032efb0 100755 --- a/examples/docs_beta_snippets/setup.py +++ b/examples/docs_beta_snippets/setup.py @@ -15,5 +15,5 @@ ], packages=find_packages(exclude=["test"]), install_requires=["dagster-cloud", "dagster-aws"], - extras_require={"test": ["pytest", "mock", "path"]}, + extras_require={"test": ["pytest", "mock", "path", "dagster_snowflake"]}, ) diff --git a/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/airflow_dags/dags.py b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/airflow_dags/dags.py index 1f4454dded617..208c76bcbe2d8 100644 --- a/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/airflow_dags/dags.py +++ b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/airflow_dags/dags.py @@ -8,6 +8,7 @@ from airflow.operators.bash import BashOperator from dagster_airlift.in_airflow import mark_as_dagster_migrating from dagster_airlift.migration_state import load_migration_state_from_yaml + from dbt_example.shared.lakehouse_utils import load_csv_to_duckdb from dbt_example.shared.load_iris import CSV_PATH, DB_PATH, IRIS_COLUMNS