Skip to content

Commit

Permalink
[dagster-airlift] consolidate dags (#23856)
Browse files Browse the repository at this point in the history
consolidate dags into a single file. Makes demo flow a bit easier.
  • Loading branch information
dpeng817 authored Aug 23, 2024
1 parent 90c6304 commit 52abfa9
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,17 @@

from airflow import DAG
from airflow.models.operator import BaseOperator
from airflow.operators.bash import BashOperator
from dagster_airlift.in_airflow import mark_as_dagster_migrating
from dagster_airlift.migration_state import load_migration_state_from_yaml
from dbt_example.shared.load_iris import load_csv_to_duckdb
from dbt_example.shared.load_iris import iris_path, load_csv_to_duckdb

default_args = {
"owner": "airflow",
"depends_on_past": False,
"start_date": datetime(2024, 7, 18),
"retries": 0,
}


class LoadToLakehouseOperator(BaseOperator):
Expand All @@ -25,18 +33,19 @@ def execute(self, context) -> None:
)


default_args = {
"owner": "airflow",
"depends_on_past": False,
"start_date": datetime(2023, 1, 1),
"retries": 0,
}
DBT_DIR = os.getenv("DBT_PROJECT_DIR")
# Create the DAG with the specified schedule interval
dbt_dag = DAG("dbt_dag", default_args=default_args, schedule_interval=None)
args = f"--project-dir {DBT_DIR} --profiles-dir {DBT_DIR}"
run_dbt_model = BashOperator(
task_id="build_dbt_models", bash_command=f"dbt build {args}", dag=dbt_dag
)

dag = DAG("load_lakehouse", default_args=default_args, schedule_interval=None)
load_iris = LoadToLakehouseOperator(
task_id="load_iris",
dag=dag,
csv_path=Path(__file__).parent / "iris.csv",
csv_path=iris_path(),
db_path=Path(os.environ["AIRFLOW_HOME"]) / "jaffle_shop.duckdb",
columns=[
"sepal_length_cm",
Expand All @@ -46,6 +55,7 @@ def execute(self, context) -> None:
"species",
],
)

mark_as_dagster_migrating(
global_vars=globals(),
migration_state=load_migration_state_from_yaml(Path(__file__).parent / "migration_state"),
Expand Down

This file was deleted.

0 comments on commit 52abfa9

Please sign in to comment.