Skip to content

Commit

Permalink
[dagster-airlift] better dbt example tests (#24447)
Browse files Browse the repository at this point in the history
DBT example tests were insufficient. Follow example of perf harness and
improve testing.
  • Loading branch information
dpeng817 authored Sep 13, 2024
1 parent 19f73c2 commit 5de01b2
Show file tree
Hide file tree
Showing 6 changed files with 133 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -118,21 +118,19 @@ def setup_dagster_home() -> Generator[str, None, None]:
yield tmpdir


@pytest.fixture(name="dagster_defs_type")
def dagster_defs_file_type() -> str:
"""Return the type of file that contains the dagster definitions."""
return "-f"
@pytest.fixture(name="dagster_dev_cmd")
def dagster_dev_cmd(dagster_defs_path: str) -> List[str]:
"""Return the command used to stand up dagster dev."""
return ["dagster", "dev", "-f", dagster_defs_path, "-p", "3333"]


@pytest.fixture(name="dagster_dev")
def setup_dagster(
dagster_home: str, dagster_defs_path: str, dagster_defs_type: str
) -> Generator[Any, None, None]:
def setup_dagster(dagster_home: str, dagster_dev_cmd: List[str]) -> Generator[Any, None, None]:
"""Stands up a dagster instance using the dagster dev CLI. dagster_defs_path must be provided
by a fixture included in the callsite.
"""
process = subprocess.Popen(
["dagster", "dev", dagster_defs_type, dagster_defs_path, "-p", "3333"],
dagster_dev_cmd,
env=os.environ.copy(),
shell=False,
preexec_fn=os.setsid, # noqa
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,20 @@ def execute(self, context) -> None:

DBT_DIR = os.getenv("DBT_PROJECT_DIR")
# Create the DAG with the specified schedule interval
dbt_dag = DAG("dbt_dag", default_args=default_args, schedule_interval=None)
dbt_dag = DAG(
"dbt_dag", default_args=default_args, schedule_interval=None, is_paused_upon_creation=False
)
args = f"--project-dir {DBT_DIR} --profiles-dir {DBT_DIR}"
run_dbt_model = BashOperator(
task_id="build_dbt_models", bash_command=f"dbt build {args}", dag=dbt_dag
)

dag = DAG("load_lakehouse", default_args=default_args, schedule_interval=None)
dag = DAG(
"load_lakehouse",
default_args=default_args,
schedule_interval=None,
is_paused_upon_creation=False,
)
load_iris = LoadToLakehouseOperator(
task_id="load_iris",
dag=dag,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,28 +5,40 @@

import pytest
from dagster._core.test_utils import environ
from dagster_airlift.test.shared_fixtures import stand_up_airflow


def makefile_dir() -> Path:
return Path(__file__).parent.parent.parent


@pytest.fixture(name="local_env")
def local_env_fixture() -> Generator[None, None, None]:
makefile_dir = Path(__file__).parent.parent.parent
subprocess.run(["make", "setup_local_env"], cwd=makefile_dir, check=True)
subprocess.run(["make", "setup_local_env"], cwd=makefile_dir(), check=True)
with environ(
{
"AIRFLOW_HOME": str(makefile_dir / ".airflow_home"),
"DBT_PROJECT_DIR": str(makefile_dir / "dbt_example" / "shared" / "dbt"),
"DAGSTER_HOME": str(makefile_dir / ".dagster_home"),
"AIRFLOW_HOME": str(makefile_dir() / ".airflow_home"),
"DBT_PROJECT_DIR": str(makefile_dir() / "dbt_example" / "shared" / "dbt"),
"DAGSTER_HOME": str(makefile_dir() / ".dagster_home"),
}
):
yield
subprocess.run(["make", "wipe"], cwd=makefile_dir, check=True)
subprocess.run(["make", "wipe"], cwd=makefile_dir(), check=True)


@pytest.fixture(name="dags_dir")
def dags_dir_fixture() -> Path:
return Path(__file__).parent.parent / "dbt_example" / "airflow_dags"
return makefile_dir() / "dbt_example" / "airflow_dags"


@pytest.fixture(name="airflow_home")
def airflow_home_fixture(local_env) -> Path:
def airflow_home_fixture(local_env: None) -> Path:
return Path(os.environ["AIRFLOW_HOME"])


@pytest.fixture(name="airflow_instance")
def airflow_instance_fixture(setup: None) -> Generator[subprocess.Popen, None, None]:
with stand_up_airflow(
airflow_cmd=["make", "run_airflow"], env=os.environ, cwd=makefile_dir()
) as process:
yield process
Original file line number Diff line number Diff line change
@@ -1,10 +1,90 @@
def test_defs_loads(airflow_instance) -> None:
from dbt_example.dagster_defs.peer import defs
import os
from datetime import timedelta
from typing import Callable, List, Tuple

assert defs
from dbt_example.dagster_defs.observe import defs
import pytest
from dagster import AssetKey, DagsterInstance
from dagster._time import get_current_datetime
from dagster_airlift.core import AirflowInstance

assert defs
from dbt_example.dagster_defs.migrate import defs
from dbt_example_tests.integration_tests.conftest import makefile_dir

assert defs

@pytest.fixture(name="dagster_home")
def dagster_home_fixture(local_env: None) -> str:
return os.environ["DAGSTER_HOME"]


@pytest.fixture(name="stage_and_fn")
def stage_and_fn_fixture(request) -> Tuple[str, Callable[[], AirflowInstance]]:
return request.param


@pytest.fixture(name="dagster_dev_cmd")
def dagster_dev_cmd_fixture(stage_and_fn: Tuple[str, Callable[[], AirflowInstance]]) -> List[str]:
dagster_dev_module = stage_and_fn[0]
if dagster_dev_module.endswith("peer"):
cmd = ["make", "run_peer"]
elif dagster_dev_module.endswith("observe"):
cmd = ["make", "run_observe"]
else:
cmd = ["make", "run_migrate"]
return cmd + ["-C", str(makefile_dir())]


def peer_instance() -> AirflowInstance:
from dbt_example.dagster_defs.peer import airflow_instance as af_instance_peer

return af_instance_peer


def observe_instance() -> AirflowInstance:
from dbt_example.dagster_defs.observe import airflow_instance as af_instance_observe

return af_instance_observe


def migrate_instance() -> AirflowInstance:
from dbt_example.dagster_defs.migrate import airflow_instance as af_instance_migrate

return af_instance_migrate


@pytest.mark.parametrize(
"stage_and_fn",
[
("peer", peer_instance),
("observe", observe_instance),
("migrate", migrate_instance),
],
ids=["peer", "observe", "migrate"],
indirect=True,
)
def test_dagster_materializes(
airflow_instance: None,
dagster_dev: None,
dagster_home: str,
stage_and_fn: Tuple[str, Callable[[], AirflowInstance]],
) -> None:
"""Test that assets can load properly, and that materializations register."""
dagster_dev_module, af_instance_fn = stage_and_fn
af_instance = af_instance_fn()
for dag_id, expected_asset_key in [("load_lakehouse", AssetKey(["lakehouse", "iris"]))]:
run_id = af_instance.trigger_dag("load_lakehouse")
af_instance.wait_for_run_completion(dag_id=dag_id, run_id=run_id, timeout=60)
dagster_instance = DagsterInstance.get()
start_time = get_current_datetime()
while get_current_datetime() - start_time < timedelta(seconds=30):
asset_materialization = dagster_instance.get_latest_materialization_event(
asset_key=AssetKey(["airflow_instance", "dag", dag_id])
)
if asset_materialization:
break

assert asset_materialization

if dagster_dev_module.endswith("observe") or dagster_dev_module.endswith("migrate"):
asset_materialization = dagster_instance.get_latest_materialization_event(
asset_key=expected_asset_key
)
assert asset_materialization
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,7 @@ allowlist_externals =
/bin/bash
uv
commands =
# We need to rebuild the UI to ensure that the dagster-webserver can run
make -C ../../../../.. rebuild_ui
!windows: /bin/bash -c '! pip list --exclude-editable | grep -e dagster'
pytest -c ../../../../../pyproject.toml ./dbt_example_tests --snapshot-warn-unused -vv {posargs}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os
from datetime import timedelta
from typing import List

import pytest
from dagster import AssetKey, DagsterInstance
Expand All @@ -12,18 +13,18 @@ def dagster_home_fixture(local_env: None) -> str:
return os.environ["DAGSTER_HOME"]


@pytest.fixture(name="dagster_defs_path")
def dagster_defs_path_fixture(request) -> str:
@pytest.fixture(name="dagster_dev_module")
def dagster_dev_module_fixture(request) -> str:
return request.param


@pytest.fixture(name="dagster_defs_type")
def dagster_defs_type_fixture() -> str:
return "-m"
@pytest.fixture(name="dagster_dev_cmd")
def dagster_dev_cmd_fixture(dagster_dev_module: str) -> List[str]:
return ["dagster", "dev", "-m", dagster_dev_module, "-p", "3333"]


@pytest.mark.parametrize(
"dagster_defs_path",
"dagster_dev_module",
[
"perf_harness.dagster_defs.peer",
"perf_harness.dagster_defs.observe",
Expand All @@ -33,7 +34,7 @@ def dagster_defs_type_fixture() -> str:
indirect=True,
)
def test_dagster_materializes(
airflow_instance: None, dagster_dev: None, dagster_home: str, dagster_defs_path: str
airflow_instance: None, dagster_dev: None, dagster_home: str, dagster_dev_module: str
) -> None:
"""Test that assets can load properly, and that materializations register."""
run_id = af_instance.trigger_dag("dag_0")
Expand All @@ -49,7 +50,7 @@ def test_dagster_materializes(

assert asset_materialization

if dagster_defs_path.endswith("observe") or dagster_defs_path.endswith("migrate"):
if dagster_dev_module.endswith("observe") or dagster_dev_module.endswith("migrate"):
asset_materialization = dagster_instance.get_latest_materialization_event(
asset_key=AssetKey(["asset_0_0"])
)
Expand Down

0 comments on commit 5de01b2

Please sign in to comment.