diff --git a/examples/experimental/dagster-airlift/examples/dbt-example/Makefile b/examples/experimental/dagster-airlift/examples/dbt-example/Makefile index fbef1408dbee7..dffde73737f07 100644 --- a/examples/experimental/dagster-airlift/examples/dbt-example/Makefile +++ b/examples/experimental/dagster-airlift/examples/dbt-example/Makefile @@ -7,8 +7,9 @@ endef MAKEFILE_DIR := $(GET_MAKEFILE_DIR) export DAGSTER_HOME := $(MAKEFILE_DIR)/.dagster_home export AIRFLOW_HOME := $(MAKEFILE_DIR)/.airflow_home -export DBT_PROJECT_DIR := $(MAKEFILE_DIR)/dbt_example/dbt -export DBT_PROFILES_DIR := $(MAKEFILE_DIR)/dbt_example/dbt +export DBT_PROJECT_DIR := $(MAKEFILE_DIR)/dbt_example/shared/dbt +export DBT_PROFILES_DIR := $(MAKEFILE_DIR)/dbt_example/shared/dbt +export DAGSTER_URL := http://localhost:3333 help: @egrep -h '\s##\s' $(MAKEFILE_LIST) | sort | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-20s\033[0m %s\n", $$1, $$2}' diff --git a/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/airflow_dags/dbt_dag.py b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/airflow_dags/dbt_dag.py index 3e7a7960be338..80949820c180e 100644 --- a/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/airflow_dags/dbt_dag.py +++ b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/airflow_dags/dbt_dag.py @@ -1,9 +1,12 @@ # Define the default arguments for the DAG import os from datetime import datetime, timedelta +from pathlib import Path from airflow import DAG from airflow.operators.bash import BashOperator +from dagster_airlift.in_airflow import mark_as_dagster_migrating +from dagster_airlift.migration_state import load_migration_state_from_yaml default_args = { "owner": "airflow", @@ -17,3 +20,8 @@ dag = DAG("dbt_dag", default_args=default_args, schedule_interval=timedelta(days=1)) args = f"--project-dir {DBT_DIR} --profiles-dir {DBT_DIR}" run_dbt_model = BashOperator(task_id="build_dbt_models", bash_command=f"dbt build {args}", dag=dag) + +mark_as_dagster_migrating( + global_vars=globals(), + migration_state=load_migration_state_from_yaml(Path(__file__).parent / "migration_state"), +) diff --git a/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/airflow_dags/lakehouse.py b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/airflow_dags/lakehouse.py index 2e3739204f9c0..d3de99fb3447a 100644 --- a/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/airflow_dags/lakehouse.py +++ b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/airflow_dags/lakehouse.py @@ -1,36 +1,11 @@ -import os from datetime import datetime +from pathlib import Path -import duckdb -import pandas as pd from airflow import DAG from airflow.operators.python import PythonOperator - - -def load_csv_to_duckdb(): - # Absolute path to the iris dataset is path to current file's directory - csv_path = os.path.join(os.path.dirname(__file__), "iris.csv") - # Duckdb database stored in airflow home - duckdb_path = os.path.join(os.environ["AIRFLOW_HOME"], "jaffle_shop.duckdb") - iris_df = pd.read_csv( # noqa: F841 # used by duckdb - csv_path, - names=[ - "sepal_length_cm", - "sepal_width_cm", - "petal_length_cm", - "petal_width_cm", - "species", - ], - ) - - # Connect to DuckDB and create a new table - con = duckdb.connect(duckdb_path) - con.execute("CREATE SCHEMA IF NOT EXISTS iris_dataset").fetchall() - con.execute( - "CREATE TABLE IF NOT EXISTS jaffle_shop.iris_dataset.iris_lakehouse_table AS SELECT * FROM iris_df" - ).fetchall() - con.close() - +from dagster_airlift.in_airflow import mark_as_dagster_migrating +from dagster_airlift.migration_state import load_migration_state_from_yaml +from dbt_example.shared.load_iris import load_csv_to_duckdb default_args = { "owner": "airflow", @@ -41,3 +16,7 @@ def load_csv_to_duckdb(): dag = DAG("load_lakehouse", default_args=default_args, schedule_interval=None) load_iris = PythonOperator(task_id="load_iris", python_callable=load_csv_to_duckdb, dag=dag) +mark_as_dagster_migrating( + global_vars=globals(), + migration_state=load_migration_state_from_yaml(Path(__file__).parent / "migration_state"), +) diff --git a/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/migration/dbt_dag.yaml b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/airflow_dags/migration_state/dbt_dag.yaml similarity index 58% rename from examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/migration/dbt_dag.yaml rename to examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/airflow_dags/migration_state/dbt_dag.yaml index aa00e2f637586..f3ccf3d1382b8 100644 --- a/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/migration/dbt_dag.yaml +++ b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/airflow_dags/migration_state/dbt_dag.yaml @@ -1,3 +1,3 @@ tasks: build_dbt_models: - migrated: True \ No newline at end of file + migrated: True \ No newline at end of file diff --git a/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/migration/load_lakehouse.yaml b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/airflow_dags/migration_state/load_lakehouse.yaml similarity index 51% rename from examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/migration/load_lakehouse.yaml rename to examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/airflow_dags/migration_state/load_lakehouse.yaml index e23540713521b..dcc326190677c 100644 --- a/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/migration/load_lakehouse.yaml +++ b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/airflow_dags/migration_state/load_lakehouse.yaml @@ -1,3 +1,3 @@ tasks: load_iris: - migrated: False \ No newline at end of file + migrated: True \ No newline at end of file diff --git a/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/definitions.py b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/definitions.py index fc7bc9ded688b..4b7b5d32df823 100644 --- a/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/definitions.py +++ b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/definitions.py @@ -8,18 +8,13 @@ BasicAuthBackend, PythonDefs, build_defs_from_airflow_instance, - load_migration_state_from_yaml, ) from dagster_airlift.core.def_factory import defs_from_factories from dagster_airlift.dbt import DbtProjectDefs -from .constants import ( - AIRFLOW_BASE_URL, - AIRFLOW_INSTANCE_NAME, - MIGRATION_STATE_PATH, - PASSWORD, - USERNAME, -) +from dbt_example.shared.load_iris import load_csv_to_duckdb + +from .constants import AIRFLOW_BASE_URL, AIRFLOW_INSTANCE_NAME, PASSWORD, USERNAME airflow_instance = AirflowInstance( auth_backend=BasicAuthBackend( @@ -41,7 +36,7 @@ def dbt_project_path() -> Path: PythonDefs( name="load_lakehouse__load_iris", specs=[AssetSpec(key=AssetKey.from_user_string("iris_dataset/iris_lakehouse_table"))], - python_fn=lambda: None, + python_fn=load_csv_to_duckdb, ), DbtProjectDefs( name="dbt_dag__build_dbt_models", @@ -49,7 +44,4 @@ def dbt_project_path() -> Path: group="dbt", ), ), - migration_state_override=load_migration_state_from_yaml( - migration_yaml_path=MIGRATION_STATE_PATH - ), ) diff --git a/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dbt/seeds/.gitkeep b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/shared/__init__.py similarity index 100% rename from examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dbt/seeds/.gitkeep rename to examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/shared/__init__.py diff --git a/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dbt/dbt_project.yml b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/shared/dbt/dbt_project.yml similarity index 100% rename from examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dbt/dbt_project.yml rename to examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/shared/dbt/dbt_project.yml diff --git a/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dbt/models/customers.sql b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/shared/dbt/models/customers.sql similarity index 100% rename from examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dbt/models/customers.sql rename to examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/shared/dbt/models/customers.sql diff --git a/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dbt/models/docs.md b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/shared/dbt/models/docs.md similarity index 100% rename from examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dbt/models/docs.md rename to examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/shared/dbt/models/docs.md diff --git a/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dbt/models/iris_setosa.sql b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/shared/dbt/models/iris_setosa.sql similarity index 100% rename from examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dbt/models/iris_setosa.sql rename to examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/shared/dbt/models/iris_setosa.sql diff --git a/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dbt/models/orders.sql b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/shared/dbt/models/orders.sql similarity index 100% rename from examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dbt/models/orders.sql rename to examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/shared/dbt/models/orders.sql diff --git a/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dbt/models/overview.md b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/shared/dbt/models/overview.md similarity index 100% rename from examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dbt/models/overview.md rename to examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/shared/dbt/models/overview.md diff --git a/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dbt/models/schema.yml b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/shared/dbt/models/schema.yml similarity index 100% rename from examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dbt/models/schema.yml rename to examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/shared/dbt/models/schema.yml diff --git a/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dbt/models/sources.yml b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/shared/dbt/models/sources.yml similarity index 100% rename from examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dbt/models/sources.yml rename to examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/shared/dbt/models/sources.yml diff --git a/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dbt/models/staging/schema.yml b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/shared/dbt/models/staging/schema.yml similarity index 100% rename from examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dbt/models/staging/schema.yml rename to examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/shared/dbt/models/staging/schema.yml diff --git a/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dbt/models/staging/stg_customers.sql b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/shared/dbt/models/staging/stg_customers.sql similarity index 100% rename from examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dbt/models/staging/stg_customers.sql rename to examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/shared/dbt/models/staging/stg_customers.sql diff --git a/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dbt/models/staging/stg_orders.sql b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/shared/dbt/models/staging/stg_orders.sql similarity index 100% rename from examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dbt/models/staging/stg_orders.sql rename to examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/shared/dbt/models/staging/stg_orders.sql diff --git a/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dbt/models/staging/stg_payments.sql b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/shared/dbt/models/staging/stg_payments.sql similarity index 100% rename from examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dbt/models/staging/stg_payments.sql rename to examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/shared/dbt/models/staging/stg_payments.sql diff --git a/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dbt/profiles.yml b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/shared/dbt/profiles.yml similarity index 100% rename from examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dbt/profiles.yml rename to examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/shared/dbt/profiles.yml diff --git a/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dbt/requirements.in b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/shared/dbt/requirements.in similarity index 100% rename from examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dbt/requirements.in rename to examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/shared/dbt/requirements.in diff --git a/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dbt/requirements.txt b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/shared/dbt/requirements.txt similarity index 100% rename from examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dbt/requirements.txt rename to examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/shared/dbt/requirements.txt diff --git a/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/shared/dbt/seeds/.gitkeep b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/shared/dbt/seeds/.gitkeep new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dbt/seeds/raw_customers.csv b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/shared/dbt/seeds/raw_customers.csv similarity index 100% rename from examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dbt/seeds/raw_customers.csv rename to examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/shared/dbt/seeds/raw_customers.csv diff --git a/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dbt/seeds/raw_orders.csv b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/shared/dbt/seeds/raw_orders.csv similarity index 100% rename from examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dbt/seeds/raw_orders.csv rename to examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/shared/dbt/seeds/raw_orders.csv diff --git a/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dbt/seeds/raw_payments.csv b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/shared/dbt/seeds/raw_payments.csv similarity index 100% rename from examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dbt/seeds/raw_payments.csv rename to examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/shared/dbt/seeds/raw_payments.csv diff --git a/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/shared/load_iris.py b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/shared/load_iris.py new file mode 100644 index 0000000000000..a6d48bcf203be --- /dev/null +++ b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/shared/load_iris.py @@ -0,0 +1,29 @@ +import os + +import duckdb +import pandas as pd + + +def load_csv_to_duckdb() -> None: + # Absolute path to the iris dataset is path to current file's directory + csv_path = os.path.join(os.path.dirname(__file__), "iris.csv") + # Duckdb database stored in airflow home + duckdb_path = os.path.join(os.environ["AIRFLOW_HOME"], "jaffle_shop.duckdb") + iris_df = pd.read_csv( # noqa: F841 # used by duckdb + csv_path, + names=[ + "sepal_length_cm", + "sepal_width_cm", + "petal_length_cm", + "petal_width_cm", + "species", + ], + ) + + # Connect to DuckDB and create a new table + con = duckdb.connect(duckdb_path) + con.execute("CREATE SCHEMA IF NOT EXISTS iris_dataset").fetchall() + con.execute( + "CREATE TABLE IF NOT EXISTS jaffle_shop.iris_dataset.iris_lakehouse_table AS SELECT * FROM iris_df" + ).fetchall() + con.close() diff --git a/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example_tests/conftest.py b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example_tests/conftest.py index 9dbfb1c65b83d..0c2c130d3361b 100644 --- a/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example_tests/conftest.py +++ b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example_tests/conftest.py @@ -14,12 +14,12 @@ def local_env_fixture() -> Generator[None, None, None]: with environ( { "AIRFLOW_HOME": str(makefile_dir / ".airflow_home"), - "DBT_PROJECT_DIR": str(makefile_dir / "dbt_example" / "dbt"), + "DBT_PROJECT_DIR": str(makefile_dir / "dbt_example" / "shared" / "dbt"), "DAGSTER_HOME": str(makefile_dir / ".dagster_home"), } ): yield - subprocess.run(["make", "wipe"], check=True) + subprocess.run(["make", "wipe"], cwd=makefile_dir, check=True) @pytest.fixture(name="dags_dir")