diff --git a/docs/content/_navigation.json b/docs/content/_navigation.json
index 1186bd69ff960..a784329e9fc60 100644
--- a/docs/content/_navigation.json
+++ b/docs/content/_navigation.json
@@ -1341,10 +1341,6 @@
           {
             "title": "Migrating from Airflow",
             "path": "/guides/migrations/migrating-airflow-to-dagster"
-          },
-          {
-            "title": "Observe your Airflow pipelines with Dagster",
-            "path": "/guides/migrations/observe-your-airflow-pipelines-with-dagster"
           }
         ]
       },
diff --git a/docs/content/guides/migrations/observe-your-airflow-pipelines-with-dagster.mdx b/docs/content/guides/migrations/observe-your-airflow-pipelines-with-dagster.mdx
deleted file mode 100644
index ed2f0fe3cbfc6..0000000000000
--- a/docs/content/guides/migrations/observe-your-airflow-pipelines-with-dagster.mdx
+++ /dev/null
@@ -1,105 +0,0 @@
---
title: "Observe your Airflow pipelines with Dagster | Dagster Docs"
description: "Learn how to leverage the features of Dagster and Airflow together."
---

# Observe your Airflow pipelines with Dagster

Dagster can act as a single entry point to all orchestration platforms in use at your organization. By injecting a small amount of code into your existing pipelines, you can report events to Dagster and visualize the full lineage of your pipelines in one place. This is particularly useful if you have multiple Apache Airflow environments and want to build a unified catalog and observation platform with Dagster.

## Emitting materialization events from Airflow to Dagster

Imagine you have a large number of pipelines written in Apache Airflow and wish to introduce Dagster into your stack. By using a custom Airflow operator, you can continue to run your existing pipelines while you work toward migrating them off Airflow, or while building new pipelines in Dagster that are tightly integrated with your legacy systems.

To do this, we will define a `DagsterAssetOperator` and place it at the end of your Airflow DAG to signal that the pipeline's processing has concluded. The operator is configured with the HTTP endpoint of the Dagster webserver, the `asset_key` to report, and optional metadata and a description that inform Dagster of the materialization.

```python
from typing import Dict, Optional

from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
import requests


class DagsterAssetOperator(BaseOperator):
    """Airflow operator that reports an asset materialization to Dagster."""

    @apply_defaults
    def __init__(
        self,
        dagster_webserver_host: str,
        dagster_webserver_port: str,
        asset_key: str,
        metadata: Optional[Dict] = None,
        description: Optional[str] = None,
        *args,
        **kwargs,
    ):
        super().__init__(*args, **kwargs)
        self.dagster_webserver_host = dagster_webserver_host
        self.dagster_webserver_port = dagster_webserver_port
        self.asset_key = asset_key
        self.metadata = metadata or {}
        self.description = description

    def execute(self, context):
        url = f"http://{self.dagster_webserver_host}:{self.dagster_webserver_port}/report_asset_materialization/{self.asset_key}"
        payload = {"metadata": self.metadata, "description": self.description}
        headers = {"Content-Type": "application/json"}

        response = requests.post(url, json=payload, headers=headers)
        response.raise_for_status()

        self.log.info(
            f"Reported asset materialization to Dagster. Response: {response.text}"
        )
```

Then, we can append this operator to our Airflow DAG to report that the pipeline has run successfully.
```python
import os

dagster_webserver_host = os.environ.get("DAGSTER_WEBSERVER_HOST", "localhost")
dagster_webserver_port = os.environ.get("DAGSTER_WEBSERVER_PORT", "3000")

dagster_op = DagsterAssetOperator(
    task_id="report_dagster_asset_materialization",
    dagster_webserver_host=dagster_webserver_host,
    dagster_webserver_port=dagster_webserver_port,
    asset_key="example_external_airflow_asset",
    metadata={"airflow/tag": "example", "source": "external"},
)
```

Once Airflow is emitting these events, there are two options for scheduling Dagster materializations in response to them: asset sensors and auto-materialization policies.

For the asset sensor approach, first create an external asset in Dagster that represents the data produced by Airflow, then use an `asset_sensor` to react to the materialization events reported from Airflow.

```python
from dagster import AssetSpec, external_asset_from_spec

example_external_airflow_asset = external_asset_from_spec(
    AssetSpec("example_external_airflow_asset", group_name="External")
)
```

```python
from dagster import (
    AssetKey,
    EventLogEntry,
    RunRequest,
    SensorEvaluationContext,
    asset_sensor,
)

@asset_sensor(
    asset_key=AssetKey("example_external_airflow_asset"),
    job=example_external_airflow_asset_job,
)
def example_external_airflow_asset_sensor(
    context: SensorEvaluationContext, asset_event: EventLogEntry
):
    assert asset_event.dagster_event and asset_event.dagster_event.asset_key
    yield RunRequest(run_key=context.cursor)
```

Now, when a materialization event occurs on the external `example_external_airflow_asset` asset, the `example_external_airflow_asset_job` job will be triggered. In that job, you can define logic that builds on the output of your Airflow DAG.
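
The sensor above targets an `example_external_airflow_asset_job` that is not defined in this guide. A minimal sketch of what such a job could look like follows; the downstream asset name `downstream_of_airflow_asset` is a hypothetical placeholder, not part of the example above.

```python
from dagster import AssetKey, AssetSelection, asset, define_asset_job

# Hypothetical downstream asset that consumes the data produced by the Airflow DAG.
@asset(deps=[AssetKey("example_external_airflow_asset")])
def downstream_of_airflow_asset():
    # Transformation logic that builds on the Airflow output goes here.
    ...

# Job targeted by the asset sensor; it materializes only the downstream asset.
example_external_airflow_asset_job = define_asset_job(
    "example_external_airflow_asset_job",
    selection=AssetSelection.assets(downstream_of_airflow_asset),
)
```

In a real project, the job would be defined before the sensor that references it, and the external asset, downstream asset, job, and sensor would all be included in your code location's `Definitions` object.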
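
The other option, auto-materialization policies, removes the need for a sensor and job entirely: Dagster can automatically request a run of a downstream asset whenever the external Airflow asset reports a new materialization. A rough sketch, assuming a hypothetical downstream asset and that the auto-materialize daemon is enabled in your deployment:

```python
from dagster import AssetKey, AutoMaterializePolicy, asset

# Hypothetical downstream asset that Dagster schedules automatically whenever
# the external "example_external_airflow_asset" reports a new materialization.
@asset(
    deps=[AssetKey("example_external_airflow_asset")],
    auto_materialize_policy=AutoMaterializePolicy.eager(),
)
def auto_materialized_downstream_asset():
    ...
```

Depending on your Dagster version, the same behavior may instead be expressed with an `AutomationCondition`; check the API docs for the release you are running.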