Skip to content

Commit

Permalink
[dagster-airlift] add freshness check (#23875)
Browse files Browse the repository at this point in the history
## Summary & Motivation

## How I Tested These Changes

## Changelog [New | Bug | Docs]

NOCHANGELOG
  • Loading branch information
dpeng817 authored Aug 24, 2024
1 parent a7d42f6 commit 8fa1b6e
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
from typing import Sequence, Union

from dagster import AssetChecksDefinition, AssetsDefinition, AssetSpec, Definitions
from dagster import (
AssetChecksDefinition,
AssetsDefinition,
AssetSpec,
Definitions,
SensorDefinition,
)
from dagster._core.definitions.asset_key import CoercibleToAssetKey
from typing_extensions import TypeAlias

Expand All @@ -24,22 +30,25 @@ def specs_from_task(


def combine_defs(
*defs: Union[AssetChecksDefinition, AssetsDefinition, Definitions, AssetSpec],
*defs: Union[AssetChecksDefinition, AssetsDefinition, Definitions, AssetSpec, SensorDefinition],
) -> Definitions:
"""Combine provided :py:class:`Definitions` objects and assets into a single object, which contains all constituent definitions."""
assets = []
asset_checks = []
sensor_defs = []
for _def in defs:
if isinstance(_def, Definitions):
continue
elif isinstance(_def, AssetChecksDefinition):
asset_checks.append(_def)
elif isinstance(_def, (AssetsDefinition, AssetSpec)):
assets.append(_def)
elif isinstance(_def, SensorDefinition):
sensor_defs.append(_def)
else:
raise Exception(f"Unexpected type: {type(_def)}")

return Definitions.merge(
*[the_def for the_def in defs if isinstance(the_def, Definitions)],
Definitions(assets=assets, asset_checks=asset_checks),
Definitions(assets=assets, asset_checks=asset_checks, sensors=sensor_defs),
)
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
import os
from pathlib import Path

from dagster import AssetKey

AIRFLOW_BASE_URL = "http://localhost:8080"
AIRFLOW_INSTANCE_NAME = "my_airflow_instance"

Expand All @@ -11,6 +13,7 @@

ASSETS_PATH = Path(__file__).parent / "defs"
MIGRATION_STATE_PATH = Path(__file__).parent / "migration"
DBT_DAG_ASSET_KEY = AssetKey(["airflow_instance", "dag", "dbt_dag"])


def dbt_project_path() -> Path:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
from datetime import timedelta

from dagster import build_last_update_freshness_checks, build_sensor_for_freshness_checks
from dagster_airlift.core import (
AirflowInstance,
BasicAuthBackend,
Expand All @@ -12,6 +15,7 @@
from .constants import (
AIRFLOW_BASE_URL,
AIRFLOW_INSTANCE_NAME,
DBT_DAG_ASSET_KEY,
PASSWORD,
USERNAME,
dbt_manifest_path,
Expand All @@ -24,7 +28,15 @@
name=AIRFLOW_INSTANCE_NAME,
)


# We expect the dbt dag to have completed within an hour of 9:00 AM every day
dbt_freshness_checks = build_last_update_freshness_checks(
assets=[DBT_DAG_ASSET_KEY],
lower_bound_delta=timedelta(hours=1),
deadline_cron="0 9 * * *",
)
freshness_sensor = build_sensor_for_freshness_checks(
freshness_checks=dbt_freshness_checks,
)
defs = build_defs_from_airflow_instance(
airflow_instance=airflow_instance,
defs=combine_defs(
Expand All @@ -42,5 +54,7 @@
csv_path=CSV_PATH,
duckdb_path=DB_PATH,
),
*dbt_freshness_checks,
freshness_sensor,
),
)

0 comments on commit 8fa1b6e

Please sign in to comment.