Skip to content

Commit

Permalink
[dagster-airlift] move dbt tests to unit (#24162)
Browse files Browse the repository at this point in the history
Get rid of dbt integration test and move relevant bits to unit tests.
  • Loading branch information
dpeng817 authored Sep 3, 2024
1 parent 433e129 commit 881046b
Show file tree
Hide file tree
Showing 18 changed files with 52 additions and 105 deletions.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,28 +1,41 @@
import os
import subprocess
from pathlib import Path
from typing import Generator

from dagster._core.definitions.definitions_class import Definitions
import pytest
from dagster import AssetSpec, Definitions
from dagster._core.definitions.asset_key import AssetKey
from dagster._core.test_utils import environ
from dagster_airlift.core.dag_defs import dag_defs, task_defs
from dagster_airlift.core.defs_from_airflow import build_defs_from_airflow_instance
from dagster_airlift.dbt import dbt_defs
from dagster_airlift.migration_state import (
AirflowMigrationState,
DagMigrationState,
TaskMigrationState,
)
from dagster_airlift.test import make_instance
from dagster_dbt.dbt_project import DbtProject


def get_dbt_project_path() -> Path:
return Path(__file__).parent.parent / "integration_tests" / "dbt_project"
@pytest.fixture(name="dbt_project_path")
def dbt_project_path_fixture() -> Generator[Path, None, None]:
path = Path(__file__).parent / "dbt_project"
with environ(
{"DBT_PROJECT_DIR": str(path), "DUCKDB_PATH": str(path / "target" / "local.duckdb")}
):
yield path


def dummy_defs() -> Definitions:
return Definitions()
@pytest.fixture(name="dbt_project_setup")
def dbt_project(dbt_project_path: Path) -> None:
"""Builds dbt project."""
subprocess.run(
["dbt", "build", "--project-dir", dbt_project_path, "--profiles-dir", dbt_project_path],
check=True,
env=os.environ.copy(),
)


def test_dbt_defs() -> None:
dbt_project_path = get_dbt_project_path()
def test_dbt_defs(dbt_project_path: Path, dbt_project_setup: None) -> None:
"""Test that a dbt project being orchestrated elsewhere can be loaded, and that downstreams from dbt models are correctly hooked up."""
# Dag_one has a set of dbt models. Dag_two has an asset "downstream" which is downstream of "customers".

dbt_defs_inst = dbt_defs(
manifest=dbt_project_path / "target" / "manifest.json",
Expand All @@ -42,7 +55,9 @@ def test_dbt_defs() -> None:
),
dag_defs(
"dag_two",
task_defs("task_two", dummy_defs()),
task_defs(
"task_two", Definitions(assets=[AssetSpec("downstream", deps=["customers"])])
),
),
)

Expand All @@ -51,17 +66,34 @@ def test_dbt_defs() -> None:
defs = build_defs_from_airflow_instance(
airflow_instance=test_airflow_instance,
defs=initial_defs,
migration_state_override=AirflowMigrationState(
{
"dag_one": DagMigrationState(
{"task_one": TaskMigrationState("task_one", migrated=True)}
)
}
),
)

assert isinstance(defs, Definitions)

Definitions.validate_loadable(defs)

# dbt resource should be present in the final definitions
assert set(defs.get_repository_def().get_top_level_resources().keys()) == {"dbt"}
repo_def = defs.get_repository_def()
repo_def.load_all_definitions()
expected_deps = {
"raw_customers": [],
"raw_orders": [],
"raw_payments": [],
"stg_customers": ["raw_customers"],
"stg_orders": ["raw_orders"],
"stg_payments": ["raw_payments"],
"orders": ["stg_orders", "stg_payments"],
"customers": ["stg_customers", "stg_orders", "stg_payments"],
"downstream": ["customers"],
"airflow_instance/dag/dag_one": ["customers", "orders"],
"airflow_instance/dag/dag_two": ["downstream"],
}
for key, deps_list in expected_deps.items():
qual_key = AssetKey.from_user_string(key)
assert qual_key in repo_def.assets_defs_by_key
assets_def = repo_def.assets_defs_by_key[qual_key]
spec = next(spec for spec in assets_def.specs if spec.key == qual_key)
assert {dep.asset_key for dep in spec.deps} == {
AssetKey.from_user_string(dep) for dep in deps_list
}

0 comments on commit 881046b

Please sign in to comment.