From 5de01b29719d00326d9bdb38d83c9ce0279d5cdf Mon Sep 17 00:00:00 2001 From: Christopher DeCarolis Date: Fri, 13 Sep 2024 09:46:23 -0700 Subject: [PATCH] [dagster-airlift] better dbt example tests (#24447) DBT example tests were insufficient. Follow example of perf harness and improve testing. --- .../dagster_airlift/test/shared_fixtures.py | 14 ++- .../dbt_example/airflow_dags/dags.py | 11 ++- .../integration_tests/conftest.py | 28 ++++-- .../integration_tests/test_load_dagster.py | 94 +++++++++++++++++-- .../examples/dbt-example/tox.ini | 2 + .../perf_harness_tests/test_e2e.py | 17 ++-- 6 files changed, 133 insertions(+), 33 deletions(-) diff --git a/examples/experimental/dagster-airlift/dagster_airlift/test/shared_fixtures.py b/examples/experimental/dagster-airlift/dagster_airlift/test/shared_fixtures.py index 56cafcc6d0029..8fcbb6d6d9bb3 100644 --- a/examples/experimental/dagster-airlift/dagster_airlift/test/shared_fixtures.py +++ b/examples/experimental/dagster-airlift/dagster_airlift/test/shared_fixtures.py @@ -118,21 +118,19 @@ def setup_dagster_home() -> Generator[str, None, None]: yield tmpdir -@pytest.fixture(name="dagster_defs_type") -def dagster_defs_file_type() -> str: - """Return the type of file that contains the dagster definitions.""" - return "-f" +@pytest.fixture(name="dagster_dev_cmd") +def dagster_dev_cmd(dagster_defs_path: str) -> List[str]: + """Return the command used to stand up dagster dev.""" + return ["dagster", "dev", "-f", dagster_defs_path, "-p", "3333"] @pytest.fixture(name="dagster_dev") -def setup_dagster( - dagster_home: str, dagster_defs_path: str, dagster_defs_type: str -) -> Generator[Any, None, None]: +def setup_dagster(dagster_home: str, dagster_dev_cmd: List[str]) -> Generator[Any, None, None]: """Stands up a dagster instance using the dagster dev CLI. dagster_defs_path must be provided by a fixture included in the callsite. """ process = subprocess.Popen( - ["dagster", "dev", dagster_defs_type, dagster_defs_path, "-p", "3333"], + dagster_dev_cmd, env=os.environ.copy(), shell=False, preexec_fn=os.setsid, # noqa diff --git a/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/airflow_dags/dags.py b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/airflow_dags/dags.py index 1f4454dded617..2d8d080b5690e 100644 --- a/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/airflow_dags/dags.py +++ b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/airflow_dags/dags.py @@ -36,13 +36,20 @@ def execute(self, context) -> None: DBT_DIR = os.getenv("DBT_PROJECT_DIR") # Create the DAG with the specified schedule interval -dbt_dag = DAG("dbt_dag", default_args=default_args, schedule_interval=None) +dbt_dag = DAG( + "dbt_dag", default_args=default_args, schedule_interval=None, is_paused_upon_creation=False +) args = f"--project-dir {DBT_DIR} --profiles-dir {DBT_DIR}" run_dbt_model = BashOperator( task_id="build_dbt_models", bash_command=f"dbt build {args}", dag=dbt_dag ) -dag = DAG("load_lakehouse", default_args=default_args, schedule_interval=None) +dag = DAG( + "load_lakehouse", + default_args=default_args, + schedule_interval=None, + is_paused_upon_creation=False, +) load_iris = LoadToLakehouseOperator( task_id="load_iris", dag=dag, diff --git a/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example_tests/integration_tests/conftest.py b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example_tests/integration_tests/conftest.py index 633d43dc3de73..17509bd15c386 100644 --- a/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example_tests/integration_tests/conftest.py +++ b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example_tests/integration_tests/conftest.py @@ -5,28 +5,40 @@ import pytest from dagster._core.test_utils import environ +from dagster_airlift.test.shared_fixtures import stand_up_airflow + + +def makefile_dir() -> Path: + return Path(__file__).parent.parent.parent @pytest.fixture(name="local_env") def local_env_fixture() -> Generator[None, None, None]: - makefile_dir = Path(__file__).parent.parent.parent - subprocess.run(["make", "setup_local_env"], cwd=makefile_dir, check=True) + subprocess.run(["make", "setup_local_env"], cwd=makefile_dir(), check=True) with environ( { - "AIRFLOW_HOME": str(makefile_dir / ".airflow_home"), - "DBT_PROJECT_DIR": str(makefile_dir / "dbt_example" / "shared" / "dbt"), - "DAGSTER_HOME": str(makefile_dir / ".dagster_home"), + "AIRFLOW_HOME": str(makefile_dir() / ".airflow_home"), + "DBT_PROJECT_DIR": str(makefile_dir() / "dbt_example" / "shared" / "dbt"), + "DAGSTER_HOME": str(makefile_dir() / ".dagster_home"), } ): yield - subprocess.run(["make", "wipe"], cwd=makefile_dir, check=True) + subprocess.run(["make", "wipe"], cwd=makefile_dir(), check=True) @pytest.fixture(name="dags_dir") def dags_dir_fixture() -> Path: - return Path(__file__).parent.parent / "dbt_example" / "airflow_dags" + return makefile_dir() / "dbt_example" / "airflow_dags" @pytest.fixture(name="airflow_home") -def airflow_home_fixture(local_env) -> Path: +def airflow_home_fixture(local_env: None) -> Path: return Path(os.environ["AIRFLOW_HOME"]) + + +@pytest.fixture(name="airflow_instance") +def airflow_instance_fixture(setup: None) -> Generator[subprocess.Popen, None, None]: + with stand_up_airflow( + airflow_cmd=["make", "run_airflow"], env=os.environ, cwd=makefile_dir() + ) as process: + yield process diff --git a/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example_tests/integration_tests/test_load_dagster.py b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example_tests/integration_tests/test_load_dagster.py index 08ce333685077..9cd5ffe01b3d9 100644 --- a/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example_tests/integration_tests/test_load_dagster.py +++ b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example_tests/integration_tests/test_load_dagster.py @@ -1,10 +1,90 @@ -def test_defs_loads(airflow_instance) -> None: - from dbt_example.dagster_defs.peer import defs +import os +from datetime import timedelta +from typing import Callable, List, Tuple - assert defs - from dbt_example.dagster_defs.observe import defs +import pytest +from dagster import AssetKey, DagsterInstance +from dagster._time import get_current_datetime +from dagster_airlift.core import AirflowInstance - assert defs - from dbt_example.dagster_defs.migrate import defs +from dbt_example_tests.integration_tests.conftest import makefile_dir - assert defs + +@pytest.fixture(name="dagster_home") +def dagster_home_fixture(local_env: None) -> str: + return os.environ["DAGSTER_HOME"] + + +@pytest.fixture(name="stage_and_fn") +def stage_and_fn_fixture(request) -> Tuple[str, Callable[[], AirflowInstance]]: + return request.param + + +@pytest.fixture(name="dagster_dev_cmd") +def dagster_dev_cmd_fixture(stage_and_fn: Tuple[str, Callable[[], AirflowInstance]]) -> List[str]: + dagster_dev_module = stage_and_fn[0] + if dagster_dev_module.endswith("peer"): + cmd = ["make", "run_peer"] + elif dagster_dev_module.endswith("observe"): + cmd = ["make", "run_observe"] + else: + cmd = ["make", "run_migrate"] + return cmd + ["-C", str(makefile_dir())] + + +def peer_instance() -> AirflowInstance: + from dbt_example.dagster_defs.peer import airflow_instance as af_instance_peer + + return af_instance_peer + + +def observe_instance() -> AirflowInstance: + from dbt_example.dagster_defs.observe import airflow_instance as af_instance_observe + + return af_instance_observe + + +def migrate_instance() -> AirflowInstance: + from dbt_example.dagster_defs.migrate import airflow_instance as af_instance_migrate + + return af_instance_migrate + + +@pytest.mark.parametrize( + "stage_and_fn", + [ + ("peer", peer_instance), + ("observe", observe_instance), + ("migrate", migrate_instance), + ], + ids=["peer", "observe", "migrate"], + indirect=True, +) +def test_dagster_materializes( + airflow_instance: None, + dagster_dev: None, + dagster_home: str, + stage_and_fn: Tuple[str, Callable[[], AirflowInstance]], +) -> None: + """Test that assets can load properly, and that materializations register.""" + dagster_dev_module, af_instance_fn = stage_and_fn + af_instance = af_instance_fn() + for dag_id, expected_asset_key in [("load_lakehouse", AssetKey(["lakehouse", "iris"]))]: + run_id = af_instance.trigger_dag("load_lakehouse") + af_instance.wait_for_run_completion(dag_id=dag_id, run_id=run_id, timeout=60) + dagster_instance = DagsterInstance.get() + start_time = get_current_datetime() + while get_current_datetime() - start_time < timedelta(seconds=30): + asset_materialization = dagster_instance.get_latest_materialization_event( + asset_key=AssetKey(["airflow_instance", "dag", dag_id]) + ) + if asset_materialization: + break + + assert asset_materialization + + if dagster_dev_module.endswith("observe") or dagster_dev_module.endswith("migrate"): + asset_materialization = dagster_instance.get_latest_materialization_event( + asset_key=expected_asset_key + ) + assert asset_materialization diff --git a/examples/experimental/dagster-airlift/examples/dbt-example/tox.ini b/examples/experimental/dagster-airlift/examples/dbt-example/tox.ini index edeee2d82869b..19bb9dda69b87 100644 --- a/examples/experimental/dagster-airlift/examples/dbt-example/tox.ini +++ b/examples/experimental/dagster-airlift/examples/dbt-example/tox.ini @@ -22,5 +22,7 @@ allowlist_externals = /bin/bash uv commands = + # We need to rebuild the UI to ensure that the dagster-webserver can run + make -C ../../../../.. rebuild_ui !windows: /bin/bash -c '! pip list --exclude-editable | grep -e dagster' pytest -c ../../../../../pyproject.toml ./dbt_example_tests --snapshot-warn-unused -vv {posargs} \ No newline at end of file diff --git a/examples/experimental/dagster-airlift/examples/perf-harness/perf_harness_tests/test_e2e.py b/examples/experimental/dagster-airlift/examples/perf-harness/perf_harness_tests/test_e2e.py index f378b778e13e7..e26575a5bd794 100644 --- a/examples/experimental/dagster-airlift/examples/perf-harness/perf_harness_tests/test_e2e.py +++ b/examples/experimental/dagster-airlift/examples/perf-harness/perf_harness_tests/test_e2e.py @@ -1,5 +1,6 @@ import os from datetime import timedelta +from typing import List import pytest from dagster import AssetKey, DagsterInstance @@ -12,18 +13,18 @@ def dagster_home_fixture(local_env: None) -> str: return os.environ["DAGSTER_HOME"] -@pytest.fixture(name="dagster_defs_path") -def dagster_defs_path_fixture(request) -> str: +@pytest.fixture(name="dagster_dev_module") +def dagster_dev_module_fixture(request) -> str: return request.param -@pytest.fixture(name="dagster_defs_type") -def dagster_defs_type_fixture() -> str: - return "-m" +@pytest.fixture(name="dagster_dev_cmd") +def dagster_dev_cmd_fixture(dagster_dev_module: str) -> List[str]: + return ["dagster", "dev", "-m", dagster_dev_module, "-p", "3333"] @pytest.mark.parametrize( - "dagster_defs_path", + "dagster_dev_module", [ "perf_harness.dagster_defs.peer", "perf_harness.dagster_defs.observe", @@ -33,7 +34,7 @@ def dagster_defs_type_fixture() -> str: indirect=True, ) def test_dagster_materializes( - airflow_instance: None, dagster_dev: None, dagster_home: str, dagster_defs_path: str + airflow_instance: None, dagster_dev: None, dagster_home: str, dagster_dev_module: str ) -> None: """Test that assets can load properly, and that materializations register.""" run_id = af_instance.trigger_dag("dag_0") @@ -49,7 +50,7 @@ def test_dagster_materializes( assert asset_materialization - if dagster_defs_path.endswith("observe") or dagster_defs_path.endswith("migrate"): + if dagster_dev_module.endswith("observe") or dagster_dev_module.endswith("migrate"): asset_materialization = dagster_instance.get_latest_materialization_event( asset_key=AssetKey(["asset_0_0"]) )