Skip to content

Commit

Permalink
[dagster-airlift] improve unit test speed (#23324)
Browse files Browse the repository at this point in the history
Moves shared implementation to a shared fixture directory, and switches
how we determine when airflow is stood up. This speeds up tests to take
a few seconds instead of >20 minutes.
  • Loading branch information
dpeng817 authored Aug 3, 2024
1 parent 5ed3b9d commit d792196
Show file tree
Hide file tree
Showing 8 changed files with 66 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,14 @@ auth_backend = airflow.api.auth.backend.basic_auth
[webserver]
expose_config = True
EOL
EOL

# call airflow command to create the default user
airflow db migrate && \
airflow users create \
--username admin \
--password admin \
--firstname Peter \
--lastname Parker \
--role Admin \
--email [email protected]
1 change: 1 addition & 0 deletions examples/experimental/dagster-airlift/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pytest_plugins = ["dagster_airlift.shared_fixtures"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import os
import signal
import subprocess
import time
from typing import Any, Generator

import pytest
import requests
from dagster._time import get_current_timestamp


def airflow_is_ready():
try:
response = requests.get("http://localhost:8080")
return response.status_code == 200
except:
return False


# Setup should have set AIRFLOW_HOME env var
@pytest.fixture(name="airflow_instance")
def airflow_instance_fixture(setup: None) -> Generator[Any, None, None]:
process = subprocess.Popen(
["airflow", "standalone"],
env=os.environ, # since we have some temp vars in the env right now
shell=False,
preexec_fn=os.setsid, # noqa # fuck it we ball
)
# Give airflow a second to stand up
time.sleep(5)
initial_time = get_current_timestamp()

airflow_ready = False
while get_current_timestamp() - initial_time < 30:
if airflow_is_ready():
airflow_ready = True
break
time.sleep(1)

assert airflow_ready, "Airflow did not start within 30 seconds..."
yield process
# Kill process group, since process.kill and process.terminate do not work.
os.killpg(process.pid, signal.SIGKILL)
Original file line number Diff line number Diff line change
@@ -1,65 +1,24 @@
import os
import signal
import subprocess
from tempfile import TemporaryDirectory
from typing import Any, Generator
from typing import Generator

import pytest
from dagster._core.test_utils import environ


@pytest.fixture(name="airflow_home_dir")
def airflow_home_dir_fixture() -> Generator[str, None, None]:
@pytest.fixture(name="setup")
def setup_fixture() -> Generator[str, None, None]:
with TemporaryDirectory() as tmpdir:
# run chmod +x create_airflow_cfg.sh and then run create_airflow_cfg.sh tmpdir
temp_env = {**os.environ.copy(), "AIRFLOW_HOME": tmpdir}
# go up one directory from current
path_to_script = os.path.join(os.path.dirname(__file__), "..", "create_airflow_cfg.sh")
path_to_script = os.path.join(os.path.dirname(__file__), "..", "airflow_setup.sh")
path_to_dags = os.path.join(os.path.dirname(__file__), "airflow_project", "dags")
subprocess.run(["chmod", "+x", path_to_script], check=True, env=temp_env)
subprocess.run([path_to_script, path_to_dags], check=True, env=temp_env)
yield tmpdir


@pytest.fixture(name="airflow_instance")
def airflow_instance_fixture(airflow_home_dir: str) -> Generator[Any, None, None]:
temp_env = {**os.environ.copy(), "AIRFLOW_HOME": airflow_home_dir}
subprocess.run(["airflow", "db", "migrate"], check=True, env=temp_env)
subprocess.run(
[
"airflow",
"users",
"create",
"--username",
"admin",
"--firstname",
"admin",
"--lastname",
"admin",
"--role",
"Admin",
"--email",
"[email protected]",
"--password",
"admin",
],
check=True,
env=temp_env,
)
process = subprocess.Popen(
["airflow", "standalone"],
env=temp_env,
shell=False,
stdout=subprocess.PIPE,
preexec_fn=os.setsid, # noqa # fuck it we ball
)
assert process.stdout is not None
for line in process.stdout:
if "Airflow is ready" in line.decode():
break
yield process
# Kill process group, since process.kill and process.terminate do not work.
os.killpg(process.pid, signal.SIGKILL)
process.wait()
with environ({"AIRFLOW_HOME": tmpdir}):
yield tmpdir


@pytest.fixture(name="dbt_project_dir")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

def test_dag_peering(
airflow_instance: None,
airflow_home_dir: str,
) -> None:
"""Test that dags can be correctly peered from airflow, and certain metadata properties are retained."""
assets_defs = assets_defs_from_airflow_instance(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,8 @@ setup_local_env:
make wipe && \
mkdir -p $$AIRFLOW_HOME && \
mkdir -p $$DAGSTER_HOME && \
chmod +x ../../create_airflow_cfg.sh && \
../../create_airflow_cfg.sh $(MAKEFILE_DIR)/peering_with_dbt/airflow_dags && \
airflow db migrate && \
airflow users create \
--username admin \
--password admin \
--firstname Peter \
--lastname Parker \
--role Admin \
--email [email protected] && \
chmod +x ../../airflow_setup.sh && \
../../airflow_setup.sh $(MAKEFILE_DIR)/peering_with_dbt/airflow_dags && \
make dbt_setup

run_airflow:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pytest_plugins = ["dagster_airlift.shared_fixtures"]
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import os
import signal
import subprocess
from typing import Any, Generator
from typing import Generator

import pytest
from dagster._core.test_utils import environ
Expand All @@ -19,21 +18,3 @@ def setup_fixture() -> Generator[None, None, None]:
):
yield
subprocess.run(["make", "wipe"], check=True)


@pytest.fixture(name="airflow_instance")
def airflow_instance_fixture(setup: None) -> Generator[Any, None, None]:
process = subprocess.Popen(
["airflow", "standalone"],
env=os.environ, # since we have some temp vars in the env right now
shell=False,
stdout=subprocess.PIPE,
preexec_fn=os.setsid, # noqa # fuck it we ball
)
assert process.stdout is not None
for line in process.stdout:
if "Airflow is ready" in line.decode():
break
yield process
# Kill process group, since process.kill and process.terminate do not work.
os.killpg(process.pid, signal.SIGKILL)

0 comments on commit d792196

Please sign in to comment.