From c434a567276638a63f2fd2cb7e00da4d1e6d1003 Mon Sep 17 00:00:00 2001
From: Christopher DeCarolis
Date: Sun, 11 Aug 2024 12:06:39 -0700
Subject: [PATCH] [dagster-airlift] dagster operator (#23386)

Adds a Dagster operator that can remotely invoke Dagster from Airflow.
It searches for a "node def" whose name matches the task (and matches
our opinionated file format for blueprints).

Includes an integration test which runs against live Airflow and Dagster
instances, and checks that runs of the respective assets are invoked for
each task. The tests are probably not entirely sufficient; I'd ideally
like to run against a few other cases:
- a workspace with multiple defined code locations
- a multi asset with multiple keys, ensuring they all get picked up
  within the same run

The API surface area here won't actually be exposed to the user. What
I'm imagining is something like this:

```python
...
# dag code
create_migrating_dag(
    migrating_dict={...},
    dagster_instance=DagsterInstance(url=...),
    # or, in the cloud case: DagsterCloudInstance(url=..., auth_token=...)
)
```

Then, under the hood, we swap out operators via the stack, the same way
we do with dag construction (a hedged sketch of what that might look like
is appended after this patch). Any real use case will need new GraphQL
endpoints so that we can easily retrieve asset info per node.

The number of steps here feels gratuitous (although only because the way
we're retrieving information is a bit hacky).
---
 .../in_airflow/dagster_operator.py            |  83 +++++++++++++++
 .../dagster_airlift/in_airflow/gql_queries.py | 104 ++++++++++++++++++
 .../dagster_airlift/test/shared_fixtures.py   |   2 +-
 .../integration_tests/conftest.py             |  75 +++++++++++--
 .../dags/migrated_dag.py                      |  29 +++++
 .../operator_test_project/dagster_defs.py     |  19 ++++
 .../test_dagster_operator.py                  |  61 ++++++++++
 .../experimental/dagster-airlift/setup.py     |   2 +-
 examples/experimental/dagster-airlift/tox.ini |   3 +
 9 files changed, 366 insertions(+), 12 deletions(-)
 create mode 100644 examples/experimental/dagster-airlift/dagster_airlift/in_airflow/dagster_operator.py
 create mode 100644 examples/experimental/dagster-airlift/dagster_airlift/in_airflow/gql_queries.py
 create mode 100644 examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/operator_test_project/dags/migrated_dag.py
 create mode 100644 examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/operator_test_project/dagster_defs.py
 create mode 100644 examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/test_dagster_operator.py

diff --git a/examples/experimental/dagster-airlift/dagster_airlift/in_airflow/dagster_operator.py b/examples/experimental/dagster-airlift/dagster_airlift/in_airflow/dagster_operator.py
new file mode 100644
index 0000000000000..5f26b2acf214f
--- /dev/null
+++ b/examples/experimental/dagster-airlift/dagster_airlift/in_airflow/dagster_operator.py
@@ -0,0 +1,83 @@
+import os
+import time
+
+import requests
+from airflow import DAG
+from airflow.operators.python import PythonOperator
+
+from .gql_queries import ASSET_NODES_QUERY, RUNS_QUERY, TRIGGER_ASSETS_MUTATION
+
+
+def compute_fn() -> None:
+    # https://github.com/apache/airflow/discussions/24463
+    os.environ["NO_PROXY"] = "*"
+    dag_id = os.environ["AIRFLOW_CTX_DAG_ID"]
+    task_id = os.environ["AIRFLOW_CTX_TASK_ID"]
+    expected_op_name = f"{dag_id}__{task_id}"
+    assets_to_trigger = {}  # key is (repo_location, repo_name, job_name), value is list of asset keys
+    # query the graphql API of the Dagster webserver for all asset nodes
+    dagster_url = os.environ["DAGSTER_URL"]
+    response = requests.post(f"{dagster_url}/graphql", json={"query": ASSET_NODES_QUERY}, timeout=3)
+    for asset_node in response.json()["data"]["assetNodes"]:
+        if asset_node["opName"] == expected_op_name:
+            repo_location = asset_node["jobs"][0]["repository"]["location"]["name"]
+            repo_name = asset_node["jobs"][0]["repository"]["name"]
+            job_name = asset_node["jobs"][0]["name"]
+            if (repo_location, repo_name, job_name) not in assets_to_trigger:
+                assets_to_trigger[(repo_location, repo_name, job_name)] = []
+            assets_to_trigger[(repo_location, repo_name, job_name)].append(
+                asset_node["assetKey"]["path"]
+            )
+    print(f"Found assets to trigger: {assets_to_trigger}")  # noqa: T201
+    triggered_runs = []
+    for (repo_location, repo_name, job_name), asset_keys in assets_to_trigger.items():
+        execution_params = {
+            "mode": "default",
+            "executionMetadata": {"tags": []},
+            "runConfigData": "{}",
+            "selector": {
+                "repositoryLocationName": repo_location,
+                "repositoryName": repo_name,
+                "pipelineName": job_name,
+                "assetSelection": [{"path": asset_key} for asset_key in asset_keys],
+                "assetCheckSelection": [],
+            },
+        }
+        print(f"Triggering run for {repo_location}/{repo_name}/{job_name} with assets {asset_keys}")  # noqa: T201
+        response = requests.post(
+            f"{dagster_url}/graphql",
+            json={
+                "query": TRIGGER_ASSETS_MUTATION,
+                "variables": {"executionParams": execution_params},
+            },
+            timeout=3,
+        )
+        run_id = response.json()["data"]["launchPipelineExecution"]["run"]["id"]
+        print(f"Launched run {run_id}...")  # noqa: T201
+        triggered_runs.append(run_id)
+    completed_runs = {}  # key is run_id, value is status
+    while len(completed_runs) < len(triggered_runs):
+        for run_id in triggered_runs:
+            if run_id in completed_runs:
+                continue
+            response = requests.post(
+                f"{dagster_url}/graphql",
+                json={"query": RUNS_QUERY, "variables": {"runId": run_id}},
+                timeout=3,
+            )
+            run_status = response.json()["data"]["runOrError"]["status"]
+            if run_status in ["SUCCESS", "FAILURE", "CANCELED"]:
+                print(f"Run {run_id} completed with status {run_status}")  # noqa: T201
+                completed_runs[run_id] = run_status
+        time.sleep(1)  # avoid hammering the graphql endpoint while polling
+    non_successful_runs = [
+        run_id for run_id, status in completed_runs.items() if status != "SUCCESS"
+    ]
+    if non_successful_runs:
+        raise Exception(f"Runs {non_successful_runs} did not complete successfully.")
+    print("All runs completed successfully.")  # noqa: T201
+    return None
+
+
+def build_dagster_task(task_id: str, dag: DAG, **kwargs):
+    return PythonOperator(task_id=task_id, dag=dag, python_callable=compute_fn, **kwargs)
diff --git a/examples/experimental/dagster-airlift/dagster_airlift/in_airflow/gql_queries.py b/examples/experimental/dagster-airlift/dagster_airlift/in_airflow/gql_queries.py
new file mode 100644
index 0000000000000..6447612204395
--- /dev/null
+++ b/examples/experimental/dagster-airlift/dagster_airlift/in_airflow/gql_queries.py
@@ -0,0 +1,104 @@
+ASSET_NODES_QUERY = """
+query AssetNodeQuery {
+  assetNodes {
+    id
+    assetKey {
+      path
+    }
+    opName
+    jobs {
+      id
+      name
+      repository {
+        id
+        name
+        location {
+          id
+          name
+        }
+      }
+    }
+  }
+}
+"""
+
+TRIGGER_ASSETS_MUTATION = """
+mutation LaunchAssetsExecution($executionParams: ExecutionParams!) {
+  launchPipelineExecution(executionParams: $executionParams) {
+    ... on LaunchRunSuccess {
+      run {
+        id
+        pipelineName
+        __typename
+      }
+      __typename
+    }
+    ... on PipelineNotFoundError {
+      message
+      __typename
+    }
+    ... on InvalidSubsetError {
+      message
+      __typename
+    }
+    ... on RunConfigValidationInvalid {
+      errors {
+        message
+        __typename
+      }
+      __typename
+    }
+    ...PythonErrorFragment
+    __typename
+  }
+}
+
+fragment PythonErrorFragment on PythonError {
+  message
+  stack
+  errorChain {
+    ...PythonErrorChain
+    __typename
+  }
+  __typename
+}
+
+fragment PythonErrorChain on ErrorChainLink {
+  isExplicitLink
+  error {
+    message
+    stack
+    __typename
+  }
+  __typename
+}
+"""
+
+RUNS_QUERY = """
+query RunQuery($runId: ID!) {
+  runOrError(runId: $runId) {
+    __typename
+    ...PythonErrorFragment
+    ...NotFoundFragment
+    ... on Run {
+      id
+      status
+      __typename
+    }
+  }
+}
+fragment NotFoundFragment on RunNotFoundError {
+  __typename
+  message
+}
+fragment PythonErrorFragment on PythonError {
+  __typename
+  message
+  stack
+  causes {
+    message
+    stack
+    __typename
+  }
+}
+"""
diff --git a/examples/experimental/dagster-airlift/dagster_airlift/test/shared_fixtures.py b/examples/experimental/dagster-airlift/dagster_airlift/test/shared_fixtures.py
index 476eb0ca22b3c..dd297226f75f3 100644
--- a/examples/experimental/dagster-airlift/dagster_airlift/test/shared_fixtures.py
+++ b/examples/experimental/dagster-airlift/dagster_airlift/test/shared_fixtures.py
@@ -31,7 +31,7 @@ def airflow_instance_fixture(setup: None) -> Generator[Any, None, None]:
     initial_time = get_current_timestamp()
 
     airflow_ready = False
-    while get_current_timestamp() - initial_time < 30:
+    while get_current_timestamp() - initial_time < 60:
         if airflow_is_ready():
             airflow_ready = True
             break
diff --git a/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/conftest.py b/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/conftest.py
index 781d2d5bb33dc..5509056305ee7 100644
--- a/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/conftest.py
+++ b/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/conftest.py
@@ -1,5 +1,7 @@
 import os
+import signal
 import subprocess
+import time
 from pathlib import Path
 from tempfile import TemporaryDirectory
 from typing import Any, Generator
@@ -7,6 +9,7 @@
 import pytest
 import requests
 from dagster._core.test_utils import environ
+from dagster._time import get_current_timestamp
 
 
 def assert_link_exists(link_name: str, link_url: Any):
@@ -19,17 +22,26 @@ def default_dags_dir():
     return Path(__file__).parent / "dags"
 
 
-@pytest.fixture(name="setup")
-def setup_fixture(dags_dir: Path) -> Generator[str, None, None]:
+@pytest.fixture(name="airflow_home")
+def default_airflow_home() -> Generator[str, None, None]:
     with TemporaryDirectory() as tmpdir:
-        # run chmod +x create_airflow_cfg.sh and then run create_airflow_cfg.sh tmpdir
-        temp_env = {**os.environ.copy(), "AIRFLOW_HOME": tmpdir}
-        # go up one directory from current
-        path_to_script = Path(__file__).parent.parent.parent / "airflow_setup.sh"
-        subprocess.run(["chmod", "+x", path_to_script], check=True, env=temp_env)
-        subprocess.run([path_to_script, dags_dir], check=True, env=temp_env)
-        with environ({"AIRFLOW_HOME": tmpdir}):
-            yield tmpdir
+        yield tmpdir
+
+
+@pytest.fixture(name="setup")
+def setup_fixture(airflow_home: str, dags_dir: Path) -> Generator[str, None, None]:
+    # make the airflow setup script executable, then run it against the dags directory
+    temp_env = {
+        **os.environ.copy(),
+        "AIRFLOW_HOME": str(airflow_home),
+        "DAGSTER_URL": "http://localhost:3333",
+    }
+    # the setup script lives three directories up from this file
+    path_to_script = Path(__file__).parent.parent.parent / "airflow_setup.sh"
+    subprocess.run(["chmod", "+x", path_to_script], check=True, env=temp_env)
+    subprocess.run([path_to_script, dags_dir], check=True, env=temp_env)
+    with environ({"AIRFLOW_HOME": str(airflow_home), "DAGSTER_URL": "http://localhost:3333"}):
+        yield str(airflow_home)
 
 
 @pytest.fixture(name="dbt_project_dir")
@@ -49,3 +61,46 @@ def dbt_project(dbt_project_dir: Path) -> None:
         check=True,
         env=os.environ.copy(),
     )
+
+
+def dagster_is_ready() -> bool:
+    try:
+        response = requests.get("http://localhost:3333", timeout=1)
+        return response.status_code == 200
+    except requests.exceptions.RequestException:
+        return False
+
+
+@pytest.fixture(name="dagster_home")
+def setup_dagster_home() -> Generator[str, None, None]:
+    """Instantiate a temporary directory to serve as the DAGSTER_HOME."""
+    with TemporaryDirectory() as tmpdir:
+        yield tmpdir
+
+
+@pytest.fixture(name="dagster_dev")
+def setup_dagster(dagster_home: str, dagster_defs_path: str) -> Generator[Any, None, None]:
+    """Stands up a dagster instance using the dagster dev CLI. dagster_defs_path must be provided
+    by a fixture included at the call site.
+    """
+    temp_env = {**os.environ.copy(), "DAGSTER_HOME": dagster_home}
+    process = subprocess.Popen(
+        ["dagster", "dev", "-f", dagster_defs_path, "-p", "3333"],
+        env=temp_env,
+        shell=False,
+        preexec_fn=os.setsid,  # noqa
+    )
+    # Give dagster a few seconds to stand up
+    time.sleep(5)
+
+    dagster_ready = False
+    initial_time = get_current_timestamp()
+    while get_current_timestamp() - initial_time < 60:
+        if dagster_is_ready():
+            dagster_ready = True
+            break
+        time.sleep(1)
+
+    assert dagster_ready, "Dagster did not start within 60 seconds..."
+    yield process
+    os.killpg(process.pid, signal.SIGKILL)
diff --git a/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/operator_test_project/dags/migrated_dag.py b/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/operator_test_project/dags/migrated_dag.py
new file mode 100644
index 0000000000000..7b6faf7eef018
--- /dev/null
+++ b/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/operator_test_project/dags/migrated_dag.py
@@ -0,0 +1,29 @@
+import logging
+from datetime import datetime
+
+from airflow import DAG
+from dagster_airlift.in_airflow.dagster_operator import build_dagster_task
+
+logging.basicConfig()
+logging.getLogger().setLevel(logging.INFO)
+requests_log = logging.getLogger("requests.packages.urllib3")
+requests_log.setLevel(logging.INFO)
+requests_log.propagate = True
+
+
+def print_hello():
+    print("Hello")  # noqa: T201
+
+
+default_args = {
+    "owner": "airflow",
+    "depends_on_past": False,
+    "start_date": datetime(2023, 1, 1),
+    "retries": 1,
+}
+
+dag = DAG(
+    "the_dag", default_args=default_args, schedule_interval=None, is_paused_upon_creation=False
+)
+migrated_op = build_dagster_task(task_id="some_task", dag=dag)
+other_migrated_op = build_dagster_task(task_id="other_task", dag=dag)
diff --git a/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/operator_test_project/dagster_defs.py b/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/operator_test_project/dagster_defs.py
new file mode 100644
index 0000000000000..91b9287203081
--- /dev/null
+++ b/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/operator_test_project/dagster_defs.py
@@ -0,0 +1,19 @@
+from dagster import Definitions, asset
+
+
+@asset
+def the_dag__some_task():
+    return "asset_value"
+
+
+@asset
+def unrelated():
+    return "unrelated_value"
+
+
+@asset
+def the_dag__other_task():
"other_task_value" + + +defs = Definitions(assets=[the_dag__other_task, the_dag__some_task, unrelated]) diff --git a/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/test_dagster_operator.py b/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/test_dagster_operator.py new file mode 100644 index 0000000000000..c3e201b53807c --- /dev/null +++ b/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/test_dagster_operator.py @@ -0,0 +1,61 @@ +import time +from pathlib import Path + +import pytest +import requests +from dagster import AssetKey, DagsterInstance, DagsterRunStatus +from dagster._core.test_utils import environ +from dagster._time import get_current_timestamp + + +@pytest.fixture(name="dags_dir") +def dags_dir() -> Path: + return Path(__file__).parent / "operator_test_project" / "dags" + + +@pytest.fixture(name="dagster_defs_path") +def dagster_defs_path() -> Path: + return Path(__file__).parent / "operator_test_project" / "dagster_defs.py" + + +def test_dagster_operator(airflow_instance: None, dagster_dev: None, dagster_home: str) -> None: + """Tests that dagster operator can correctly map airflow tasks to dagster tasks, and kick off executions.""" + response = requests.post( + "http://localhost:8080/api/v1/dags/the_dag/dagRuns", auth=("admin", "admin"), json={} + ) + assert response.status_code == 200, response.json() + # Wait until the run enters a terminal state + terminal_status = None + start_time = get_current_timestamp() + while get_current_timestamp() - start_time < 30: + response = requests.get( + "http://localhost:8080/api/v1/dags/the_dag/dagRuns", auth=("admin", "admin") + ) + assert response.status_code == 200, response.json() + dag_runs = response.json()["dag_runs"] + if dag_runs[0]["state"] in ["success", "failed"]: + terminal_status = dag_runs[0]["state"] + break + time.sleep(1) + assert terminal_status == "success", ( + "Never reached terminal status" + if terminal_status is None + else f"terminal status was {terminal_status}" + ) + with environ({"DAGSTER_HOME": dagster_home}): + instance = DagsterInstance.get() + runs = instance.get_runs() + # The graphql endpoint kicks off a run for each of the tasks in the dag + assert len(runs) == 2 + some_task_run = [ # noqa + run + for run in runs + if set(list(run.asset_selection)) == {AssetKey(["the_dag__other_task"])} # type: ignore + ][0] + other_task_run = [ # noqa + run + for run in runs + if set(list(run.asset_selection)) == {AssetKey(["the_dag__some_task"])} # type: ignore + ][0] + assert some_task_run.status == DagsterRunStatus.SUCCESS + assert other_task_run.status == DagsterRunStatus.SUCCESS diff --git a/examples/experimental/dagster-airlift/setup.py b/examples/experimental/dagster-airlift/setup.py index 623dbcf8466dd..e617d8f67b3b7 100644 --- a/examples/experimental/dagster-airlift/setup.py +++ b/examples/experimental/dagster-airlift/setup.py @@ -37,7 +37,7 @@ "in-airflow": airflow_dep_list, "mwaa": ["boto3"], "dbt": ["dagster-dbt"], - "test": ["pytest", "dagster-dbt", "dbt-duckdb", "boto3"], + "test": ["pytest", "dagster-dbt", "dbt-duckdb", "boto3", "dagster-webserver"], }, zip_safe=False, ) diff --git a/examples/experimental/dagster-airlift/tox.ini b/examples/experimental/dagster-airlift/tox.ini index ef360da0de058..6a5ef86e22ee9 100644 --- a/examples/experimental/dagster-airlift/tox.ini +++ b/examples/experimental/dagster-airlift/tox.ini @@ -12,6 +12,8 @@ deps = -e ../../../python_modules/dagster[test] -e 
../../../python_modules/dagster-test -e ../../../python_modules/dagster-pipes + -e ../../../python_modules/dagster-webserver + -e ../../../python_modules/dagster-graphql -e ../../../python_modules/libraries/dagster-dbt -e .[core,mwaa,dbt,test,in-airflow] dbt-duckdb @@ -19,5 +21,6 @@ allowlist_externals = /bin/bash uv commands = + make -C ../../.. rebuild_ui !windows: /bin/bash -c '! pip list --exclude-editable | grep -e dagster' pytest -c ../../../pyproject.toml ./dagster_airlift_tests --snapshot-warn-unused -vv {posargs}
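
A note for reviewers on the "swap out operators" idea in the PR body: nothing below is part of this patch. It is a hedged sketch of what a `create_migrating_dag`-style swap could look like, assuming Airflow 2.x internals (`dag.task_dict` as a task_id -> operator dict, and dependency edges stored as mutable task-id sets on each operator) and a hypothetical `migrating_dict` mapping task ids to a migrated flag. Only `build_dagster_task` actually exists in this PR.

```python
# Hypothetical sketch -- create_migrating_dag/migrating_dict are the imagined API
# from the PR description above, not part of this patch.
from typing import Mapping

from airflow import DAG

from dagster_airlift.in_airflow.dagster_operator import build_dagster_task


def create_migrating_dag(dag: DAG, migrating_dict: Mapping[str, bool]) -> DAG:
    """Replace each task marked as migrated with a PythonOperator that triggers
    the corresponding Dagster assets over graphql (via build_dagster_task).
    """
    for task_id, is_migrated in migrating_dict.items():
        if not is_migrated:
            continue
        # Drop the original operator; the replacement takes over its task_id and
        # therefore its identity in the dependency graph.
        original = dag.task_dict.pop(task_id)
        # build_dagster_task registers the new PythonOperator on the dag itself.
        replacement = build_dagster_task(task_id=task_id, dag=dag)
        # Airflow 2.x stores edges as task-id sets on each operator, so copy the
        # original's edges across so both sides of each edge stay consistent.
        replacement.upstream_task_ids.update(original.upstream_task_ids)
        replacement.downstream_task_ids.update(original.downstream_task_ids)
    return dag
```

This sketch sidesteps the `dagster_instance=DagsterInstance(url=...)` plumbing from the PR body; as written it would keep reading `DAGSTER_URL` from the environment, the way `compute_fn` does today, and a real implementation would also need to reconcile Airflow's task-group bookkeeping rather than mutating `task_dict` directly.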