[dagster-airlift] [tutorial] add tutorial section for dag-level proxying (#25280)

## Summary & Motivation
Adds a section to the tutorial on proxying at the DAG level.
## How I Tested These Changes
Should probably add some more tests for the new examples here.
## Changelog
NOCHANGELOG
dpeng817 authored Oct 15, 2024
1 parent b8ea604 commit 06ea16e
Showing 10 changed files with 500 additions and 1 deletion.

## Migrating an entire DAG at once

There may be DAGs that you want to migrate all at once rather than on a per-task basis.
Some reasons for taking this approach:

- You're making use of "dynamic tasks" in Airflow, which don't conform neatly to the task mapping protocol we've laid out above.
- You want to make more substantial refactors to the DAG structure that don't conform to the existing task structure.

For cases like this, we allow you to map assets to a full DAG.

### Observing DAG-mapped assets

At the observation stage, you'll call `assets_with_dag_mappings` instead of `assets_with_task_mappings`.

For our `rebuild_customers_list` DAG, let's take a look at what the new observation code looks like:

```python
# tutorial_example/dagster_defs/stages/observe_dag_level.py
import os
from pathlib import Path

from dagster import AssetExecutionContext, AssetSpec, Definitions
from dagster_airlift.core import (
    AirflowInstance,
    BasicAuthBackend,
    assets_with_dag_mappings,
    build_defs_from_airflow_instance,
)
from dagster_dbt import DbtCliResource, DbtProject, dbt_assets


def dbt_project_path() -> Path:
    env_val = os.getenv("TUTORIAL_DBT_PROJECT_DIR")
    assert env_val, "TUTORIAL_DBT_PROJECT_DIR must be set"
    return Path(env_val)


@dbt_assets(
    manifest=dbt_project_path() / "target" / "manifest.json",
    project=DbtProject(dbt_project_path()),
)
def dbt_project_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()


# Instead of mapping assets to individual tasks, we map them to the entire DAG.
mapped_assets = assets_with_dag_mappings(
    dag_mappings={
        "rebuild_customers_list": [
            AssetSpec(key=["raw_data", "raw_customers"]),
            dbt_project_assets,
            AssetSpec(key="customers_csv", deps=["customers"]),
        ],
    },
)

defs = build_defs_from_airflow_instance(
    airflow_instance=AirflowInstance(
        auth_backend=BasicAuthBackend(
            webserver_url="http://localhost:8080",
            username="admin",
            password="admin",
        ),
        name="airflow_instance_one",
    ),
    defs=Definitions(
        assets=mapped_assets,
        resources={"dbt": DbtCliResource(project_dir=dbt_project_path())},
    ),
)
```

Now, instead of getting a materialization when a particular task completes, each mapped asset will receive a materialization when the entire DAG completes.
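
To see the difference concretely, here's a minimal sketch contrasting the two mapping calls, using one asset from this tutorial (`load_raw_customers` is one of the task ids in the `rebuild_customers_list` DAG):

```python
# A minimal sketch contrasting task-level and DAG-level mappings.
from dagster import AssetSpec
from dagster_airlift.core import assets_with_dag_mappings, assets_with_task_mappings

# Task-level: the asset materializes when this specific task succeeds.
per_task_assets = assets_with_task_mappings(
    dag_id="rebuild_customers_list",
    task_mappings={
        "load_raw_customers": [AssetSpec(key=["raw_data", "raw_customers"])],
    },
)

# DAG-level: the asset materializes only when the entire DAG run succeeds.
per_dag_assets = assets_with_dag_mappings(
    dag_mappings={
        "rebuild_customers_list": [AssetSpec(key=["raw_data", "raw_customers"])],
    },
)
```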

### Migrating DAG-mapped assets

Recall that in the task-by-task migration step, we "proxy" execution on a per-task basis, controlled by a YAML document.
For DAG-mapped assets, execution is instead proxied on a per-DAG basis.
Proxying execution to Dagster requires that all assets mapped to that DAG be _executable_ within Dagster.
Let's take a look at fully migrated code mapped to DAGs instead of tasks:

```python
# tutorial_example/dagster_defs/stages/migrate_dag_level.py
import os
from pathlib import Path

from dagster import (
    AssetExecutionContext,
    AssetsDefinition,
    AssetSpec,
    Definitions,
    materialize,
    multi_asset,
)
from dagster_airlift.core import (
    AirflowInstance,
    BasicAuthBackend,
    assets_with_dag_mappings,
    build_defs_from_airflow_instance,
)
from dagster_dbt import DbtCliResource, DbtProject, dbt_assets

# Code also invoked from Airflow
from tutorial_example.shared.export_duckdb_to_csv import ExportDuckDbToCsvArgs, export_duckdb_to_csv
from tutorial_example.shared.load_csv_to_duckdb import LoadCsvToDuckDbArgs, load_csv_to_duckdb


def dbt_project_path() -> Path:
    env_val = os.getenv("TUTORIAL_DBT_PROJECT_DIR")
    assert env_val, "TUTORIAL_DBT_PROJECT_DIR must be set"
    return Path(env_val)


def airflow_dags_path() -> Path:
    return Path(os.environ["TUTORIAL_EXAMPLE_DIR"]) / "tutorial_example" / "airflow_dags"


def load_csv_to_duckdb_asset(spec: AssetSpec, args: LoadCsvToDuckDbArgs) -> AssetsDefinition:
    @multi_asset(name=f"load_{args.table_name}", specs=[spec])
    def _multi_asset() -> None:
        load_csv_to_duckdb(args)

    return _multi_asset


def export_duckdb_to_csv_defs(spec: AssetSpec, args: ExportDuckDbToCsvArgs) -> AssetsDefinition:
    @multi_asset(name=f"export_{args.table_name}", specs=[spec])
    def _multi_asset() -> None:
        export_duckdb_to_csv(args)

    return _multi_asset


@dbt_assets(
    manifest=dbt_project_path() / "target" / "manifest.json",
    project=DbtProject(dbt_project_path()),
)
def dbt_project_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()


mapped_assets = assets_with_dag_mappings(
    dag_mappings={
        "rebuild_customers_list": [
            load_csv_to_duckdb_asset(
                AssetSpec(key=["raw_data", "raw_customers"]),
                LoadCsvToDuckDbArgs(
                    table_name="raw_customers",
                    csv_path=airflow_dags_path() / "raw_customers.csv",
                    duckdb_path=Path(os.environ["AIRFLOW_HOME"]) / "jaffle_shop.duckdb",
                    names=["id", "first_name", "last_name"],
                    duckdb_schema="raw_data",
                    duckdb_database_name="jaffle_shop",
                ),
            ),
            dbt_project_assets,
            export_duckdb_to_csv_defs(
                AssetSpec(key="customers_csv", deps=["customers"]),
                ExportDuckDbToCsvArgs(
                    table_name="customers",
                    csv_path=Path(os.environ["TUTORIAL_EXAMPLE_DIR"]) / "customers.csv",
                    duckdb_path=Path(os.environ["AIRFLOW_HOME"]) / "jaffle_shop.duckdb",
                    duckdb_database_name="jaffle_shop",
                ),
            ),
        ],
    },
)

defs = build_defs_from_airflow_instance(
    airflow_instance=AirflowInstance(
        auth_backend=BasicAuthBackend(
            webserver_url="http://localhost:8080",
            username="admin",
            password="admin",
        ),
        name="airflow_instance_one",
    ),
    defs=Definitions(
        assets=mapped_assets,
        resources={"dbt": DbtCliResource(project_dir=dbt_project_path())},
    ),
)
```

Now that all of our assets are fully executable, we can create a simple YAML file to proxy execution for the whole DAG:

```yaml
# tutorial_example/snippets/rebuild_customers_list.yaml
proxied: True
```
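
For contrast, recall that the task-level proxied state file lists each task individually; the DAG-level file above replaces those per-task entries with a single top-level flag. Abridged, the task-level version of this file looked like:

```yaml
# Task-level proxied state, shown for contrast (abridged)
tasks:
  - id: load_raw_customers
    proxied: False
  - id: build_dbt_models
    proxied: False
```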

We will similarly use `proxying_to_dagster` at the end of our DAG file (the code is exactly the same here as it was for the per-task migration step):

```python
# tutorial_example/snippets/dags_truncated.py
# Dags file can be found at tutorial_example/airflow_dags/dags.py
from pathlib import Path

from airflow import DAG
from dagster_airlift.in_airflow import proxying_to_dagster
from dagster_airlift.in_airflow.proxied_state import load_proxied_state_from_yaml

dag = DAG("rebuild_customers_list", ...)

...

# Set this to True to begin the proxying process
PROXYING = False

if PROXYING:
    proxying_to_dagster(
        global_vars=globals(),
        proxied_state=load_proxied_state_from_yaml(Path(__file__).parent / "proxied_state"),
    )
```

Once the `proxied` bit is flipped to `True`, we can go to the Airflow UI, and we'll see that our tasks have been replaced with a single task.

<p align="center">

![Before DAG proxying](./images/before_dag_override.svg)
![After DAG proxying](./images/after_dag_override.svg)

</p>

When performing DAG-level mapping, we don't preserve task structure in the Airflow DAGs. This single task will materialize all mapped Dagster assets instead of executing the original Airflow task business logic.

We can similarly set `proxied` back to `False`, and the original task structure and business logic will return unchanged.
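
That is, rolling back is just a one-line change to the same file:

```yaml
# tutorial_example/snippets/rebuild_customers_list.yaml
proxied: False
```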

### Customizing DAG proxying operator

Similar to how we can customize the operator we construct on a per-task basis, we can customize the operator we construct on a per-DAG basis. We can use the `build_from_dag_fn` argument of `proxying_to_dagster` to provide a custom operator in place of the default.

For example, let's take a look at the following custom operator, which expects an API key to be provided as an Airflow variable:

```python
# tutorial_example/snippets/custom_operator_examples/custom_dag_level_proxy.py
from pathlib import Path

import requests
from airflow import DAG
from airflow.utils.context import Context
from dagster_airlift.in_airflow import BaseProxyDAGToDagsterOperator, proxying_to_dagster
from dagster_airlift.in_airflow.proxied_state import load_proxied_state_from_yaml


class CustomProxyToDagsterOperator(BaseProxyDAGToDagsterOperator):
    def get_dagster_session(self, context: Context) -> requests.Session:
        if "var" not in context:
            raise ValueError("No variables found in context")
        api_key = context["var"]["value"].get("my_api_key")
        session = requests.Session()
        session.headers.update({"Authorization": f"Bearer {api_key}"})
        return session

    def get_dagster_url(self, context: Context) -> str:
        return "https://dagster.example.com/"

    # This method controls how the operator is built from the dag.
    @classmethod
    def build_from_dag(cls, dag: DAG):
        return CustomProxyToDagsterOperator(dag=dag, task_id="OVERRIDDEN")


dag = DAG(
    dag_id="custom_dag_level_proxy_example",
)

# At the end of your dag file
proxying_to_dagster(
    global_vars=globals(),
    proxied_state=load_proxied_state_from_yaml(Path(__file__).parent / "proxied_state"),
    build_from_dag_fn=CustomProxyToDagsterOperator.build_from_dag,
)
```

`BaseProxyDAGToDagsterOperator` has three abstract methods which must be implemented:

- `get_dagster_session`, which controls the creation of a valid session to access the Dagster GraphQL API.
- `get_dagster_url`, which retrieves the domain at which the Dagster webserver lives.
- `build_from_dag`, which controls how the proxying task is constructed from the provided DAG.
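
For the example above to work, the `my_api_key` variable must exist in the Airflow deployment. One way to seed it, sketched here with a placeholder value, is via Airflow's `Variable` model:

```python
# A sketch of seeding the Airflow Variable that the custom operator reads.
from airflow.models import Variable

# The value below is a placeholder, not a real credential.
Variable.set("my_api_key", "replace-with-a-real-key")
```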
