From e61e2b41f83b0cfc47335f269255c7f02ff23599 Mon Sep 17 00:00:00 2001 From: Chris DeCarolis Date: Mon, 29 Jul 2024 16:16:20 -0700 Subject: [PATCH] [dagster-airlift] fix tests --- ...create_airflow_cfg.sh => airflow_setup.sh} | 12 +++- .../experimental/dagster-airlift/conftest.py | 1 + .../dagster_airlift/shared_fixtures.py | 43 +++++++++++++++ .../dagster_airlift_tests/conftest.py | 55 +++---------------- .../test_airflow_utils.py | 1 - .../examples/peering-with-dbt/Makefile | 12 +--- .../examples/peering-with-dbt/conftest.py | 1 + .../peering_with_dbt_tests/conftest.py | 21 +------ 8 files changed, 66 insertions(+), 80 deletions(-) rename examples/experimental/dagster-airlift/{create_airflow_cfg.sh => airflow_setup.sh} (70%) create mode 100644 examples/experimental/dagster-airlift/conftest.py create mode 100644 examples/experimental/dagster-airlift/dagster_airlift/shared_fixtures.py create mode 100644 examples/experimental/dagster-airlift/examples/peering-with-dbt/conftest.py diff --git a/examples/experimental/dagster-airlift/create_airflow_cfg.sh b/examples/experimental/dagster-airlift/airflow_setup.sh similarity index 70% rename from examples/experimental/dagster-airlift/create_airflow_cfg.sh rename to examples/experimental/dagster-airlift/airflow_setup.sh index 29097fab0e76b..0312e5ddd5d82 100755 --- a/examples/experimental/dagster-airlift/create_airflow_cfg.sh +++ b/examples/experimental/dagster-airlift/airflow_setup.sh @@ -24,4 +24,14 @@ auth_backend = airflow.api.auth.backend.basic_auth [webserver] expose_config = True -EOL \ No newline at end of file +EOL + +# call airflow command to create the default user +airflow db migrate && \ +airflow users create \ + --username admin \ +--password admin \ + --firstname Peter \ + --lastname Parker \ + --role Admin \ + --email spiderman@superhero.org \ No newline at end of file diff --git a/examples/experimental/dagster-airlift/conftest.py b/examples/experimental/dagster-airlift/conftest.py new file mode 100644 index 0000000000000..c6cfe053045be --- /dev/null +++ b/examples/experimental/dagster-airlift/conftest.py @@ -0,0 +1 @@ +pytest_plugins = ["dagster_airlift.shared_fixtures"] diff --git a/examples/experimental/dagster-airlift/dagster_airlift/shared_fixtures.py b/examples/experimental/dagster-airlift/dagster_airlift/shared_fixtures.py new file mode 100644 index 0000000000000..476eb0ca22b3c --- /dev/null +++ b/examples/experimental/dagster-airlift/dagster_airlift/shared_fixtures.py @@ -0,0 +1,43 @@ +import os +import signal +import subprocess +import time +from typing import Any, Generator + +import pytest +import requests +from dagster._time import get_current_timestamp + + +def airflow_is_ready(): + try: + response = requests.get("http://localhost:8080") + return response.status_code == 200 + except: + return False + + +# Setup should have set AIRFLOW_HOME env var +@pytest.fixture(name="airflow_instance") +def airflow_instance_fixture(setup: None) -> Generator[Any, None, None]: + process = subprocess.Popen( + ["airflow", "standalone"], + env=os.environ, # since we have some temp vars in the env right now + shell=False, + preexec_fn=os.setsid, # noqa # fuck it we ball + ) + # Give airflow a second to stand up + time.sleep(5) + initial_time = get_current_timestamp() + + airflow_ready = False + while get_current_timestamp() - initial_time < 30: + if airflow_is_ready(): + airflow_ready = True + break + time.sleep(1) + + assert airflow_ready, "Airflow did not start within 30 seconds..." + yield process + # Kill process group, since process.kill and process.terminate do not work. + os.killpg(process.pid, signal.SIGKILL) diff --git a/examples/experimental/dagster-airlift/dagster_airlift_tests/conftest.py b/examples/experimental/dagster-airlift/dagster_airlift_tests/conftest.py index c4f471201ea68..6ab2d0c7fc9e5 100644 --- a/examples/experimental/dagster-airlift/dagster_airlift_tests/conftest.py +++ b/examples/experimental/dagster-airlift/dagster_airlift_tests/conftest.py @@ -1,65 +1,24 @@ import os -import signal import subprocess from tempfile import TemporaryDirectory -from typing import Any, Generator +from typing import Generator import pytest +from dagster._core.test_utils import environ -@pytest.fixture(name="airflow_home_dir") -def airflow_home_dir_fixture() -> Generator[str, None, None]: +@pytest.fixture(name="setup") +def setup_fixture() -> Generator[str, None, None]: with TemporaryDirectory() as tmpdir: # run chmod +x create_airflow_cfg.sh and then run create_airflow_cfg.sh tmpdir temp_env = {**os.environ.copy(), "AIRFLOW_HOME": tmpdir} # go up one directory from current - path_to_script = os.path.join(os.path.dirname(__file__), "..", "create_airflow_cfg.sh") + path_to_script = os.path.join(os.path.dirname(__file__), "..", "airflow_setup.sh") path_to_dags = os.path.join(os.path.dirname(__file__), "airflow_project", "dags") subprocess.run(["chmod", "+x", path_to_script], check=True, env=temp_env) subprocess.run([path_to_script, path_to_dags], check=True, env=temp_env) - yield tmpdir - - -@pytest.fixture(name="airflow_instance") -def airflow_instance_fixture(airflow_home_dir: str) -> Generator[Any, None, None]: - temp_env = {**os.environ.copy(), "AIRFLOW_HOME": airflow_home_dir} - subprocess.run(["airflow", "db", "migrate"], check=True, env=temp_env) - subprocess.run( - [ - "airflow", - "users", - "create", - "--username", - "admin", - "--firstname", - "admin", - "--lastname", - "admin", - "--role", - "Admin", - "--email", - "foo@bar.com", - "--password", - "admin", - ], - check=True, - env=temp_env, - ) - process = subprocess.Popen( - ["airflow", "standalone"], - env=temp_env, - shell=False, - stdout=subprocess.PIPE, - preexec_fn=os.setsid, # noqa # fuck it we ball - ) - assert process.stdout is not None - for line in process.stdout: - if "Airflow is ready" in line.decode(): - break - yield process - # Kill process group, since process.kill and process.terminate do not work. - os.killpg(process.pid, signal.SIGKILL) - process.wait() + with environ({"AIRFLOW_HOME": tmpdir}): + yield tmpdir @pytest.fixture(name="dbt_project_dir") diff --git a/examples/experimental/dagster-airlift/dagster_airlift_tests/test_airflow_utils.py b/examples/experimental/dagster-airlift/dagster_airlift_tests/test_airflow_utils.py index 22c50c665c4f6..ce47c1853dfc7 100644 --- a/examples/experimental/dagster-airlift/dagster_airlift_tests/test_airflow_utils.py +++ b/examples/experimental/dagster-airlift/dagster_airlift_tests/test_airflow_utils.py @@ -14,7 +14,6 @@ def test_dag_peering( airflow_instance: None, - airflow_home_dir: str, ) -> None: """Test that dags can be correctly peered from airflow, and certain metadata properties are retained.""" assets_defs = assets_defs_from_airflow_instance( diff --git a/examples/experimental/dagster-airlift/examples/peering-with-dbt/Makefile b/examples/experimental/dagster-airlift/examples/peering-with-dbt/Makefile index f9ea2edc823af..11739a11e03d0 100644 --- a/examples/experimental/dagster-airlift/examples/peering-with-dbt/Makefile +++ b/examples/experimental/dagster-airlift/examples/peering-with-dbt/Makefile @@ -27,16 +27,8 @@ setup_local_env: make wipe && \ mkdir -p $$AIRFLOW_HOME && \ mkdir -p $$DAGSTER_HOME && \ - chmod +x ../../create_airflow_cfg.sh && \ - ../../create_airflow_cfg.sh $(MAKEFILE_DIR)/peering_with_dbt/airflow_dags && \ - airflow db migrate && \ - airflow users create \ - --username admin \ - --password admin \ - --firstname Peter \ - --lastname Parker \ - --role Admin \ - --email spiderman@superhero.org && \ + chmod +x ../../airflow_setup.sh && \ + ../../airflow_setup.sh $(MAKEFILE_DIR)/peering_with_dbt/airflow_dags && \ make dbt_setup run_airflow: diff --git a/examples/experimental/dagster-airlift/examples/peering-with-dbt/conftest.py b/examples/experimental/dagster-airlift/examples/peering-with-dbt/conftest.py new file mode 100644 index 0000000000000..c6cfe053045be --- /dev/null +++ b/examples/experimental/dagster-airlift/examples/peering-with-dbt/conftest.py @@ -0,0 +1 @@ +pytest_plugins = ["dagster_airlift.shared_fixtures"] diff --git a/examples/experimental/dagster-airlift/examples/peering-with-dbt/peering_with_dbt_tests/conftest.py b/examples/experimental/dagster-airlift/examples/peering-with-dbt/peering_with_dbt_tests/conftest.py index 42d8bdea6f3a7..934c1176688c1 100644 --- a/examples/experimental/dagster-airlift/examples/peering-with-dbt/peering_with_dbt_tests/conftest.py +++ b/examples/experimental/dagster-airlift/examples/peering-with-dbt/peering_with_dbt_tests/conftest.py @@ -1,7 +1,6 @@ import os -import signal import subprocess -from typing import Any, Generator +from typing import Generator import pytest from dagster._core.test_utils import environ @@ -19,21 +18,3 @@ def setup_fixture() -> Generator[None, None, None]: ): yield subprocess.run(["make", "wipe"], check=True) - - -@pytest.fixture(name="airflow_instance") -def airflow_instance_fixture(setup: None) -> Generator[Any, None, None]: - process = subprocess.Popen( - ["airflow", "standalone"], - env=os.environ, # since we have some temp vars in the env right now - shell=False, - stdout=subprocess.PIPE, - preexec_fn=os.setsid, # noqa # fuck it we ball - ) - assert process.stdout is not None - for line in process.stdout: - if "Airflow is ready" in line.decode(): - break - yield process - # Kill process group, since process.kill and process.terminate do not work. - os.killpg(process.pid, signal.SIGKILL)