diff --git a/examples/experimental/dagster-airlift/examples/tutorial-example/README.md b/examples/experimental/dagster-airlift/examples/tutorial-example/README.md index 5ee62f51fef33..aafa893bcc17e 100644 --- a/examples/experimental/dagster-airlift/examples/tutorial-example/README.md +++ b/examples/experimental/dagster-airlift/examples/tutorial-example/README.md @@ -330,12 +330,47 @@ In order to migrate a task, you must do two things: 1. First, ensure all associated assets are executable in Dagster by providing asset definitions in place of bare asset specs. 2. The `migrated: False` status in the `migration_state` YAML folder must be adjusted to `migrated: True`. -Any task marked as migrated will use the `DagsterOperator` when executed as part of the DAG. This operator will use the Dagster GraphQL API to initiate a Dagster run of the assets corresponding to the task. +Any task marked as migrated will use the `DefaultProxyToDagsterOperator` when executed as part of the DAG. This operator will use the Dagster GraphQL API to initiate a Dagster run of the assets corresponding to the task. The migration file acts as the source of truth for migration status. The information is attached to the DAG and then accessed by Dagster via the REST API. A task which has been migrated can be easily toggled back to run in Airflow (for example, if a bug in implementation was encountered) simply by editing the file to `migrated: False`. +#### Supporting custom authorization + +If your dagster deployment lives behind a custom auth backend, you can customize the airflow-to-dagster proxying behavior to authenticate to your backend. +`mark_as_dagster_migrating` can take a parameter `dagster_operator_klass`, which allows you to define a custom `BaseProxyToDagsterOperator` class. This allows you to +override how a session is created. Let's say for example, your dagster installation requires an access key to be set whenever a request is made, and that access key is set in an airflow `Variable` called `my_api_key`. +We can create a custom `BaseProxyToDagsterOperator` subclass which will retrieve that variable value and set it on the session, so that any requests to dagster's graphql API +will be made using that api key. + +```python +# tutorial_example/airflow_dags/custom_proxy.py +import requests +from airflow.utils.context import Context +from dagster_airlift.in_airflow import BaseProxyToDagsterOperator + + +class CustomProxyToDagsterOperator(BaseProxyToDagsterOperator): + def get_dagster_session(self, context: Context) -> requests.Session: + if "var" not in context: + raise ValueError("No variables found in context") + api_key = context["var"]["value"].get("my_api_key") + session = requests.Session() + session.headers.update({"Authorization": f"Bearer {api_key}"}) + return session + + def get_dagster_url(self, context: Context) -> str: + return "https://dagster.example.com/" + +... + +# At the end of your dag file +mark_as_dagster_migrating( + global_vars=globals(), migration_state=..., dagster_operator_klass=CustomProxyToDagsterOperator +) +``` + #### Migrating common operators For some common operator patterns, like our dbt operator, Dagster supplies factories to build software defined assets for our tasks. In fact, the `dbt_defs` factory used earlier already backs its assets with definitions, so we can toggle the migration status of the `build_dbt_models` task to `migrated: True` in the migration state file: diff --git a/examples/experimental/dagster-airlift/examples/tutorial-example/tutorial_example/airflow_dags/custom_proxy.py b/examples/experimental/dagster-airlift/examples/tutorial-example/tutorial_example/airflow_dags/custom_proxy.py new file mode 100644 index 0000000000000..6fb632d208623 --- /dev/null +++ b/examples/experimental/dagster-airlift/examples/tutorial-example/tutorial_example/airflow_dags/custom_proxy.py @@ -0,0 +1,24 @@ +import requests +from airflow.utils.context import Context +from dagster_airlift.in_airflow import BaseProxyToDagsterOperator, mark_as_dagster_migrating + + +class CustomProxyToDagsterOperator(BaseProxyToDagsterOperator): + def get_dagster_session(self, context: Context) -> requests.Session: + if "var" not in context: + raise ValueError("No variables found in context") + api_key = context["var"]["value"].get("my_api_key") + session = requests.Session() + session.headers.update({"Authorization": f"Bearer {api_key}"}) + return session + + def get_dagster_url(self, context: Context) -> str: + return "https://dagster.example.com/" + + +... + +# At the end of your dag file +mark_as_dagster_migrating( + global_vars=globals(), migration_state=..., dagster_operator_klass=CustomProxyToDagsterOperator +)