diff --git a/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/airflow_dags/lakehouse.py b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/airflow_dags/dags.py similarity index 72% rename from examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/airflow_dags/lakehouse.py rename to examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/airflow_dags/dags.py index 74e89d49ab21e..da64a248f74f6 100644 --- a/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/airflow_dags/lakehouse.py +++ b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/airflow_dags/dags.py @@ -5,9 +5,17 @@ from airflow import DAG from airflow.models.operator import BaseOperator +from airflow.operators.bash import BashOperator from dagster_airlift.in_airflow import mark_as_dagster_migrating from dagster_airlift.migration_state import load_migration_state_from_yaml -from dbt_example.shared.load_iris import load_csv_to_duckdb +from dbt_example.shared.load_iris import iris_path, load_csv_to_duckdb + +default_args = { + "owner": "airflow", + "depends_on_past": False, + "start_date": datetime(2024, 7, 18), + "retries": 0, +} class LoadToLakehouseOperator(BaseOperator): @@ -25,18 +33,19 @@ def execute(self, context) -> None: ) -default_args = { - "owner": "airflow", - "depends_on_past": False, - "start_date": datetime(2023, 1, 1), - "retries": 0, -} +DBT_DIR = os.getenv("DBT_PROJECT_DIR") +# Create the DAG with the specified schedule interval +dbt_dag = DAG("dbt_dag", default_args=default_args, schedule_interval=None) +args = f"--project-dir {DBT_DIR} --profiles-dir {DBT_DIR}" +run_dbt_model = BashOperator( + task_id="build_dbt_models", bash_command=f"dbt build {args}", dag=dbt_dag +) dag = DAG("load_lakehouse", default_args=default_args, schedule_interval=None) load_iris = LoadToLakehouseOperator( task_id="load_iris", dag=dag, - csv_path=Path(__file__).parent / "iris.csv", + csv_path=iris_path(), db_path=Path(os.environ["AIRFLOW_HOME"]) / "jaffle_shop.duckdb", columns=[ "sepal_length_cm", @@ -46,6 +55,7 @@ def execute(self, context) -> None: "species", ], ) + mark_as_dagster_migrating( global_vars=globals(), migration_state=load_migration_state_from_yaml(Path(__file__).parent / "migration_state"), diff --git a/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/airflow_dags/dbt_dag.py b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/airflow_dags/dbt_dag.py deleted file mode 100644 index 4906cec4877e5..0000000000000 --- a/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/airflow_dags/dbt_dag.py +++ /dev/null @@ -1,27 +0,0 @@ -# Define the default arguments for the DAG -import os -from datetime import datetime, timedelta -from pathlib import Path - -from airflow import DAG -from airflow.operators.bash import BashOperator -from dagster_airlift.in_airflow import mark_as_dagster_migrating -from dagster_airlift.migration_state import load_migration_state_from_yaml - -default_args = { - "owner": "airflow", - "depends_on_past": False, - "start_date": datetime(2024, 7, 18), - "retries": 0, - "retry_delay": timedelta(minutes=5), -} -DBT_DIR = os.getenv("DBT_PROJECT_DIR") -# Create the DAG with the specified schedule interval -dag = DAG("dbt_dag", default_args=default_args, schedule_interval=None) -args = f"--project-dir {DBT_DIR} --profiles-dir {DBT_DIR}" -run_dbt_model = BashOperator(task_id="build_dbt_models", bash_command=f"dbt build {args}", dag=dag) - -mark_as_dagster_migrating( - global_vars=globals(), - migration_state=load_migration_state_from_yaml(Path(__file__).parent / "migration_state"), -)