From c2ccdeb592ec3a4c4e1dd16dfcb80a8cccbe5d89 Mon Sep 17 00:00:00 2001 From: Christopher DeCarolis Date: Wed, 4 Sep 2024 15:52:34 -0700 Subject: [PATCH] [dagster-airlift] move mark_as_dagster_migrating tests to unit (#24168) The tests for mark_as_dagster_migrating (including checking swizzling behavior) were overkill for what they were actually doing. Can be better handled with unit testing. --- .../in_airflow/base_proxy_operator.py | 12 +++- .../dagster_airlift/migration_state.py | 7 ++ .../dags/switcheroo_dag.py | 68 ------------------ .../dagster_defs.py | 9 --- .../dags/switcheroo_dag.py | 68 ------------------ .../dagster_defs.py | 13 ---- .../correctly_marked_dag/dags/marked.py | 33 --------- .../migration_state/marked_dag.yaml | 3 - .../incorrectly_marked_dag/dags/marked.py | 33 --------- .../migration_state/some_other_dag.yaml | 3 - .../test_migrating_dag_correctly_marked.py | 39 ---------- .../test_migrating_dag_incorrectly_marked.py | 31 -------- .../test_operator_switcheroo.py | 71 ------------------- .../unit_tests/in_airflow_tests/__init__.py | 0 .../unit_tests/in_airflow_tests/conftest.py | 41 +++++++++++ .../test_mark_as_dagster_migrating.py | 47 ++++++++++++ 16 files changed, 105 insertions(+), 373 deletions(-) delete mode 100644 examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/airflow_op_switcheroo_convention/dags/switcheroo_dag.py delete mode 100644 examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/airflow_op_switcheroo_convention/dagster_defs.py delete mode 100644 examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/airflow_op_switcheroo_tags/dags/switcheroo_dag.py delete mode 100644 examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/airflow_op_switcheroo_tags/dagster_defs.py delete mode 100644 examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/correctly_marked_dag/dags/marked.py delete mode 100644 examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/correctly_marked_dag/migration_state/marked_dag.yaml delete mode 100644 examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/incorrectly_marked_dag/dags/marked.py delete mode 100644 examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/incorrectly_marked_dag/migration_state/some_other_dag.yaml delete mode 100644 examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/test_migrating_dag_correctly_marked.py delete mode 100644 examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/test_migrating_dag_incorrectly_marked.py delete mode 100644 examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/test_operator_switcheroo.py create mode 100644 examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/in_airflow_tests/__init__.py create mode 100644 examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/in_airflow_tests/conftest.py create mode 100644 examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/in_airflow_tests/test_mark_as_dagster_migrating.py diff --git a/examples/experimental/dagster-airlift/dagster_airlift/in_airflow/base_proxy_operator.py b/examples/experimental/dagster-airlift/dagster_airlift/in_airflow/base_proxy_operator.py index 974435cdeec20..7dde41554d33c 100644 --- a/examples/experimental/dagster-airlift/dagster_airlift/in_airflow/base_proxy_operator.py +++ b/examples/experimental/dagster-airlift/dagster_airlift/in_airflow/base_proxy_operator.py @@ -29,7 +29,10 @@ def _get_validated_session(self, context: Context) -> requests.Session: session = self.get_dagster_session(context) dagster_url = self.get_dagster_url(context) response = session.post( - f"{dagster_url}/graphql", json={"query": VERIFICATION_QUERY}, timeout=3 + # Timeout in seconds + f"{dagster_url}/graphql", + json={"query": VERIFICATION_QUERY}, + timeout=3, ) if response.status_code != 200: raise Exception( @@ -50,7 +53,10 @@ def launch_runs_for_task(self, context: Context, dag_id: str, task_id: str) -> N assets_to_trigger = {} # key is (repo_location, repo_name, job_name), value is list of asset keys # create graphql client response = session.post( - f"{dagster_url}/graphql", json={"query": ASSET_NODES_QUERY}, timeout=3 + # Timeout in seconds + f"{dagster_url}/graphql", + json={"query": ASSET_NODES_QUERY}, + timeout=3, ) for asset_node in response.json()["data"]["assetNodes"]: tags = {tag["key"]: tag["value"] for tag in asset_node["tags"]} @@ -90,6 +96,7 @@ def launch_runs_for_task(self, context: Context, dag_id: str, task_id: str) -> N "query": TRIGGER_ASSETS_MUTATION, "variables": {"executionParams": execution_params}, }, + # Timeout in seconds timeout=3, ) run_id = response.json()["data"]["launchPipelineExecution"]["run"]["id"] @@ -103,6 +110,7 @@ def launch_runs_for_task(self, context: Context, dag_id: str, task_id: str) -> N response = session.post( f"{dagster_url}/graphql", json={"query": RUNS_QUERY, "variables": {"runId": run_id}}, + # Timeout in seconds timeout=3, ) run_status = response.json()["data"]["runOrError"]["status"] diff --git a/examples/experimental/dagster-airlift/dagster_airlift/migration_state.py b/examples/experimental/dagster-airlift/dagster_airlift/migration_state.py index 423a17c87f79c..c9aad1edbf437 100644 --- a/examples/experimental/dagster-airlift/dagster_airlift/migration_state.py +++ b/examples/experimental/dagster-airlift/dagster_airlift/migration_state.py @@ -66,6 +66,13 @@ def get_migration_dict_for_dag( ] } + @staticmethod + def from_dict(migration_dict: Dict[str, Any]) -> "AirflowMigrationState": + dags = {} + for dag_id, dag_dict in migration_dict.items(): + dags[dag_id] = DagMigrationState.from_dict(dag_dict) + return AirflowMigrationState(dags=dags) + class MigrationStateParsingError(Exception): pass diff --git a/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/airflow_op_switcheroo_convention/dags/switcheroo_dag.py b/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/airflow_op_switcheroo_convention/dags/switcheroo_dag.py deleted file mode 100644 index 65c59ccb91aaf..0000000000000 --- a/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/airflow_op_switcheroo_convention/dags/switcheroo_dag.py +++ /dev/null @@ -1,68 +0,0 @@ -import logging -import os -from datetime import datetime - -from airflow import DAG -from airflow.operators.python import PythonOperator -from dagster_airlift.in_airflow import mark_as_dagster_migrating -from dagster_airlift.migration_state import ( - AirflowMigrationState, - DagMigrationState, - TaskMigrationState, -) - -logging.basicConfig() -logging.getLogger().setLevel(logging.INFO) -requests_log = logging.getLogger("requests.packages.urllib3") -requests_log.setLevel(logging.INFO) -requests_log.propagate = True - - -def write_to_file_in_airflow_home() -> None: - airflow_home = os.environ["AIRFLOW_HOME"] - with open(os.path.join(airflow_home, "airflow_home_file.txt"), "w") as f: - f.write("Hello") - - -def write_to_other_file_in_airflow_home() -> None: - airflow_home = os.environ["AIRFLOW_HOME"] - with open(os.path.join(airflow_home, "other_airflow_home_file.txt"), "w") as f: - f.write("Hello") - - -default_args = { - "owner": "airflow", - "depends_on_past": False, - "start_date": datetime(2023, 1, 1), - "retries": 1, -} - -dag = DAG( - "the_dag", default_args=default_args, schedule_interval=None, is_paused_upon_creation=False -) -op_to_migrate = PythonOperator( - task_id="some_task", python_callable=write_to_file_in_airflow_home, dag=dag -) -op_doesnt_migrate = PythonOperator( - task_id="other_task", python_callable=write_to_other_file_in_airflow_home, dag=dag -) -# Add a dependency between the two tasks -op_doesnt_migrate.set_upstream(op_to_migrate) - -# # set up the debugger -# print("Waiting for debugger to attach...") -# debugpy.listen(("localhost", 7778)) -# debugpy.wait_for_client() -mark_as_dagster_migrating( - global_vars=globals(), - migration_state=AirflowMigrationState( - dags={ - "the_dag": DagMigrationState( - tasks={ - "some_task": TaskMigrationState(task_id="some_task", migrated=True), - "other_task": TaskMigrationState(task_id="other_task", migrated=True), - } - ) - } - ), -) diff --git a/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/airflow_op_switcheroo_convention/dagster_defs.py b/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/airflow_op_switcheroo_convention/dagster_defs.py deleted file mode 100644 index a2def0e909262..0000000000000 --- a/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/airflow_op_switcheroo_convention/dagster_defs.py +++ /dev/null @@ -1,9 +0,0 @@ -from dagster import Definitions, asset - - -@asset -def the_dag__some_task(): - return "asset_value" - - -defs = Definitions(assets=[the_dag__some_task]) diff --git a/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/airflow_op_switcheroo_tags/dags/switcheroo_dag.py b/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/airflow_op_switcheroo_tags/dags/switcheroo_dag.py deleted file mode 100644 index 65c59ccb91aaf..0000000000000 --- a/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/airflow_op_switcheroo_tags/dags/switcheroo_dag.py +++ /dev/null @@ -1,68 +0,0 @@ -import logging -import os -from datetime import datetime - -from airflow import DAG -from airflow.operators.python import PythonOperator -from dagster_airlift.in_airflow import mark_as_dagster_migrating -from dagster_airlift.migration_state import ( - AirflowMigrationState, - DagMigrationState, - TaskMigrationState, -) - -logging.basicConfig() -logging.getLogger().setLevel(logging.INFO) -requests_log = logging.getLogger("requests.packages.urllib3") -requests_log.setLevel(logging.INFO) -requests_log.propagate = True - - -def write_to_file_in_airflow_home() -> None: - airflow_home = os.environ["AIRFLOW_HOME"] - with open(os.path.join(airflow_home, "airflow_home_file.txt"), "w") as f: - f.write("Hello") - - -def write_to_other_file_in_airflow_home() -> None: - airflow_home = os.environ["AIRFLOW_HOME"] - with open(os.path.join(airflow_home, "other_airflow_home_file.txt"), "w") as f: - f.write("Hello") - - -default_args = { - "owner": "airflow", - "depends_on_past": False, - "start_date": datetime(2023, 1, 1), - "retries": 1, -} - -dag = DAG( - "the_dag", default_args=default_args, schedule_interval=None, is_paused_upon_creation=False -) -op_to_migrate = PythonOperator( - task_id="some_task", python_callable=write_to_file_in_airflow_home, dag=dag -) -op_doesnt_migrate = PythonOperator( - task_id="other_task", python_callable=write_to_other_file_in_airflow_home, dag=dag -) -# Add a dependency between the two tasks -op_doesnt_migrate.set_upstream(op_to_migrate) - -# # set up the debugger -# print("Waiting for debugger to attach...") -# debugpy.listen(("localhost", 7778)) -# debugpy.wait_for_client() -mark_as_dagster_migrating( - global_vars=globals(), - migration_state=AirflowMigrationState( - dags={ - "the_dag": DagMigrationState( - tasks={ - "some_task": TaskMigrationState(task_id="some_task", migrated=True), - "other_task": TaskMigrationState(task_id="other_task", migrated=True), - } - ) - } - ), -) diff --git a/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/airflow_op_switcheroo_tags/dagster_defs.py b/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/airflow_op_switcheroo_tags/dagster_defs.py deleted file mode 100644 index 0f4b03261e96d..0000000000000 --- a/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/airflow_op_switcheroo_tags/dagster_defs.py +++ /dev/null @@ -1,13 +0,0 @@ -from dagster import Definitions, asset -from dagster_airlift.core import dag_defs, task_defs - - -@asset -def my_asset_for_some_task(): - return "asset_value" - - -defs = dag_defs( - "the_dag", - task_defs("some_task", Definitions(assets=[my_asset_for_some_task])), -) diff --git a/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/correctly_marked_dag/dags/marked.py b/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/correctly_marked_dag/dags/marked.py deleted file mode 100644 index 9d600d2d9496a..0000000000000 --- a/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/correctly_marked_dag/dags/marked.py +++ /dev/null @@ -1,33 +0,0 @@ -from datetime import datetime -from pathlib import Path - -from airflow import DAG -from airflow.operators.python import PythonOperator -from dagster_airlift.in_airflow import mark_as_dagster_migrating -from dagster_airlift.migration_state import load_migration_state_from_yaml - - -def print_hello(): - print("Hello") # noqa: T201 - - -default_args = { - "owner": "airflow", - "depends_on_past": False, - "start_date": datetime(2023, 1, 1), - "retries": 1, -} - -marked_dag = DAG( - "marked_dag", default_args=default_args, schedule_interval=None, is_paused_upon_creation=False -) -print_op = PythonOperator(task_id="print_task", python_callable=print_hello, dag=marked_dag) -downstream_print_op = PythonOperator( - task_id="downstream_print_task", python_callable=print_hello, dag=marked_dag -) - - -path_to_migration_state = Path(__file__).parent.parent / "migration_state" -mark_as_dagster_migrating( - migration_state=load_migration_state_from_yaml(path_to_migration_state), global_vars=globals() -) diff --git a/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/correctly_marked_dag/migration_state/marked_dag.yaml b/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/correctly_marked_dag/migration_state/marked_dag.yaml deleted file mode 100644 index 3ddf2a77f3d04..0000000000000 --- a/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/correctly_marked_dag/migration_state/marked_dag.yaml +++ /dev/null @@ -1,3 +0,0 @@ -tasks: - - id: print_task - migrated: False \ No newline at end of file diff --git a/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/incorrectly_marked_dag/dags/marked.py b/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/incorrectly_marked_dag/dags/marked.py deleted file mode 100644 index 51c6781cfdad7..0000000000000 --- a/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/incorrectly_marked_dag/dags/marked.py +++ /dev/null @@ -1,33 +0,0 @@ -from datetime import datetime -from pathlib import Path - -from airflow import DAG -from airflow.operators.python import PythonOperator -from dagster_airlift.in_airflow import mark_as_dagster_migrating -from dagster_airlift.migration_state import load_migration_state_from_yaml - - -def print_hello(): - print("Hello") # noqa: T201 - - -default_args = { - "owner": "airflow", - "depends_on_past": False, - "start_date": datetime(2023, 1, 1), - "retries": 1, -} - -marked_dag = DAG( - "marked_dag", default_args=default_args, schedule_interval=None, is_paused_upon_creation=False -) -print_op = PythonOperator(task_id="print_task", python_callable=print_hello, dag=marked_dag) -downstream_print_op = PythonOperator( - task_id="downstream_print_task", python_callable=print_hello, dag=marked_dag -) - -# There is no entry for marked_dag in the migration state directory. There shouldn't be an exception, the dag just shouldn't be marked. -path_to_migration_state = Path(__file__).parent.parent / "migration_state" -mark_as_dagster_migrating( - migration_state=load_migration_state_from_yaml(path_to_migration_state), global_vars=globals() -) diff --git a/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/incorrectly_marked_dag/migration_state/some_other_dag.yaml b/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/incorrectly_marked_dag/migration_state/some_other_dag.yaml deleted file mode 100644 index 3ddf2a77f3d04..0000000000000 --- a/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/incorrectly_marked_dag/migration_state/some_other_dag.yaml +++ /dev/null @@ -1,3 +0,0 @@ -tasks: - - id: print_task - migrated: False \ No newline at end of file diff --git a/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/test_migrating_dag_correctly_marked.py b/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/test_migrating_dag_correctly_marked.py deleted file mode 100644 index a8fc29f22ccba..0000000000000 --- a/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/test_migrating_dag_correctly_marked.py +++ /dev/null @@ -1,39 +0,0 @@ -import json -import os -import subprocess -from pathlib import Path -from tempfile import TemporaryDirectory -from typing import Generator - -import pytest -import requests -from dagster._core.test_utils import environ - - -@pytest.fixture(name="setup") -def setup_fixture() -> Generator[str, None, None]: - with TemporaryDirectory() as tmpdir: - # run chmod +x create_airflow_cfg.sh and then run create_airflow_cfg.sh tmpdir - temp_env = {**os.environ.copy(), "AIRFLOW_HOME": tmpdir} - # go up one directory from current - path_to_script = Path(__file__).parent.parent.parent / "scripts" / "airflow_setup.sh" - path_to_dags = Path(__file__).parent / "correctly_marked_dag" / "dags" - subprocess.run(["chmod", "+x", path_to_script], check=True, env=temp_env) - subprocess.run([path_to_script, path_to_dags], check=True, env=temp_env) - with environ({"AIRFLOW_HOME": tmpdir}): - yield tmpdir - - -def test_migrating_dag(airflow_instance: None) -> None: - """Tests that a correctly marked dag is marked as migrating via a tag on the dag object.""" - response = requests.get("http://localhost:8080/api/v1/dags/marked_dag", auth=("admin", "admin")) - assert response.status_code == 200 - tags = response.json()["tags"] - assert len(tags) == 1 - assert json.loads(tags[0]["name"]) == { - "DAGSTER_MIGRATION_STATUS": { - "tasks": [ - {"id": "print_task", "migrated": False}, - ] - } - } diff --git a/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/test_migrating_dag_incorrectly_marked.py b/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/test_migrating_dag_incorrectly_marked.py deleted file mode 100644 index 8884ecbedbfbe..0000000000000 --- a/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/test_migrating_dag_incorrectly_marked.py +++ /dev/null @@ -1,31 +0,0 @@ -import os -import subprocess -from pathlib import Path -from tempfile import TemporaryDirectory -from typing import Generator - -import pytest -import requests -from dagster._core.test_utils import environ - - -@pytest.fixture(name="setup") -def setup_fixture() -> Generator[str, None, None]: - with TemporaryDirectory() as tmpdir: - # run chmod +x create_airflow_cfg.sh and then run create_airflow_cfg.sh tmpdir - temp_env = {**os.environ.copy(), "AIRFLOW_HOME": tmpdir} - # go up one directory from current - path_to_script = Path(__file__).parent.parent.parent / "scripts" / "airflow_setup.sh" - path_to_dags = Path(__file__).parent / "incorrectly_marked_dag" / "dags" - subprocess.run(["chmod", "+x", path_to_script], check=True, env=temp_env) - subprocess.run([path_to_script, path_to_dags], check=True, env=temp_env) - with environ({"AIRFLOW_HOME": tmpdir}): - yield tmpdir - - -def test_migrating_dag(airflow_instance: None) -> None: - """Tests that an incorrectly marked dag throws an exception, and is not loaded.""" - response = requests.get("http://localhost:8080/api/v1/dags/marked_dag", auth=("admin", "admin")) - assert response.status_code == 200 - tags = response.json()["tags"] - assert len(tags) == 0 diff --git a/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/test_operator_switcheroo.py b/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/test_operator_switcheroo.py deleted file mode 100644 index 1dd3731f69418..0000000000000 --- a/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/test_operator_switcheroo.py +++ /dev/null @@ -1,71 +0,0 @@ -import time -from pathlib import Path - -import pytest -import requests -from dagster import AssetKey, DagsterInstance, DagsterRunStatus -from dagster._time import get_current_timestamp - - -# Two different test targets -# The first uses convention-based binding of assets to tasks, e.g. -# op named the_dag__some_task -# The second uses `dag_defs` and `task_defs` to attach tags to assets, which -# in turn are used to bind assets to tasks. -@pytest.fixture( - name="test_dir", - params=[ - "airflow_op_switcheroo_convention", - "airflow_op_switcheroo_tags", - ], -) -def test_dir_fixture(request: pytest.FixtureRequest) -> Path: - return Path(__file__).parent / request.param - - -@pytest.fixture(name="dags_dir") -def setup_dags_dir(test_dir: Path) -> Path: - return test_dir / "dags" - - -@pytest.fixture(name="dagster_defs_path") -def setup_dagster_defs_path(test_dir: Path) -> str: - return str(test_dir / "dagster_defs.py") - - -def test_migrated_operator(airflow_instance: None, dagster_dev: None) -> None: - """Tests that dagster migrated operator can correctly map airflow tasks to dagster tasks, and kick off executions.""" - response = requests.post( - "http://localhost:8080/api/v1/dags/the_dag/dagRuns", auth=("admin", "admin"), json={} - ) - assert response.status_code == 200, response.json() - # Wait until the run enters a terminal state - terminal_status = None - start_time = get_current_timestamp() - while get_current_timestamp() - start_time < 30: - response = requests.get( - "http://localhost:8080/api/v1/dags/the_dag/dagRuns", auth=("admin", "admin") - ) - assert response.status_code == 200, response.json() - dag_runs = response.json()["dag_runs"] - if dag_runs[0]["state"] in ["success", "failed"]: - terminal_status = dag_runs[0]["state"] - break - time.sleep(1) - assert terminal_status == "success", ( - "Never reached terminal status" - if terminal_status is None - else f"terminal status was {terminal_status}" - ) - # DAGSTER_HOME already set in environment, so instance should be retrievable. - instance = DagsterInstance.get() - runs = instance.get_runs() - # The graphql endpoint kicks off a run for each of the tasks in the dag - assert len(runs) == 1 - some_task_run = [ # noqa - run - for run in runs - if set(list(run.asset_selection)) == {AssetKey(["the_dag__some_task"])} # type: ignore - or set(list(run.asset_selection)) == {AssetKey(["my_asset_for_some_task"])} # type: ignore - ][0] - assert some_task_run.status == DagsterRunStatus.SUCCESS diff --git a/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/in_airflow_tests/__init__.py b/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/in_airflow_tests/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/in_airflow_tests/conftest.py b/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/in_airflow_tests/conftest.py new file mode 100644 index 0000000000000..a981eb8350433 --- /dev/null +++ b/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/in_airflow_tests/conftest.py @@ -0,0 +1,41 @@ +from collections import defaultdict +from datetime import datetime +from typing import TYPE_CHECKING, Dict, List + +from airflow import DAG +from airflow.operators.python import PythonOperator + +if TYPE_CHECKING: + from airflow.models.operator import BaseOperator + + +def build_dags_dict_given_structure(structure: Dict[str, Dict[str, List[str]]]) -> Dict[str, DAG]: + """Given a structure of dags and their tasks, build a dictionary of dags.""" + return_dict = {} + tasks_per_dag: Dict[str, Dict[str, "BaseOperator"]] = defaultdict(dict) + for dag_id, task_structure in structure.items(): + dag = DAG( + dag_id, + default_args={ + "owner": "airflow", + "depends_on_past": False, + "start_date": datetime(2023, 1, 1), + "retries": 1, + }, + schedule_interval=None, + is_paused_upon_creation=False, + ) + for task_id in task_structure.keys(): + tasks_per_dag[dag_id][task_id] = PythonOperator( + task_id=task_id, + python_callable=lambda: None, + dag=dag, + ) + return_dict[dag_id] = dag + + # Do another pass to build out dependencies + for dag_id, task_structure in structure.items(): + for task_id, deps in task_structure.items(): + for dep in deps: + tasks_per_dag[dag_id][task_id].set_upstream(tasks_per_dag[dag_id][dep]) + return return_dict diff --git a/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/in_airflow_tests/test_mark_as_dagster_migrating.py b/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/in_airflow_tests/test_mark_as_dagster_migrating.py new file mode 100644 index 0000000000000..e8aed13c08911 --- /dev/null +++ b/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/in_airflow_tests/test_mark_as_dagster_migrating.py @@ -0,0 +1,47 @@ +import json + +from airflow.operators.python import PythonOperator +from dagster_airlift.in_airflow import mark_as_dagster_migrating +from dagster_airlift.in_airflow.base_proxy_operator import BaseProxyToDagsterOperator +from dagster_airlift.migration_state import AirflowMigrationState + +from dagster_airlift_tests.unit_tests.in_airflow_tests.conftest import ( + build_dags_dict_given_structure, +) + + +def test_mark_as_dagster_migrating() -> None: + """Test that we can mark a set of dags as migrating to dagster, and as a result, operators are replaced and tags are added.""" + globals_fake = build_dags_dict_given_structure( + { + "task_is_migrated": {"task": []}, + "task_isnt_migrated": {"task": []}, + "should_be_ignored": {"task": []}, + } + ) + mark_as_dagster_migrating( + global_vars=globals_fake, + migration_state=AirflowMigrationState.from_dict( + { + "task_is_migrated": {"tasks": [{"id": "task", "migrated": True}]}, + "task_isnt_migrated": {"tasks": [{"id": "task", "migrated": False}]}, + } + ), + ) + # Only the task marked as migrated should be replaced with a DagsterOperator + assert isinstance( + globals_fake["task_is_migrated"].task_dict["task"], BaseProxyToDagsterOperator + ) + assert isinstance(globals_fake["task_isnt_migrated"].task_dict["task"], PythonOperator) + assert isinstance(globals_fake["should_be_ignored"].task_dict["task"], PythonOperator) + + # Only task_is_migrated and task_isnt_migrated should have tags added. + assert len(globals_fake["task_is_migrated"].tags) == 1 + assert len(globals_fake["task_isnt_migrated"].tags) == 1 + assert len(globals_fake["should_be_ignored"].tags) == 0 + assert json.loads(next(iter(globals_fake["task_is_migrated"].tags))) == { + "DAGSTER_MIGRATION_STATUS": {"tasks": [{"id": "task", "migrated": True}]} + } + assert json.loads(next(iter(globals_fake["task_isnt_migrated"].tags))) == { + "DAGSTER_MIGRATION_STATUS": {"tasks": [{"id": "task", "migrated": False}]} + }