diff --git a/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/dbt_defs_from_yaml/my_dbt_multi_asset.yaml b/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/dbt_defs_from_yaml/my_dbt_multi_asset.yaml deleted file mode 100644 index 4884237294878..0000000000000 --- a/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/dbt_defs_from_yaml/my_dbt_multi_asset.yaml +++ /dev/null @@ -1,4 +0,0 @@ -type: dbt_project -dbt_project_path: - env_var: DBT_PROJECT_DIR -group: dbt \ No newline at end of file diff --git a/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/test_dbt_multi_asset.py b/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/test_dbt_multi_asset.py deleted file mode 100644 index 1ebe8f30bd922..0000000000000 --- a/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/test_dbt_multi_asset.py +++ /dev/null @@ -1,81 +0,0 @@ -import os -import subprocess -from pathlib import Path -from typing import Dict, Generator, List - -import pytest -from dagster import AssetKey, AssetsDefinition -from dagster._core.test_utils import environ -from dagster_airlift.dbt import dbt_defs -from dagster_dbt import DbtProject - - -@pytest.fixture(name="dbt_project_dir") -def dbt_project_fixture() -> Generator[Path, None, None]: - path = Path(__file__).parent / "dbt_project" - with environ( - {"DBT_PROJECT_DIR": str(path), "DUCKDB_PATH": str(path / "target" / "local.duckdb")} - ): - yield path - - -@pytest.fixture -def dbt_project(dbt_project_dir: Path) -> None: - """Builds dbt project.""" - subprocess.run( - ["dbt", "build", "--project-dir", dbt_project_dir, "--profiles-dir", dbt_project_dir], - check=True, - env=os.environ.copy(), - ) - - -def test_load_dbt_project(dbt_project_dir: Path, dbt_project: None) -> None: - """Test that DBT project is correctly parsed as airflow tasks.""" - assert os.environ["DBT_PROJECT_DIR"] == str( - dbt_project_dir - ), "Expected dbt project dir to be set as env var" - defs = dbt_defs( - manifest=dbt_project_dir / "target" / "manifest.json", - project=DbtProject(project_dir=dbt_project_dir), - ) - assert defs.assets - all_assets = list(defs.assets) - assert len(all_assets) == 1 - assets_def = all_assets[0] - assert isinstance(assets_def, AssetsDefinition) - assert assets_def.node_def.name == "build_jaffle_shop" - assert assets_def.is_executable - specs_list = list(assets_def.specs) - # In jaffle shop, there are 8 dbt models. - # raw versionsof payments, orders, and customers, staging versions of payments, orders, and - # customers, and final versions of orders, and customers. We expect this to be reflected in the - # mappings. - assert len(specs_list) == 8 - expected_deps: Dict[str, List[str]] = { - "raw_customers": [], - "raw_orders": [], - "raw_payments": [], - "stg_customers": ["raw_customers"], - "stg_orders": ["raw_orders"], - "stg_payments": ["raw_payments"], - "orders": ["stg_orders", "stg_payments"], - "customers": ["stg_customers", "stg_orders", "stg_payments"], - } - for key, deps_list in expected_deps.items(): - spec = next( - (spec for spec in specs_list if spec.key == AssetKey.from_user_string(key)), None - ) - assert spec, f"Could not find a spec for key {key}" - for expected_dep_key in deps_list: - found_dep = next( - ( - dep - for dep in spec.deps - if dep.asset_key == AssetKey.from_user_string(expected_dep_key) - ), - None, - ) - assert found_dep, f"Could not find a dep on key {expected_dep_key} for key {key}" - - # Actually execute dbt models via build - assert defs.get_implicit_global_asset_job_def().execute_in_process().success diff --git a/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/dbt_project/dbt_project.yml b/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/dbt_project/dbt_project.yml similarity index 100% rename from examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/dbt_project/dbt_project.yml rename to examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/dbt_project/dbt_project.yml diff --git a/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/dbt_project/models/customers.sql b/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/dbt_project/models/customers.sql similarity index 100% rename from examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/dbt_project/models/customers.sql rename to examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/dbt_project/models/customers.sql diff --git a/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/dbt_project/models/docs.md b/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/dbt_project/models/docs.md similarity index 100% rename from examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/dbt_project/models/docs.md rename to examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/dbt_project/models/docs.md diff --git a/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/dbt_project/models/orders.sql b/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/dbt_project/models/orders.sql similarity index 100% rename from examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/dbt_project/models/orders.sql rename to examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/dbt_project/models/orders.sql diff --git a/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/dbt_project/models/overview.md b/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/dbt_project/models/overview.md similarity index 100% rename from examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/dbt_project/models/overview.md rename to examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/dbt_project/models/overview.md diff --git a/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/dbt_project/models/schema.yml b/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/dbt_project/models/schema.yml similarity index 100% rename from examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/dbt_project/models/schema.yml rename to examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/dbt_project/models/schema.yml diff --git a/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/dbt_project/models/staging/schema.yml b/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/dbt_project/models/staging/schema.yml similarity index 100% rename from examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/dbt_project/models/staging/schema.yml rename to examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/dbt_project/models/staging/schema.yml diff --git a/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/dbt_project/models/staging/stg_customers.sql b/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/dbt_project/models/staging/stg_customers.sql similarity index 100% rename from examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/dbt_project/models/staging/stg_customers.sql rename to examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/dbt_project/models/staging/stg_customers.sql diff --git a/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/dbt_project/models/staging/stg_orders.sql b/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/dbt_project/models/staging/stg_orders.sql similarity index 100% rename from examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/dbt_project/models/staging/stg_orders.sql rename to examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/dbt_project/models/staging/stg_orders.sql diff --git a/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/dbt_project/models/staging/stg_payments.sql b/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/dbt_project/models/staging/stg_payments.sql similarity index 100% rename from examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/dbt_project/models/staging/stg_payments.sql rename to examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/dbt_project/models/staging/stg_payments.sql diff --git a/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/dbt_project/profiles.yml b/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/dbt_project/profiles.yml similarity index 100% rename from examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/dbt_project/profiles.yml rename to examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/dbt_project/profiles.yml diff --git a/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/dbt_project/seeds/.gitkeep b/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/dbt_project/seeds/.gitkeep similarity index 100% rename from examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/dbt_project/seeds/.gitkeep rename to examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/dbt_project/seeds/.gitkeep diff --git a/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/dbt_project/seeds/raw_customers.csv b/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/dbt_project/seeds/raw_customers.csv similarity index 100% rename from examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/dbt_project/seeds/raw_customers.csv rename to examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/dbt_project/seeds/raw_customers.csv diff --git a/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/dbt_project/seeds/raw_orders.csv b/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/dbt_project/seeds/raw_orders.csv similarity index 100% rename from examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/dbt_project/seeds/raw_orders.csv rename to examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/dbt_project/seeds/raw_orders.csv diff --git a/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/dbt_project/seeds/raw_payments.csv b/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/dbt_project/seeds/raw_payments.csv similarity index 100% rename from examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/dbt_project/seeds/raw_payments.csv rename to examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/dbt_project/seeds/raw_payments.csv diff --git a/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/test_dbt_defs_migrated.py b/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/test_dbt_defs_migrated.py index 41c6fc98d84ac..ab91f684bd973 100644 --- a/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/test_dbt_defs_migrated.py +++ b/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/test_dbt_defs_migrated.py @@ -1,28 +1,41 @@ +import os +import subprocess from pathlib import Path +from typing import Generator -from dagster._core.definitions.definitions_class import Definitions +import pytest +from dagster import AssetSpec, Definitions +from dagster._core.definitions.asset_key import AssetKey +from dagster._core.test_utils import environ from dagster_airlift.core.dag_defs import dag_defs, task_defs from dagster_airlift.core.defs_from_airflow import build_defs_from_airflow_instance from dagster_airlift.dbt import dbt_defs -from dagster_airlift.migration_state import ( - AirflowMigrationState, - DagMigrationState, - TaskMigrationState, -) from dagster_airlift.test import make_instance from dagster_dbt.dbt_project import DbtProject -def get_dbt_project_path() -> Path: - return Path(__file__).parent.parent / "integration_tests" / "dbt_project" +@pytest.fixture(name="dbt_project_path") +def dbt_project_path_fixture() -> Generator[Path, None, None]: + path = Path(__file__).parent / "dbt_project" + with environ( + {"DBT_PROJECT_DIR": str(path), "DUCKDB_PATH": str(path / "target" / "local.duckdb")} + ): + yield path -def dummy_defs() -> Definitions: - return Definitions() +@pytest.fixture(name="dbt_project_setup") +def dbt_project(dbt_project_path: Path) -> None: + """Builds dbt project.""" + subprocess.run( + ["dbt", "build", "--project-dir", dbt_project_path, "--profiles-dir", dbt_project_path], + check=True, + env=os.environ.copy(), + ) -def test_dbt_defs() -> None: - dbt_project_path = get_dbt_project_path() +def test_dbt_defs(dbt_project_path: Path, dbt_project_setup: None) -> None: + """Test that a dbt project being orchestrated elsewhere can be loaded, and that downstreams from dbt models are correctly hooked up.""" + # Dag_one has a set of dbt models. Dag_two has an asset "downstream" which is downstream of "customers". dbt_defs_inst = dbt_defs( manifest=dbt_project_path / "target" / "manifest.json", @@ -42,7 +55,9 @@ def test_dbt_defs() -> None: ), dag_defs( "dag_two", - task_defs("task_two", dummy_defs()), + task_defs( + "task_two", Definitions(assets=[AssetSpec("downstream", deps=["customers"])]) + ), ), ) @@ -51,17 +66,34 @@ def test_dbt_defs() -> None: defs = build_defs_from_airflow_instance( airflow_instance=test_airflow_instance, defs=initial_defs, - migration_state_override=AirflowMigrationState( - { - "dag_one": DagMigrationState( - {"task_one": TaskMigrationState("task_one", migrated=True)} - ) - } - ), ) assert isinstance(defs, Definitions) Definitions.validate_loadable(defs) + # dbt resource should be present in the final definitions assert set(defs.get_repository_def().get_top_level_resources().keys()) == {"dbt"} + repo_def = defs.get_repository_def() + repo_def.load_all_definitions() + expected_deps = { + "raw_customers": [], + "raw_orders": [], + "raw_payments": [], + "stg_customers": ["raw_customers"], + "stg_orders": ["raw_orders"], + "stg_payments": ["raw_payments"], + "orders": ["stg_orders", "stg_payments"], + "customers": ["stg_customers", "stg_orders", "stg_payments"], + "downstream": ["customers"], + "airflow_instance/dag/dag_one": ["customers", "orders"], + "airflow_instance/dag/dag_two": ["downstream"], + } + for key, deps_list in expected_deps.items(): + qual_key = AssetKey.from_user_string(key) + assert qual_key in repo_def.assets_defs_by_key + assets_def = repo_def.assets_defs_by_key[qual_key] + spec = next(spec for spec in assets_def.specs if spec.key == qual_key) + assert {dep.asset_key for dep in spec.deps} == { + AssetKey.from_user_string(dep) for dep in deps_list + }