Skip to content

Commit

Permalink
[dagster-airlift][moves] airlift moves (#25832)
Browse files Browse the repository at this point in the history
## Summary & Motivation
This PR was original just moving the tutorial. Now using it as an accumulator for all the "moving shit out of airlift directory" PRs before landing them as a unit.
  • Loading branch information
dpeng817 authored Nov 13, 2024
1 parent 86591fb commit dcdf600
Show file tree
Hide file tree
Showing 244 changed files with 1,503 additions and 265 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def build_dagster_oss_nightly_steps() -> List[BuildkiteStep]:
],
),
PackageSpec(
"examples/experimental/dagster-airlift/examples/dbt-example",
"examples/starlift-demo",
name="airlift-demo-live-tests",
env_vars=[
"KS_DBT_CLOUD_ACCOUNT_ID",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ def k8s_extra_cmds(version: AvailablePythonVersion, _) -> List[str]:
# Runs against live dbt cloud instance, we only want to run on commits and on the
# nightly build
PackageSpec(
"examples/experimental/dagster-airlift/examples/dbt-example",
"examples/starlift-demo",
skip_if=skip_if_not_airlift_or_dlift_commit,
env_vars=[
"KS_DBT_CLOUD_ACCOUNT_ID",
Expand All @@ -386,15 +386,15 @@ def k8s_extra_cmds(version: AvailablePythonVersion, _) -> List[str]:
queue=BuildkiteQueue.DOCKER,
),
PackageSpec(
"examples/experimental/dagster-airlift/examples/perf-harness",
"examples/experimental/dagster-airlift/perf-harness",
always_run_if=has_dagster_airlift_changes,
),
PackageSpec(
"examples/experimental/dagster-airlift/examples/tutorial-example",
"examples/airlift-migration-tutorial",
always_run_if=has_dagster_airlift_changes,
),
PackageSpec(
"examples/experimental/dagster-airlift/examples/kitchen-sink",
"examples/experimental/dagster-airlift/kitchen-sink",
always_run_if=has_dagster_airlift_changes,
),
PackageSpec(
Expand Down
8 changes: 4 additions & 4 deletions docs/content/integrations/airlift/full_dag.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ When migrating an entire DAG at once, we'll want to create assets which map to t

For our `rebuild_customers_list` DAG, let's take a look at what the new observation code looks like:

```python file=../../experimental/dagster-airlift/examples/tutorial-example/tutorial_example/dagster_defs/stages/observe_dag_level.py
```python file=../../airlift-migration-tutorial/tutorial_example/dagster_defs/stages/observe_dag_level.py
import os
from pathlib import Path

Expand Down Expand Up @@ -79,7 +79,7 @@ Now, instead of getting a materialization when a particular task completes, each

Recall that in the [task-by-task migration step](/integrations/airlift/tutorial/migrate), we "proxy" execution on a task by task basis, which is controlled by a yaml document. For DAG-mapped assets, execution is proxied on a per-DAG basis. Proxying execution to Dagster will require all assets mapped to that DAG be _executable_ within Dagster. Let's take a look at some fully migrated code mapped to DAGs instead of tasks:

```python file=../../experimental/dagster-airlift/examples/tutorial-example/tutorial_example/dagster_defs/stages/migrate_dag_level.py
```python file=../../airlift-migration-tutorial/tutorial_example/dagster_defs/stages/migrate_dag_level.py
import os
from pathlib import Path

Expand Down Expand Up @@ -192,13 +192,13 @@ if __name__ == "__main__":

Now that all of our assets are fully executable, we can create a simple yaml file to proxy execution for the whole dag:

```yaml file=../../experimental/dagster-airlift/examples/tutorial-example/tutorial_example/snippets/rebuild_customers_list.yaml
```yaml file=../../airlift-migration-tutorial/tutorial_example/snippets/rebuild_customers_list.yaml
proxied: True
```
We will similarly use `proxying_to_dagster` at the end of our DAG file (the code is exactly the same here as it was for the per-task migration step)

```python file=../../experimental/dagster-airlift/examples/tutorial-example/tutorial_example/snippets/dags_truncated.py
```python file=../../airlift-migration-tutorial/tutorial_example/snippets/dags_truncated.py
# Dags file can be found at tutorial_example/airflow_dags/dags.py
from pathlib import Path
Expand Down
6 changes: 3 additions & 3 deletions docs/content/integrations/airlift/reference.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ description: "dagster-airlift is a toolkit for observing and migrating Airflow D

If your Dagster deployment lives behind a custom auth backend, you can customize the Airflow-to-Dagster proxying behavior to authenticate to your backend. `proxying_to_dagster` can take a parameter `dagster_operator_klass`, which allows you to define a custom `BaseProxyTasktoDagsterOperator` class. This allows you to override how a session is created. Let's say for example, your Dagster installation requires an access key to be set whenever a request is made, and that access key is set in an Airflow `Variable` called `my_api_key`. We can create a custom `BaseProxyTasktoDagsterOperator` subclass which will retrieve that variable value and set it on the session, so that any requests to Dagster's graphql API will be made using that api key.

```python file=../../experimental/dagster-airlift/examples/tutorial-example/tutorial_example/snippets/custom_operator_examples/custom_proxy.py
```python file=../../airlift-migration-tutorial/tutorial_example/snippets/custom_operator_examples/custom_proxy.py
from pathlib import Path

import requests
Expand Down Expand Up @@ -55,7 +55,7 @@ proxying_to_dagster(

You can use a custom proxy operator to establish a connection to a Dagster plus deployment. The below example proxies to Dagster Plus using organization name, deployment name, and user token set as Airflow Variables. To set a Dagster+ user token, follow [this](https://docs.dagster.io/dagster-plus/account/managing-user-agent-tokens#managing-user-tokens) guide.

```python file=../../experimental/dagster-airlift/examples/tutorial-example/tutorial_example/snippets/custom_operator_examples/plus_proxy_operator.py
```python file=../../airlift-migration-tutorial/tutorial_example/snippets/custom_operator_examples/plus_proxy_operator.py
import requests
from airflow.utils.context import Context
from dagster_airlift.in_airflow import BaseProxyTaskToDagsterOperator
Expand Down Expand Up @@ -130,7 +130,7 @@ Similar to how we can customize the operator we construct on a per-dag basis, we

For example, in the following example we can see that the operator is customized to provide an authorization header which authenticates Dagster.

```python file=../../experimental/dagster-airlift/examples/tutorial-example/tutorial_example/snippets/custom_operator_examples/custom_dag_level_proxy.py
```python file=../../airlift-migration-tutorial/tutorial_example/snippets/custom_operator_examples/custom_dag_level_proxy.py
from pathlib import Path

import requests
Expand Down
Loading

1 comment on commit dcdf600

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deploy preview for dagster-docs ready!

✅ Preview
https://dagster-docs-9bibmt254-elementl.vercel.app
https://master.dagster.dagster-docs.io

Built with commit dcdf600.
This pull request is being automatically deployed with vercel-action

Please sign in to comment.