Pass on tutorial for code example include-ability and reduced verbosity (dagster-io#23923)

## Summary & Motivation

Make code examples include all of their imports and eliminate direct usage of `AssetKey` to reduce verbosity.
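For illustration, a minimal before/after sketch of the simplification this pass applies (the key below is taken from the tutorial example; the equivalence relies on `AssetSpec` coercing plain strings and lists into asset keys):

```python
from dagster import AssetKey, AssetSpec

# Before: construct the AssetKey explicitly.
verbose_spec = AssetSpec(key=AssetKey(["raw_data", "raw_customers"]))

# After: pass a plain list (or string); AssetSpec coerces it into an
# AssetKey, so examples can drop the AssetKey import entirely.
terse_spec = AssetSpec(key=["raw_data", "raw_customers"])

assert verbose_spec.key == terse_spec.key
```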

## How I Tested These Changes

Read

## Changelog [New | Bug | Docs]

NOCHANGELOG
schrockn authored Aug 26, 2024
1 parent 9d6c2b2 commit bb0ede8
Showing 1 changed file with 26 additions and 18 deletions.
44 changes: 26 additions & 18 deletions examples/experimental/dagster-airlift/README.md
@@ -6,7 +6,7 @@ Airlift is a toolkit for observing Airflow instances from within Dagster and for

- Observe Airflow DAGs and their execution history with no changes to Airflow code
- Model and observe assets orchestrated by Airflow with no changes to Airflow code
- (Future) Enable a migration process that
- Enable a migration process that
- Can be done task-by-task in any order with minimal coordination
- Has task-by-task rollback to reduce risk
- Retains Airflow DAG structure and execution history during the migration
@@ -46,12 +46,13 @@ First, you will need to install `dagster-airlift` in your Dagster environment:

```bash
pip install uv
uv pip install dagster-airlift[core]
uv pip install dagster-airlift[core] dagster-webserver
```

Next, you should create a `Definitions` object using `build_defs_from_airflow_instance`.

```python
# airlift.py
from dagster_airlift.core import (
AirflowInstance,
BasicAuthBackend,
@@ -78,6 +79,10 @@ This function creates:

_Note: When the code location loads, Dagster will query the Airflow REST API in order to build a representation of your DAGs. In order for Dagster to reflect changes to your DAGs, you will need to reload your code location._

```bash
dagster dev -f airlift.py
```
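The `airlift.py` example above is truncated in this diff view. A rough sketch of what a complete file might look like follows; the webserver URL, credentials, and instance name are placeholders rather than values taken from this commit:

```python
# airlift.py -- minimal sketch; connection values below are placeholders.
from dagster_airlift.core import (
    AirflowInstance,
    BasicAuthBackend,
    build_defs_from_airflow_instance,
)

# Describe how Dagster should reach the Airflow webserver's REST API.
airflow_instance = AirflowInstance(
    auth_backend=BasicAuthBackend(
        webserver_url="http://localhost:8080",
        username="admin",
        password="admin",
    ),
    name="my_airflow_instance",
)

# Build a Definitions object that mirrors the Airflow instance's DAGs.
defs = build_defs_from_airflow_instance(airflow_instance=airflow_instance)
```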

<details>
<summary>
*Peering to multiple instances*
@@ -130,20 +135,22 @@ The first and third tasks involve a single table each. We can manually construct
provided to the `defs` argument to `build_defs_from_airflow_instance`.

```python
from dagster import AssetSpec

from dagster_airlift.core import build_defs_from_airflow_instance, dag_defs, task_defs

defs = build_defs_from_airflow_instance(
airflow_instance=airflow_instance,
defs=dag_defs(
"rebuild_customers_list",
task_defs(
"load_raw_customers",
AssetSpec(key=AssetKey(["raw_data", "raw_customers"]))
AssetSpec(key=["raw_data", "raw_customers"])
),
# encode dependency on customers output
task_defs(
"export_customers",
AssetSpec(key=AssetKey("customers_csv"), deps={
AssetKey("customers")
})
AssetSpec(key="customers_csv", deps=["customers"])
),
)
)
@@ -152,20 +159,22 @@
To build assets for our dbt invocation, we can use the Dagster-supplied factory `dbt_defs`, installable via `uv pip install dagster-airlift[dbt]`. This will load each dbt model as its own asset:

```python
from dagster import AssetSpec
from dagster_airlift.core import build_defs_from_airflow_instance, dag_defs, task_defs
from dagster_dbt import DbtProject

defs = build_defs_from_airflow_instance(
airflow_instance=airflow_instance,
defs=dag_defs(
"rebuild_customers_list",
task_defs(
"load_raw_customers",
AssetSpec(key=AssetKey(["raw_data", "raw_customers"]))
AssetSpec(key=["raw_data", "raw_customers"])
),
# encode dependency on customers output
task_defs(
"export_customers",
AssetSpec(key=AssetKey("customers_csv"), deps={
AssetKey("customers")
})
AssetSpec(key="customers_csv", deps=["customers"])
),
task_defs(
"build_dbt_models",
@@ -245,6 +254,9 @@ The migration file acts as the source of truth for migration status. A task which
For some common operator patterns, like our dbt operator, Dagster supplies factories to build software-defined assets for our tasks:

```python
from dagster_airlift.core import build_defs_from_airflow_instance, dag_defs, task_defs
from dagster_dbt import DbtProject

defs = build_defs_from_airflow_instance(
airflow_instance=airflow_instance,
defs=dag_defs(
@@ -265,14 +277,15 @@

For all other operator types, we recommend creating a new factory function whose arguments match the inputs to your Airflow operator. You can then use this factory to build definitions for each Airflow task.

For example, our `load_raw_customers` task uses a custom `LoadCSVToDuckDB` operator. We'll define a `CSVToDuckDBDefs` factory to build corresponding software-defined assets:
For example, our `load_raw_customers` task uses a custom `LoadCSVToDuckDB` operator. We'll define a factory function `load_csv_to_duckdb_defs` to build the corresponding software-defined assets:

```python
from dataclasses import dataclass
from pathlib import Path
from typing import List

from dagster import AssetKey, AssetSpec, Definitions, multi_asset
from dagster import AssetSpec, Definitions, multi_asset
from dagster_airlift.core import build_defs_from_airflow_instance, dag_defs, task_defs

from tutorial_example.shared.load_csv_to_duckdb import load_csv_to_duckdb

@@ -284,7 +297,7 @@ def load_csv_to_duckdb_defs(
duckdb_schema: str,
duckdb_database_name: str,
) -> Definitions:
@multi_asset(specs=[AssetSpec(key=AssetKey([duckdb_schema, table_name]))])
@multi_asset(specs=[AssetSpec(key=[duckdb_schema, table_name])])
def _multi_asset() -> None:
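# materializing this asset runs the CSV-to-DuckDB load directly from Dagster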
load_csv_to_duckdb(
table_name=table_name,
@@ -297,11 +310,6 @@

return Definitions(assets=[_multi_asset])

```

We can then use our new factory to supply definitions:

```python
defs = build_defs_from_airflow_instance(
airflow_instance=airflow_instance,
defs=dag_defs(
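The final code block is cut off in this diff view. As a hedged sketch of how the factory might be wired into `dag_defs`/`task_defs`: the argument values below are illustrative placeholders, and `load_csv_to_duckdb_defs` takes additional parameters that the excerpt elides.

```python
# Sketch only: argument values are placeholders, and the factory's full
# parameter list is not shown in this diff.
defs = build_defs_from_airflow_instance(
    airflow_instance=airflow_instance,
    defs=dag_defs(
        "rebuild_customers_list",
        task_defs(
            "load_raw_customers",
            load_csv_to_duckdb_defs(
                table_name="raw_customers",
                duckdb_schema="raw_data",
                duckdb_database_name="jaffle_shop",  # placeholder value
                # ...remaining constructor arguments omitted here
            ),
        ),
    ),
)
```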
