From c244c8f38a8eae9a483828a9502e214d76e5b201 Mon Sep 17 00:00:00 2001 From: Chris DeCarolis Date: Tue, 13 Aug 2024 09:33:51 -0700 Subject: [PATCH] [dagster-airlift] peer/observe/migrate for dbt example --- .../dbt-example/.vscode/settings.json | 12 ++++++ .../examples/dbt-example/Makefile | 10 ++++- .../examples/dbt-example/README.md | 16 +++++-- .../dbt_example/dagster_defs/__init__.py | 1 - .../dbt_example/dagster_defs/constants.py | 7 +++ .../dagster_defs/csv_to_duckdb_defs.py | 33 +++++++------- .../{definitions.py => migrate.py} | 8 +--- .../dbt_example/dagster_defs/observe.py | 43 +++++++++++++++++++ .../dbt_example/dagster_defs/peer.py | 13 ++++++ 9 files changed, 114 insertions(+), 29 deletions(-) create mode 100644 examples/experimental/dagster-airlift/examples/dbt-example/.vscode/settings.json rename examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/{definitions.py => migrate.py} (91%) create mode 100644 examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/observe.py create mode 100644 examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/peer.py diff --git a/examples/experimental/dagster-airlift/examples/dbt-example/.vscode/settings.json b/examples/experimental/dagster-airlift/examples/dbt-example/.vscode/settings.json new file mode 100644 index 0000000000000..2121b578c87a3 --- /dev/null +++ b/examples/experimental/dagster-airlift/examples/dbt-example/.vscode/settings.json @@ -0,0 +1,12 @@ +{ + "python.analysis.userFileIndexingLimit": 5000, + "python.analysis.indexing": true, + "python.analysis.extraPaths": [ + "../../../../../python_modules/dagster", + "../../../../../examples/experimental/dagster-airlift", + "../../../../../examples/experimental/dagster-blueprints", + "../../../../../python_modules/libraries/dagster-dbt" + ], + "python.analysis.diagnosticMode": "openFilesOnly", + "python.analysis.useLibraryCodeForTypes": false, +} \ No newline at end of file diff --git a/examples/experimental/dagster-airlift/examples/dbt-example/Makefile b/examples/experimental/dagster-airlift/examples/dbt-example/Makefile index dffde73737f07..ad0707b7c57cc 100644 --- a/examples/experimental/dagster-airlift/examples/dbt-example/Makefile +++ b/examples/experimental/dagster-airlift/examples/dbt-example/Makefile @@ -35,8 +35,14 @@ setup_local_env: run_airflow: airflow standalone -run_dagster_dev: - dagster dev -m dbt_example.dagster_defs -p 3333 +run_peer: + dagster dev -m dbt_example.dagster_defs.peer -p 3333 + +run_observe: + dagster dev -m dbt_example.dagster_defs.observe -p 3333 + +run_migrate: + dagster dev -m dbt_example.dagster_defs.migrate -p 3333 wipe: ## Wipe out all the files created by the Makefile rm -rf $$AIRFLOW_HOME $$DAGSTER_HOME diff --git a/examples/experimental/dagster-airlift/examples/dbt-example/README.md b/examples/experimental/dagster-airlift/examples/dbt-example/README.md index 6a5d787f8ca5c..dcd00fd75aecd 100644 --- a/examples/experimental/dagster-airlift/examples/dbt-example/README.md +++ b/examples/experimental/dagster-airlift/examples/dbt-example/README.md @@ -5,23 +5,31 @@ airflow dags into dagster as assets. ### Try it out -Run the `dev_install` make command to install python dependencies. +From the root of the `dbt-example` directory, run the `dev_install` make command to install python dependencies. ```bash make dev_install ``` +Run setup commands, which will scaffold a local airflow, dagster instance, and dbt project. +```bash +make setup_local_env +``` + Launch airflow, where we've loaded two dags: -- `load_lakehouse`, which ingests a csv file into a duckdb table called `iris_table` +- `load_lakehouse`, which uses as custom operator `LoadCSVToDuckDB` to ingest a CSV as a duckdb table called `iris_table`. - `dbt_dag`, which loads a modified jaffle shop project, and has `iris_table` as a dbt source. ```bash make run_airflow ``` -In another shell, run `dagster dev`, where you should see the full dbt-airflow lineage show up. +In another shell, we can run dagster at the `peer`, `observe`, or `migrate` steps of the migration using any of the following commands: ```bash -make run_dagster_dev +make `run_peer` +make `run_observe` +make `run_migrate` ``` +Note that in order to run the observation step with `run_observe`, you must set `migrated` to `False` for each task in the dags. These can be found in `./airflow_dags/migration_state/.yaml`. diff --git a/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/__init__.py b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/__init__.py index 3c25b881e04ff..e69de29bb2d1d 100644 --- a/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/__init__.py +++ b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/__init__.py @@ -1 +0,0 @@ -from .definitions import defs as defs diff --git a/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/constants.py b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/constants.py index edfc0a06ca6df..d2d820376147c 100644 --- a/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/constants.py +++ b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/constants.py @@ -1,4 +1,5 @@ # Airflow instance running at localhost:8080 +import os from pathlib import Path AIRFLOW_BASE_URL = "http://localhost:8080" @@ -10,3 +11,9 @@ ASSETS_PATH = Path(__file__).parent / "defs" MIGRATION_STATE_PATH = Path(__file__).parent / "migration" + + +def dbt_project_path() -> Path: + env_val = os.getenv("DBT_PROJECT_DIR") + assert env_val + return Path(env_val) diff --git a/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/csv_to_duckdb_defs.py b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/csv_to_duckdb_defs.py index 40867c2785d07..798ca23de1133 100644 --- a/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/csv_to_duckdb_defs.py +++ b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/csv_to_duckdb_defs.py @@ -1,6 +1,6 @@ from dataclasses import dataclass from pathlib import Path -from typing import List +from typing import List, Optional from dagster import AssetKey, AssetSpec, Definitions, multi_asset from dagster_airlift.core import DefsFactory @@ -12,24 +12,27 @@ class CSVToDuckdbDefs(DefsFactory): table_name: str csv_path: Path - duckdb_path: Path - column_names: List[str] duckdb_schema: str - duckdb_database_name: str name: str + duckdb_path: Optional[Path] = None + column_names: Optional[List[str]] = None + duckdb_database_name: Optional[str] = None def build_defs(self) -> Definitions: - @multi_asset( - specs=[AssetSpec(key=AssetKey([self.duckdb_schema, self.table_name]))], name=self.name - ) + asset_spec = AssetSpec(key=AssetKey([self.duckdb_schema, self.table_name])) + + @multi_asset(specs=[asset_spec]) def _multi_asset(): - load_csv_to_duckdb( - table_name=self.table_name, - csv_path=self.csv_path, - duckdb_path=self.duckdb_path, - names=self.column_names, - duckdb_schema=self.duckdb_schema, - duckdb_database_name=self.duckdb_database_name, - ) + if self.duckdb_path is None: + raise Exception("This asset is not yet executable. Need to provide a duckdb_path.") + else: + load_csv_to_duckdb( + table_name=self.table_name, + csv_path=self.csv_path, + duckdb_path=self.duckdb_path, + names=self.column_names, + duckdb_schema=self.duckdb_schema, + duckdb_database_name=self.duckdb_database_name, + ) return Definitions(assets=[_multi_asset]) diff --git a/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/definitions.py b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/migrate.py similarity index 91% rename from examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/definitions.py rename to examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/migrate.py index 9ca590658322a..00ac681340f0c 100644 --- a/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/definitions.py +++ b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/migrate.py @@ -8,7 +8,7 @@ from dbt_example.dagster_defs.csv_to_duckdb_defs import CSVToDuckdbDefs -from .constants import AIRFLOW_BASE_URL, AIRFLOW_INSTANCE_NAME, PASSWORD, USERNAME +from .constants import AIRFLOW_BASE_URL, AIRFLOW_INSTANCE_NAME, PASSWORD, USERNAME, dbt_project_path airflow_instance = AirflowInstance( auth_backend=BasicAuthBackend( @@ -18,12 +18,6 @@ ) -def dbt_project_path() -> Path: - env_val = os.getenv("DBT_PROJECT_DIR") - assert env_val - return Path(env_val) - - defs = build_defs_from_airflow_instance( airflow_instance=airflow_instance, orchestrated_defs=defs_from_factories( diff --git a/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/observe.py b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/observe.py new file mode 100644 index 0000000000000..f3ceb5d9dbb0c --- /dev/null +++ b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/observe.py @@ -0,0 +1,43 @@ +import os +from pathlib import Path + +from dagster_airlift.core import AirflowInstance, BasicAuthBackend, build_defs_from_airflow_instance +from dagster_airlift.core.def_factory import defs_from_factories +from dagster_airlift.dbt import DbtProjectDefs + +from dbt_example.dagster_defs.csv_to_duckdb_defs import CSVToDuckdbDefs + +from .constants import AIRFLOW_BASE_URL, AIRFLOW_INSTANCE_NAME, PASSWORD, USERNAME, dbt_project_path + +airflow_instance = AirflowInstance( + auth_backend=BasicAuthBackend( + webserver_url=AIRFLOW_BASE_URL, username=USERNAME, password=PASSWORD + ), + name=AIRFLOW_INSTANCE_NAME, +) + + +defs = build_defs_from_airflow_instance( + airflow_instance=airflow_instance, + orchestrated_defs=defs_from_factories( + CSVToDuckdbDefs( + name="load_lakehouse__load_iris", + table_name="iris_lakehouse_table", + csv_path=Path("iris.csv"), + duckdb_path=Path(os.environ["AIRFLOW_HOME"]) / "jaffle_shop.duckdb", + column_names=[ + "sepal_length_cm", + "sepal_width_cm", + "petal_length_cm", + "petal_width_cm", + "species", + ], + duckdb_schema="iris_dataset", + duckdb_database_name="jaffle_shop", + ), + DbtProjectDefs( + name="dbt_dag__build_dbt_models", + dbt_manifest=dbt_project_path() / "target" / "manifest.json", + ), + ), +) diff --git a/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/peer.py b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/peer.py new file mode 100644 index 0000000000000..7fe1153e80620 --- /dev/null +++ b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/peer.py @@ -0,0 +1,13 @@ +from dagster_airlift.core import AirflowInstance, BasicAuthBackend, build_defs_from_airflow_instance + +from .constants import AIRFLOW_BASE_URL, AIRFLOW_INSTANCE_NAME, PASSWORD, USERNAME + +airflow_instance = AirflowInstance( + auth_backend=BasicAuthBackend( + webserver_url=AIRFLOW_BASE_URL, username=USERNAME, password=PASSWORD + ), + name=AIRFLOW_INSTANCE_NAME, +) + + +defs = build_defs_from_airflow_instance(airflow_instance=airflow_instance)