
[docs] Data freshness guide #23937

Merged
@@ -1,4 +1,49 @@
 ---
-title: "Test for data freshness"
+title: "Check for data freshness"
 sidebar_position: 20
 ---
Freshness checks provide a way to identify data assets that are overdue for an update.

This guide covers how to construct freshness checks for materializable [assets](/todo) and [external assets](/todo).

<details>
<summary>Prerequisites</summary>

To follow the steps in this guide, you'll need:

- Familiarity with [assets](/todo)
- Familiarity with [asset checks](/todo)

</details>

## Check data freshness for materializable assets

The example below defines a freshness check on an asset that fails if the asset's latest materialization occurred more than one hour before the current time.

Defining a schedule or sensor is required to ensure the freshness check executes on its own cadence. If the check runs only after the asset materializes, it can't detect the periods when materialization fails to happen at all.

<CodeExample filePath="guides/data-assets/quality-testing/freshness-checks/materializable-asset-freshness-check.py" language="python" title="Check data freshness for materializable assets" />
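Conceptually, a last-update freshness check reduces to a simple rule: the asset is overdue when the time since its last update exceeds the allowed window. A minimal sketch of that rule in plain Python (the function name is illustrative, not Dagster's implementation):

```python
from datetime import datetime, timedelta, timezone


def is_overdue(last_update: datetime, lower_bound_delta: timedelta, now: datetime) -> bool:
    # Overdue when the most recent update is older than the allowed window.
    return now - last_update > lower_bound_delta


now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
print(is_overdue(now - timedelta(minutes=30), timedelta(hours=1), now))  # False: still fresh
print(is_overdue(now - timedelta(hours=2), timedelta(hours=1), now))     # True: overdue
```

The `lower_bound_delta` argument of `build_last_update_freshness_checks` plays the role of the allowed window here.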

## Check data freshness for external assets

To run freshness checks on external assets, the checks need to know when the external assets were last updated. Emitting these update timestamps as values for the [`dagster/last_updated_timestamp`](/todo) observation metadata key allows Dagster to calculate whether the asset is overdue.

The example below defines a freshness check and adds a schedule to run the check periodically.

<CodeExample filePath="guides/data-assets/quality-testing/freshness-checks/external-asset-freshness-check.py" language="python" title="Check data freshness for external assets" />
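Under the hood, the `dagster/last_updated_timestamp` metadata value is stored as a Unix timestamp in seconds (which is what `dg.MetadataValue.timestamp` produces from a timezone-aware `datetime`). A sketch of deriving that value from a warehouse-style datetime, assuming naive values are UTC (the helper below is illustrative, not part of Dagster):

```python
from datetime import datetime, timezone


def to_unix_timestamp(last_updated: datetime) -> float:
    # Naive datetimes (no tzinfo) from a warehouse query are assumed UTC here;
    # timestamp metadata should always be built from timezone-aware values.
    if last_updated.tzinfo is None:
        last_updated = last_updated.replace(tzinfo=timezone.utc)
    return last_updated.timestamp()


print(to_unix_timestamp(datetime(1970, 1, 1)))  # 0.0
```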

### Use anomaly detection to test data freshness (Dagster+ Pro)

**Contributor:** This is from the OG guide:

> This model uses data from past materializations/observations to determine if data is arriving later than expected. Note: If the asset hasn't been updated enough times, the check will pass with a message indicating that more data is needed to detect anomalies.

I think this is important to note (wordsmithing encouraged), otherwise users may get confused when they see this message.

**Contributor (Author):** good call, added

Instead of applying policies on an asset-by-asset basis, Dagster+ Pro users can take advantage of a time series anomaly detection model to determine if data is arriving later than expected.

<CodeExample filePath="guides/data-assets/quality-testing/freshness-checks/anomaly-detection.py" language="python" title="Use anomaly detection to detect overdue assets" />

:::note
If the asset hasn't been updated enough times, the check will pass with a message indicating that more data is needed to detect anomalies.
:::
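As a rough intuition for how such a check could behave, here is an illustrative toy (not Dagster+'s actual detection model, and `MIN_OBSERVATIONS` is a hypothetical threshold): estimate the typical gap between past updates, flag the asset when the current gap is much larger, and pass when history is too short, mirroring the note above.

```python
from datetime import datetime, timedelta, timezone
from statistics import median

MIN_OBSERVATIONS = 4  # hypothetical threshold, chosen for illustration


def anomaly_freshness_check(update_times: list[datetime], now: datetime, tolerance: float = 1.5):
    """Return (passed, message) based on the asset's update history."""
    if len(update_times) < MIN_OBSERVATIONS:
        # Not enough history: pass, and report that more data is needed.
        return True, "more data is needed to detect anomalies"
    gaps = [later - earlier for earlier, later in zip(update_times, update_times[1:])]
    expected_gap = median(gaps)
    overdue = now - update_times[-1] > expected_gap * tolerance
    return not overdue, "overdue" if overdue else "fresh"


now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
history = [now - timedelta(hours=h) for h in (4, 3, 2, 1)]  # hourly updates
print(anomaly_freshness_check(history, now))                       # (True, 'fresh')
print(anomaly_freshness_check(history, now + timedelta(hours=2)))  # (False, 'overdue')
print(anomaly_freshness_check(history[:2], now))                   # passes: needs more data
```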

erinkcochran87 marked this conversation as resolved.
## Next steps

**Contributor:** [Re: line +44] I didn't even know about this D+ feature, could you add a next step that links to a /todo page for a Dagster+ anomaly detection page?

**Contributor (Author):** Added!

- Explore more [asset checks](/todo)
- Explore how to [raise alerts when assets are overdue](/todo) (Dagster+ Pro)
- Explore more about [anomaly detection](/todo) (Dagster+ Pro)
1 change: 1 addition & 0 deletions docs/vale/styles/config/vocabularies/Dagster/accept.txt
@@ -6,6 +6,7 @@ Declarative Automation
 dagster-.*
 gRPC
 [Mm]aterializations
+[Mm]aterializable
 [Mm]emoization
 REST API
 [Ss]ubprocess
@@ -0,0 +1,12 @@
from dagster_cloud.anomaly_detection import build_anomaly_detection_freshness_checks

import dagster as dg


@dg.observable_source_asset
def hourly_sales(): ...


freshness_checks = build_anomaly_detection_freshness_checks(
    assets=[hourly_sales], params=None
)
@@ -0,0 +1,55 @@
from datetime import timedelta

import dagster_snowflake as dg_snowflake

import dagster as dg


@dg.observable_source_asset
def hourly_sales(snowflake: dg_snowflake.SnowflakeResource):
**Contributor:** Should we name this function something like check_hourly_sales to avoid confusing it with the asset spec?

**Contributor:** +1

**Contributor (Author):** I actually expect both names to be the same, since the asset spec is just defining an asset key...

But that actually reminds me that defining an asset spec here is redundant, so I'm removing that

    table_name = "hourly_sales"
    with snowflake.get_connection() as conn:
        freshness_results = dg_snowflake.fetch_last_updated_timestamps(
            snowflake_connection=conn.cursor(),
            tables=[table_name],
            schema="PUBLIC",
        )
        return dg.ObserveResult(
            asset_key=table_name,
            metadata={
                "dagster/last_updated_timestamp": dg.MetadataValue.timestamp(
                    freshness_results[table_name]
                )
            },
        )


freshness_check_schedule = dg.ScheduleDefinition(
    job=dg.define_asset_job(
        "hourly_sales_observation_job",
        selection=dg.AssetSelection.keys("hourly_sales"),
    ),
    # Runs every minute. Usually, a much less frequent cadence is necessary,
    # but a short cadence makes it easier to play around with this example.
    cron_schedule="* * * * *",
)


hourly_sales_freshness_check = dg.build_last_update_freshness_checks(
    assets=[hourly_sales],
    lower_bound_delta=timedelta(hours=1),
)


defs = dg.Definitions(
    assets=[hourly_sales],
    asset_checks=hourly_sales_freshness_check,
    schedules=[freshness_check_schedule],
    resources={
        "snowflake": dg_snowflake.SnowflakeResource(
            user=dg.EnvVar("SNOWFLAKE_USER"),
            account=dg.EnvVar("SNOWFLAKE_ACCOUNT"),
            password=dg.EnvVar("SNOWFLAKE_PASSWORD"),
        )
    },
)
@@ -0,0 +1,22 @@
from datetime import timedelta

import dagster as dg


@dg.asset
def hourly_sales(context: dg.AssetExecutionContext):
    context.log.info("Fetching and emitting hourly sales data")
    ...


hourly_sales_freshness_check = dg.build_last_update_freshness_checks(
    assets=[hourly_sales], lower_bound_delta=timedelta(hours=1)
)
freshness_checks_sensor = dg.build_sensor_for_freshness_checks(
    freshness_checks=hourly_sales_freshness_check
)
defs = dg.Definitions(
    assets=[hourly_sales],
    asset_checks=hourly_sales_freshness_check,
    sensors=[freshness_checks_sensor],
)
2 changes: 1 addition & 1 deletion examples/docs_beta_snippets/setup.py
@@ -15,5 +15,5 @@
     ],
     packages=find_packages(exclude=["test"]),
     install_requires=["dagster-cloud", "dagster-aws"],
-    extras_require={"test": ["pytest", "mock", "path"]},
+    extras_require={"test": ["pytest", "mock", "path", "dagster_snowflake"]},
 )
@@ -8,6 +8,7 @@
from airflow.operators.bash import BashOperator
from dagster_airlift.in_airflow import mark_as_dagster_migrating
from dagster_airlift.migration_state import load_migration_state_from_yaml

from dbt_example.shared.lakehouse_utils import load_csv_to_duckdb
from dbt_example.shared.load_iris import CSV_PATH, DB_PATH, IRIS_COLUMNS
