Skip to content

Commit

Permalink
[dagster-airlift] change dbt example to include migration functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
dpeng817 committed Aug 12, 2024
1 parent a9bd145 commit 1bc92e1
Show file tree
Hide file tree
Showing 28 changed files with 56 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ endef
MAKEFILE_DIR := $(GET_MAKEFILE_DIR)
export DAGSTER_HOME := $(MAKEFILE_DIR)/.dagster_home
export AIRFLOW_HOME := $(MAKEFILE_DIR)/.airflow_home
export DBT_PROJECT_DIR := $(MAKEFILE_DIR)/dbt_example/dbt
export DBT_PROFILES_DIR := $(MAKEFILE_DIR)/dbt_example/dbt
export DBT_PROJECT_DIR := $(MAKEFILE_DIR)/dbt_example/shared/dbt
export DBT_PROFILES_DIR := $(MAKEFILE_DIR)/dbt_example/shared/dbt
export DAGSTER_URL := http://localhost:3333

help:
@egrep -h '\s##\s' $(MAKEFILE_LIST) | sort | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-20s\033[0m %s\n", $$1, $$2}'
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
# Define the default arguments for the DAG
import os
from datetime import datetime, timedelta
from pathlib import Path

from airflow import DAG
from airflow.operators.bash import BashOperator
from dagster_airlift.in_airflow import mark_as_dagster_migrating
from dagster_airlift.migration_state import load_migration_state_from_yaml

default_args = {
"owner": "airflow",
Expand All @@ -17,3 +20,8 @@
dag = DAG("dbt_dag", default_args=default_args, schedule_interval=timedelta(days=1))
args = f"--project-dir {DBT_DIR} --profiles-dir {DBT_DIR}"
run_dbt_model = BashOperator(task_id="build_dbt_models", bash_command=f"dbt build {args}", dag=dag)

mark_as_dagster_migrating(
global_vars=globals(),
migration_state=load_migration_state_from_yaml(Path(__file__).parent / "migration_state"),
)
Original file line number Diff line number Diff line change
@@ -1,36 +1,11 @@
import os
from datetime import datetime
from pathlib import Path

import duckdb
import pandas as pd
from airflow import DAG
from airflow.operators.python import PythonOperator


def load_csv_to_duckdb() -> None:
    """Load the iris CSV that sits next to this file into a DuckDB table.

    Reads ``iris.csv`` from this module's directory into a pandas DataFrame
    and materializes it as ``jaffle_shop.iris_dataset.iris_lakehouse_table``
    in the ``jaffle_shop.duckdb`` database under ``$AIRFLOW_HOME``. Both the
    schema and the table are created only if they do not already exist.

    Raises:
        KeyError: If the ``AIRFLOW_HOME`` environment variable is not set.
    """
    # The iris dataset lives in the same directory as this file.
    csv_path = os.path.join(os.path.dirname(__file__), "iris.csv")
    # The DuckDB database file is stored in the Airflow home directory.
    duckdb_path = os.path.join(os.environ["AIRFLOW_HOME"], "jaffle_shop.duckdb")
    # NOTE: `iris_df` must stay a local of this function; the SQL below refers
    # to it by name and DuckDB picks the DataFrame up from this scope.
    iris_df = pd.read_csv(  # noqa: F841 # used by duckdb
        csv_path,
        names=[
            "sepal_length_cm",
            "sepal_width_cm",
            "petal_length_cm",
            "petal_width_cm",
            "species",
        ],
    )

    # Connect to DuckDB and create a new table.
    con = duckdb.connect(duckdb_path)
    try:
        con.execute("CREATE SCHEMA IF NOT EXISTS iris_dataset").fetchall()
        con.execute(
            "CREATE TABLE IF NOT EXISTS jaffle_shop.iris_dataset.iris_lakehouse_table AS SELECT * FROM iris_df"
        ).fetchall()
    finally:
        # Always release the connection, even if a statement above fails,
        # so the database file is not left held open by this process.
        con.close()

from dagster_airlift.in_airflow import mark_as_dagster_migrating
from dagster_airlift.migration_state import load_migration_state_from_yaml
from dbt_example.shared.load_iris import load_csv_to_duckdb

default_args = {
"owner": "airflow",
Expand All @@ -41,3 +16,7 @@ def load_csv_to_duckdb():

dag = DAG("load_lakehouse", default_args=default_args, schedule_interval=None)
load_iris = PythonOperator(task_id="load_iris", python_callable=load_csv_to_duckdb, dag=dag)
mark_as_dagster_migrating(
global_vars=globals(),
migration_state=load_migration_state_from_yaml(Path(__file__).parent / "migration_state"),
)
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
tasks:
build_dbt_models:
migrated: True
migrated: True
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
tasks:
load_iris:
migrated: False
migrated: True
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,13 @@
BasicAuthBackend,
PythonDefs,
build_defs_from_airflow_instance,
load_migration_state_from_yaml,
)
from dagster_airlift.core.def_factory import defs_from_factories
from dagster_airlift.dbt import DbtProjectDefs

from .constants import (
AIRFLOW_BASE_URL,
AIRFLOW_INSTANCE_NAME,
MIGRATION_STATE_PATH,
PASSWORD,
USERNAME,
)
from dbt_example.shared.load_iris import load_csv_to_duckdb

from .constants import AIRFLOW_BASE_URL, AIRFLOW_INSTANCE_NAME, PASSWORD, USERNAME

airflow_instance = AirflowInstance(
auth_backend=BasicAuthBackend(
Expand All @@ -41,15 +36,12 @@ def dbt_project_path() -> Path:
PythonDefs(
name="load_lakehouse__load_iris",
specs=[AssetSpec(key=AssetKey.from_user_string("iris_dataset/iris_lakehouse_table"))],
python_fn=lambda: None,
python_fn=load_csv_to_duckdb,
),
DbtProjectDefs(
name="dbt_dag__build_dbt_models",
dbt_project_path=dbt_project_path(),
group="dbt",
),
),
migration_state_override=load_migration_state_from_yaml(
migration_yaml_path=MIGRATION_STATE_PATH
),
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import os

import duckdb
import pandas as pd


def load_csv_to_duckdb() -> None:
    """Load the bundled iris CSV into the shared DuckDB database.

    The CSV (``iris.csv`` in this module's directory) is read into a pandas
    DataFrame with explicit column names, then written to
    ``jaffle_shop.iris_dataset.iris_lakehouse_table`` inside
    ``$AIRFLOW_HOME/jaffle_shop.duckdb``. The schema and table are created
    with ``IF NOT EXISTS``, so an existing table is left untouched.

    Raises:
        KeyError: If the ``AIRFLOW_HOME`` environment variable is not set.
    """
    # Absolute path to the iris dataset: it sits in this file's directory.
    csv_path = os.path.join(os.path.dirname(__file__), "iris.csv")
    # The DuckDB database is stored in the Airflow home directory.
    duckdb_path = os.path.join(os.environ["AIRFLOW_HOME"], "jaffle_shop.duckdb")
    # NOTE: the DataFrame is referenced by name from the SQL below, so it has
    # to remain a local variable of this function (hence the F841 suppression).
    iris_df = pd.read_csv(  # noqa: F841 # used by duckdb
        csv_path,
        names=[
            "sepal_length_cm",
            "sepal_width_cm",
            "petal_length_cm",
            "petal_width_cm",
            "species",
        ],
    )

    # Connect to DuckDB and create a new table.
    con = duckdb.connect(duckdb_path)
    try:
        con.execute("CREATE SCHEMA IF NOT EXISTS iris_dataset").fetchall()
        con.execute(
            "CREATE TABLE IF NOT EXISTS jaffle_shop.iris_dataset.iris_lakehouse_table AS SELECT * FROM iris_df"
        ).fetchall()
    finally:
        # Close the connection on failure as well as success; otherwise an
        # error in either statement would leak the open database handle.
        con.close()
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@ def local_env_fixture() -> Generator[None, None, None]:
with environ(
{
"AIRFLOW_HOME": str(makefile_dir / ".airflow_home"),
"DBT_PROJECT_DIR": str(makefile_dir / "dbt_example" / "dbt"),
"DBT_PROJECT_DIR": str(makefile_dir / "dbt_example" / "shared" / "dbt"),
"DAGSTER_HOME": str(makefile_dir / ".dagster_home"),
}
):
yield
subprocess.run(["make", "wipe"], check=True)
subprocess.run(["make", "wipe"], cwd=makefile_dir, check=True)


@pytest.fixture(name="dags_dir")
Expand Down

0 comments on commit 1bc92e1

Please sign in to comment.