[dagster-airlift] remove sections in reference #25814

Merged · 13 commits · Nov 13, 2024
2 changes: 1 addition & 1 deletion docs/content/integrations/airlift/reference.mdx
@@ -3,7 +3,7 @@ title: "dagster-airlift integration reference"
description: "dagster-airlift is a toolkit for observing and migrating Airflow DAGs within Dagster."
---

# dagster-airlift integration reference
# `dagster-airlift` integration reference

`dagster-airlift` is a toolkit for observing and migrating Airflow DAGs within Dagster. This reference page provides additional information for working with `dagster-airlift` that is not provided within the tutorial. You should start by reading the [dagster-airlift tutorial](/integrations/airlift/tutorial) before using this reference page.

79 changes: 0 additions & 79 deletions docs/content/integrations/airlift/tutorial.mdx
@@ -425,73 +425,6 @@ The proxied file acts as the source of truth for proxied state. The information

A proxied task can easily be toggled back to run in Airflow (for example, if a bug is encountered in the implementation) simply by editing the file to `proxied: False`, as shown below.
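For instance, a proxied state file with a single task toggled back to Airflow might look like the following sketch (the dag and task ids here are illustrative):

```yaml
# proxied_state/<dag_id>.yaml -- illustrative task id
tasks:
  - id: load_raw_customers
    proxied: False
```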

#### Supporting custom authorization

If your Dagster deployment lives behind a custom auth backend, you can customize the Airflow-to-Dagster proxying behavior to authenticate to your backend. `proxying_to_dagster` takes a parameter `dagster_operator_klass`, which allows you to define a custom `BaseProxyTaskToDagsterOperator` subclass and override how a session is created. Say, for example, that your Dagster installation requires an access key on every request, and that access key is stored in an Airflow `Variable` called `my_api_key`. We can create a `BaseProxyTaskToDagsterOperator` subclass that retrieves the variable's value and sets it on the session, so that any requests to Dagster's GraphQL API are made using that API key.

```python file=../../experimental/dagster-airlift/examples/tutorial-example/tutorial_example/snippets/custom_operator_examples/custom_proxy.py
from pathlib import Path

import requests
from airflow import DAG
from airflow.utils.context import Context
from dagster_airlift.in_airflow import BaseProxyTaskToDagsterOperator, proxying_to_dagster
from dagster_airlift.in_airflow.proxied_state import load_proxied_state_from_yaml


class CustomProxyToDagsterOperator(BaseProxyTaskToDagsterOperator):
def get_dagster_session(self, context: Context) -> requests.Session:
if "var" not in context:
raise ValueError("No variables found in context")
api_key = context["var"]["value"].get("my_api_key")
session = requests.Session()
session.headers.update({"Authorization": f"Bearer {api_key}"})
return session

def get_dagster_url(self, context: Context) -> str:
return "https://dagster.example.com/"


dag = DAG(
dag_id="custom_proxy_example",
)

# At the end of your dag file
proxying_to_dagster(
global_vars=globals(),
proxied_state=load_proxied_state_from_yaml(Path(__file__).parent / "proxied_state"),
build_from_task_fn=CustomProxyToDagsterOperator.build_from_task,
)
```
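For the snippet above to work, the `my_api_key` Variable must exist in your Airflow instance. As a minimal sketch (with a placeholder value; in practice you would likely set this via the Airflow UI, CLI, or a secrets backend), it can be created with the `airflow.models.Variable` API:

```python
from airflow.models import Variable

# Store the access key that CustomProxyToDagsterOperator reads from the
# task context (placeholder value shown here)
Variable.set("my_api_key", "your-access-key")
```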

#### Dagster+ authorization

You can use a custom proxy operator to establish a connection to a Dagster+ deployment. The example below proxies to Dagster+ using an organization name, deployment name, and user token, each set as an Airflow `Variable`. To set a Dagster+ user token, follow [this guide](https://docs.dagster.io/dagster-plus/account/managing-user-agent-tokens#managing-user-tokens).

```python file=../../experimental/dagster-airlift/examples/tutorial-example/tutorial_example/snippets/custom_operator_examples/plus_proxy_operator.py
import requests
from airflow.utils.context import Context
from dagster_airlift.in_airflow import BaseProxyTaskToDagsterOperator


class DagsterCloudProxyOperator(BaseProxyTaskToDagsterOperator):
def get_variable(self, context: Context, var_name: str) -> str:
if "var" not in context:
raise ValueError("No variables found in context")
return context["var"]["value"][var_name]

def get_dagster_session(self, context: Context) -> requests.Session:
dagster_cloud_user_token = self.get_variable(context, "dagster_cloud_user_token")
session = requests.Session()
session.headers.update({"Dagster-Cloud-Api-Token": dagster_cloud_user_token})
return session

def get_dagster_url(self, context: Context) -> str:
org_name = self.get_variable(context, "dagster_plus_organization_name")
deployment_name = self.get_variable(context, "dagster_plus_deployment_name")
return f"https://{org_name}.dagster.plus/{deployment_name}"
```
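Wiring this operator in works the same way as the task-level example above; a sketch, assuming the same `proxied_state` directory layout:

```python
from pathlib import Path

from dagster_airlift.in_airflow import proxying_to_dagster
from dagster_airlift.in_airflow.proxied_state import load_proxied_state_from_yaml

# At the end of your dag file, swap in the Dagster+ operator
proxying_to_dagster(
    global_vars=globals(),
    proxied_state=load_proxied_state_from_yaml(Path(__file__).parent / "proxied_state"),
    build_from_task_fn=DagsterCloudProxyOperator.build_from_task,
)
```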

#### Migrating common operators

For some common operator patterns, like our dbt operator, Dagster supplies factories to build software-defined assets for our tasks. In fact, the `@dbt_assets` decorator used earlier already backs its assets with definitions, so we can toggle the proxied state of the `build_dbt_models` task to `proxied: True` in the proxied state file:
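A sketch of the toggled entry in the proxied state file (the surrounding task ids are illustrative):

```yaml
tasks:
  - id: load_raw_customers
    proxied: False
  - id: build_dbt_models
    proxied: True
  - id: export_customers
    proxied: False
```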
@@ -1284,15 +1217,3 @@ proxying_to_dagster(
- `get_dagster_session`, which controls the creation of a valid session to access the Dagster GraphQL API.
- `get_dagster_url`, which retrieves the domain at which the Dagster webserver lives.
- `build_from_dag`, which controls how the proxying task is constructed from the provided DAG. A sketch of overriding all three is shown below.
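As an illustrative sketch, a dag-level operator overriding all three methods might look like this (the class name, auth header, and URL are hypothetical; `BaseProxyDAGToDagsterOperator` is assumed to be importable from `dagster_airlift.in_airflow` as the dag-level counterpart of the task-level operator used above):

```python
import requests
from airflow import DAG
from airflow.utils.context import Context
from dagster_airlift.in_airflow import BaseProxyDAGToDagsterOperator


class CustomDagProxyOperator(BaseProxyDAGToDagsterOperator):
    def get_dagster_session(self, context: Context) -> requests.Session:
        # Attach whatever auth your deployment requires (hypothetical token)
        session = requests.Session()
        session.headers.update({"Authorization": "Bearer some-token"})
        return session

    def get_dagster_url(self, context: Context) -> str:
        # Hypothetical webserver domain
        return "https://dagster.example.com/"

    @classmethod
    def build_from_dag(cls, dag: DAG) -> "CustomDagProxyOperator":
        # Construct the single proxying task that stands in for the dag's body
        return CustomDagProxyOperator(dag=dag, task_id="proxy_to_dagster")
```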

## Addendum: Dealing with changing Airflow

To make spin-up more efficient, `dagster-airlift` caches the state of the Airflow instance in the Dagster database, so that repeat fetches of the code location don't require additional calls to Airflow's REST API. However, this means that the Dagster definitions can fall out of sync with Airflow. Here are a few ways this can manifest:

- A new Airflow dag is added. The lineage information does not show up for this dag, and materializations are not recorded.
- A dag is removed. The polling sensor begins failing because some assets still expect that dag to exist.
- The task dependency structure within a dag changes. This may result in `unsynced` statuses in Dagster, or missing materializations.

This is not an exhaustive list of problems, but most of the time the tell is that materializations or assets are missing. When you find yourself in this state, you can force `dagster-airlift` to reload Airflow state by reloading the code location. To do this, go to the `Deployment` tab in the top nav and click `Redeploy` on the code location relevant to your asset. After some time, the code location should be reloaded with refreshed state from Airflow.

### Automating changes to code locations

If changes to your Airflow instance are controlled via a CI/CD process, you can add a step that automatically triggers a redeploy of the relevant code location. See the docs [here](https://docs.dagster.io/concepts/webserver/graphql-client#reloading-all-repositories-in-a-repository-location) on using the GraphQL client to do this, as sketched below.
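A minimal sketch of such a step using Dagster's Python GraphQL client (the host, port, and code location name are placeholders for your deployment):

```python
from dagster_graphql import DagsterGraphQLClient

# Point the client at your Dagster webserver (placeholder host and port)
client = DagsterGraphQLClient("dagster.example.com", port_number=3000)

# Reload the code location that mirrors Airflow, forcing dagster-airlift
# to re-fetch state from the Airflow REST API
client.reload_repository_location("airlift_code_location")
```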