Commit c244c8f: [dagster-airlift] peer/observe/migrate for dbt example

dpeng817 committed Aug 13, 2024
1 parent 4d7e546 commit c244c8f

Showing 9 changed files with 114 additions and 29 deletions.
@@ -0,0 +1,12 @@
{
"python.analysis.userFileIndexingLimit": 5000,
"python.analysis.indexing": true,
"python.analysis.extraPaths": [
"../../../../../python_modules/dagster",
"../../../../../examples/experimental/dagster-airlift",
"../../../../../examples/experimental/dagster-blueprints",
"../../../../../python_modules/libraries/dagster-dbt"
],
"python.analysis.diagnosticMode": "openFilesOnly",
"python.analysis.useLibraryCodeForTypes": false,
}
@@ -35,8 +35,14 @@ setup_local_env:
run_airflow:
airflow standalone

run_dagster_dev:
dagster dev -m dbt_example.dagster_defs -p 3333
run_peer:
dagster dev -m dbt_example.dagster_defs.peer -p 3333

run_observe:
dagster dev -m dbt_example.dagster_defs.observe -p 3333

run_migrate:
dagster dev -m dbt_example.dagster_defs.migrate -p 3333

wipe: ## Wipe out all the files created by the Makefile
rm -rf $$AIRFLOW_HOME $$DAGSTER_HOME
@@ -5,23 +5,31 @@ airflow dags into dagster as assets.

### Try it out

Run the `dev_install` make command to install python dependencies.
From the root of the `dbt-example` directory, run the `dev_install` make command to install python dependencies.

```bash
make dev_install
```

Run the setup commands, which scaffold a local airflow instance, a dagster instance, and a dbt project.

```bash
make setup_local_env
```

Launch airflow, where we've loaded two dags:

- `load_lakehouse`, which ingests a csv file into a duckdb table called `iris_table`
- `load_lakehouse`, which uses a custom operator, `LoadCSVToDuckDB`, to ingest a CSV as a duckdb table called `iris_table`.
- `dbt_dag`, which loads a modified jaffle shop project, and has `iris_table` as a dbt source.

```bash
make run_airflow
```

In another shell, run `dagster dev`, where you should see the full dbt-airflow lineage show up.
In another shell, we can run dagster at the `peer`, `observe`, or `migrate` step of the migration using the corresponding command:

```bash
make run_dagster_dev
make run_peer
make run_observe
make run_migrate
```
Note that in order to run the observation step with `run_observe`, you must set `migrated` to `False` for each task in the dags. These flags live in `./airflow_dags/migration_state/<dag_name>.yaml`.
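
For example, here is a minimal sketch (not part of this commit) of flipping those flags programmatically. It assumes each migration-state file holds a `tasks` list whose entries carry a boolean `migrated` field; consult the scaffolded YAML files for the exact schema.

```python
# Hypothetical helper, not part of this commit. Assumes each migration-state
# file holds a `tasks` list whose entries carry a boolean `migrated` field.
from pathlib import Path

import yaml  # requires pyyaml


def set_migrated(state_file: Path, migrated: bool) -> None:
    state = yaml.safe_load(state_file.read_text())
    for task in state.get("tasks", []):
        task["migrated"] = migrated
    state_file.write_text(yaml.safe_dump(state))


# e.g. before running `make run_observe`:
set_migrated(Path("airflow_dags/migration_state/dbt_dag.yaml"), migrated=False)
```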
@@ -1 +0,0 @@
from .definitions import defs as defs
@@ -1,4 +1,5 @@
# Airflow instance running at localhost:8080
import os
from pathlib import Path

AIRFLOW_BASE_URL = "http://localhost:8080"
@@ -10,3 +11,9 @@

ASSETS_PATH = Path(__file__).parent / "defs"
MIGRATION_STATE_PATH = Path(__file__).parent / "migration"


def dbt_project_path() -> Path:
env_val = os.getenv("DBT_PROJECT_DIR")
assert env_val
return Path(env_val)
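
For illustration, a hedged sketch (not part of the diff) of how this helper is consumed: `DBT_PROJECT_DIR` points at the scaffolded dbt project, and the manifest path derived from it feeds `DbtProjectDefs` later in the diff.

```python
# Hypothetical usage of dbt_project_path(); the actual directory depends on
# where `make setup_local_env` scaffolded the jaffle shop project.
import os

os.environ["DBT_PROJECT_DIR"] = "/tmp/dbt_example/jaffle_shop"  # hypothetical path
manifest = dbt_project_path() / "target" / "manifest.json"
```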
@@ -1,6 +1,6 @@
from dataclasses import dataclass
from pathlib import Path
from typing import List
from typing import List, Optional

from dagster import AssetKey, AssetSpec, Definitions, multi_asset
from dagster_airlift.core import DefsFactory
@@ -12,24 +12,27 @@
class CSVToDuckdbDefs(DefsFactory):
table_name: str
csv_path: Path
duckdb_path: Path
column_names: List[str]
duckdb_schema: str
duckdb_database_name: str
name: str
duckdb_path: Optional[Path] = None
column_names: Optional[List[str]] = None
duckdb_database_name: Optional[str] = None

def build_defs(self) -> Definitions:
@multi_asset(
specs=[AssetSpec(key=AssetKey([self.duckdb_schema, self.table_name]))], name=self.name
)
asset_spec = AssetSpec(key=AssetKey([self.duckdb_schema, self.table_name]))

@multi_asset(specs=[asset_spec])
def _multi_asset():
load_csv_to_duckdb(
table_name=self.table_name,
csv_path=self.csv_path,
duckdb_path=self.duckdb_path,
names=self.column_names,
duckdb_schema=self.duckdb_schema,
duckdb_database_name=self.duckdb_database_name,
)
if self.duckdb_path is None:
raise Exception("This asset is not yet executable. Need to provide a duckdb_path.")
else:
load_csv_to_duckdb(
table_name=self.table_name,
csv_path=self.csv_path,
duckdb_path=self.duckdb_path,
names=self.column_names,
duckdb_schema=self.duckdb_schema,
duckdb_database_name=self.duckdb_database_name,
)

return Definitions(assets=[_multi_asset])
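
Since `duckdb_path`, `column_names`, and `duckdb_database_name` are now optional, earlier migration stages can declare the asset spec without making it executable. A minimal sketch (not part of the diff) of such spec-only usage:

```python
# Sketch: spec-only factory, e.g. for the observe stage. build_defs() still
# yields the asset spec, but materializing the asset raises until a
# duckdb_path is provided.
spec_only = CSVToDuckdbDefs(
    name="load_lakehouse__load_iris",
    table_name="iris_lakehouse_table",
    csv_path=Path("iris.csv"),
    duckdb_schema="iris_dataset",
)
defs = spec_only.build_defs()
```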
@@ -8,7 +8,7 @@

from dbt_example.dagster_defs.csv_to_duckdb_defs import CSVToDuckdbDefs

from .constants import AIRFLOW_BASE_URL, AIRFLOW_INSTANCE_NAME, PASSWORD, USERNAME
from .constants import AIRFLOW_BASE_URL, AIRFLOW_INSTANCE_NAME, PASSWORD, USERNAME, dbt_project_path

airflow_instance = AirflowInstance(
auth_backend=BasicAuthBackend(
@@ -18,12 +18,6 @@
)


def dbt_project_path() -> Path:
env_val = os.getenv("DBT_PROJECT_DIR")
assert env_val
return Path(env_val)


defs = build_defs_from_airflow_instance(
airflow_instance=airflow_instance,
orchestrated_defs=defs_from_factories(
@@ -0,0 +1,43 @@
import os
from pathlib import Path

from dagster_airlift.core import AirflowInstance, BasicAuthBackend, build_defs_from_airflow_instance
from dagster_airlift.core.def_factory import defs_from_factories
from dagster_airlift.dbt import DbtProjectDefs

from dbt_example.dagster_defs.csv_to_duckdb_defs import CSVToDuckdbDefs

from .constants import AIRFLOW_BASE_URL, AIRFLOW_INSTANCE_NAME, PASSWORD, USERNAME, dbt_project_path

airflow_instance = AirflowInstance(
auth_backend=BasicAuthBackend(
webserver_url=AIRFLOW_BASE_URL, username=USERNAME, password=PASSWORD
),
name=AIRFLOW_INSTANCE_NAME,
)


defs = build_defs_from_airflow_instance(
airflow_instance=airflow_instance,
orchestrated_defs=defs_from_factories(
CSVToDuckdbDefs(
name="load_lakehouse__load_iris",
table_name="iris_lakehouse_table",
csv_path=Path("iris.csv"),
duckdb_path=Path(os.environ["AIRFLOW_HOME"]) / "jaffle_shop.duckdb",
column_names=[
"sepal_length_cm",
"sepal_width_cm",
"petal_length_cm",
"petal_width_cm",
"species",
],
duckdb_schema="iris_dataset",
duckdb_database_name="jaffle_shop",
),
DbtProjectDefs(
name="dbt_dag__build_dbt_models",
dbt_manifest=dbt_project_path() / "target" / "manifest.json",
),
),
)
@@ -0,0 +1,13 @@
from dagster_airlift.core import AirflowInstance, BasicAuthBackend, build_defs_from_airflow_instance

from .constants import AIRFLOW_BASE_URL, AIRFLOW_INSTANCE_NAME, PASSWORD, USERNAME

airflow_instance = AirflowInstance(
auth_backend=BasicAuthBackend(
webserver_url=AIRFLOW_BASE_URL, username=USERNAME, password=PASSWORD
),
name=AIRFLOW_INSTANCE_NAME,
)


defs = build_defs_from_airflow_instance(airflow_instance=airflow_instance)
