[dagster-airlift] tutorial auth (#24199)
Add a section on custom auth to the tutorial. This could certainly be
generalized, since there are other potential uses for a subclass of the
proxy, but the section is kept use-case driven.
dpeng817 authored Sep 4, 2024
1 parent 7e9c403 commit 61c64eb
Showing 2 changed files with 60 additions and 1 deletion.
@@ -330,12 +330,47 @@ In order to migrate a task, you must do two things:
1. First, ensure all associated assets are executable in Dagster by providing asset definitions in place of bare asset specs.
2. The `migrated: False` status in the `migration_state` YAML folder must be adjusted to `migrated: True`.

-Any task marked as migrated will use the `DagsterOperator` when executed as part of the DAG. This operator will use the Dagster GraphQL API to initiate a Dagster run of the assets corresponding to the task.
+Any task marked as migrated will use the `DefaultProxyToDagsterOperator` when executed as part of the DAG. This operator will use the Dagster GraphQL API to initiate a Dagster run of the assets corresponding to the task.

The migration file acts as the source of truth for migration status. The information is attached to the DAG and then accessed by Dagster via the REST API.

A task which has been migrated can be easily toggled back to run in Airflow (for example, if a bug in implementation was encountered) simply by editing the file to `migrated: False`.

#### Supporting custom authorization

If your Dagster deployment lives behind a custom auth backend, you can customize the Airflow-to-Dagster proxying behavior to authenticate to your backend.
`mark_as_dagster_migrating` accepts a `dagster_operator_klass` parameter, which lets you supply a custom subclass of `BaseProxyToDagsterOperator` and
override how a session is created. For example, suppose your Dagster installation requires an access key on every request, and that key is stored in an Airflow `Variable` called `my_api_key`.
We can create a `BaseProxyToDagsterOperator` subclass that retrieves the variable's value and sets it on the session, so that every request to Dagster's GraphQL API
is made with that API key.

```python
# tutorial_example/airflow_dags/custom_proxy.py
import requests
from airflow.utils.context import Context
from dagster_airlift.in_airflow import BaseProxyToDagsterOperator, mark_as_dagster_migrating


class CustomProxyToDagsterOperator(BaseProxyToDagsterOperator):
    def get_dagster_session(self, context: Context) -> requests.Session:
        # Airflow exposes Variables through the "var" entry of the task context.
        if "var" not in context:
            raise ValueError("No variables found in context")
        api_key = context["var"]["value"].get("my_api_key")
        session = requests.Session()
        # Send the key as a bearer token on every request made with this session.
        session.headers.update({"Authorization": f"Bearer {api_key}"})
        return session

    def get_dagster_url(self, context: Context) -> str:
        return "https://dagster.example.com/"


...

# At the end of your dag file
mark_as_dagster_migrating(
    global_vars=globals(), migration_state=..., dagster_operator_klass=CustomProxyToDagsterOperator
)
```
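An empty or whitespace-only key would probably only show up as an authentication failure once the request reaches Dagster. As a minimal sketch, the header construction could be pulled into a small helper that fails early instead. Note that `build_auth_headers` is a hypothetical name introduced here for illustration and is not part of `dagster-airlift`; the bearer-token scheme is carried over from the example above.

```python
from typing import Dict, Optional


def build_auth_headers(api_key: Optional[str]) -> Dict[str, str]:
    """Build the Authorization header for a bearer token.

    Hypothetical helper (not part of dagster-airlift): raises early when the
    key is missing or blank instead of sending an unusable header.
    """
    if api_key is None or not api_key.strip():
        raise ValueError("Airflow Variable 'my_api_key' is missing or empty")
    # Strip stray whitespace that can sneak in when the Variable is edited by hand.
    return {"Authorization": f"Bearer {api_key.strip()}"}


# Inside get_dagster_session, the header could then be applied with:
#     session.headers.update(build_auth_headers(api_key))
```

Keeping the check in a plain function means it can be unit tested without an Airflow context or a running Dagster instance.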

#### Migrating common operators

For some common operator patterns, like our dbt operator, Dagster supplies factories to build software defined assets for our tasks. In fact, the `dbt_defs` factory used earlier already backs its assets with definitions, so we can toggle the migration status of the `build_dbt_models` task to `migrated: True` in the migration state file:
@@ -0,0 +1,24 @@
import requests
from airflow.utils.context import Context
from dagster_airlift.in_airflow import BaseProxyToDagsterOperator, mark_as_dagster_migrating


class CustomProxyToDagsterOperator(BaseProxyToDagsterOperator):
    def get_dagster_session(self, context: Context) -> requests.Session:
        if "var" not in context:
            raise ValueError("No variables found in context")
        api_key = context["var"]["value"].get("my_api_key")
        session = requests.Session()
        session.headers.update({"Authorization": f"Bearer {api_key}"})
        return session

    def get_dagster_url(self, context: Context) -> str:
        return "https://dagster.example.com/"


...

# At the end of your dag file
mark_as_dagster_migrating(
    global_vars=globals(), migration_state=..., dagster_operator_klass=CustomProxyToDagsterOperator
)
