From 8721e8cf6c4b362b894122d926ed0c4a7c4c02fb Mon Sep 17 00:00:00 2001 From: Christopher DeCarolis Date: Tue, 3 Sep 2024 14:26:25 -0700 Subject: [PATCH] [dagster-airlift] move transitive asset dep testing to unit (#24161) This testing can be achieved more efficiently with unit tests. --- .../test_transitive_asset_deps.py | 71 ------------------- .../transitive_asset_deps_dags/first.py | 19 ----- .../transitive_asset_deps_dags/second.py | 20 ------ .../unit_tests/test_build_defs.py | 49 +++++++++++++ 4 files changed, 49 insertions(+), 110 deletions(-) delete mode 100644 examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/test_transitive_asset_deps.py delete mode 100644 examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/transitive_asset_deps_dags/first.py delete mode 100644 examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/transitive_asset_deps_dags/second.py diff --git a/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/test_transitive_asset_deps.py b/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/test_transitive_asset_deps.py deleted file mode 100644 index 9f422f56c193d..0000000000000 --- a/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/test_transitive_asset_deps.py +++ /dev/null @@ -1,71 +0,0 @@ -from pathlib import Path -from typing import List - -import pytest -from dagster import Definitions, multi_asset -from dagster._core.definitions.asset_key import AssetKey -from dagster._core.definitions.asset_spec import AssetSpec -from dagster_airlift.core.airflow_cacheable_assets_def import AirflowCacheableAssetsDefinition -from dagster_airlift.core.airflow_instance import AirflowInstance -from dagster_airlift.core.basic_auth import BasicAuthBackend - - -# Dag dir override -@pytest.fixture(name="dags_dir") -def dag_dir_fixture() -> Path: - return Path(__file__).parent / "transitive_asset_deps_dags" - - -def create_asset_for_task(task_id: str, dag_id: str, key: str, dep_keys: List[str] = []): - @multi_asset( - specs=[ - AssetSpec( - key=AssetKey([key]), - deps=[AssetSpec(key=AssetKey([dep_key])) for dep_key in dep_keys], - ) - ], - op_tags={"airlift/task_id": task_id, "airlift/dag_id": dag_id}, - ) - def the_asset(): - return 1 - - return the_asset - - -def test_transitive_deps(airflow_instance: None) -> None: - """Test that even with cross-dag transitive asset deps, the correct dependencies are generated for the asset graph.""" - instance = AirflowInstance( - auth_backend=BasicAuthBackend( - webserver_url="http://localhost:8080", username="admin", password="admin" - ), - name="airflow_instance", - ) - cacheable_assets = AirflowCacheableAssetsDefinition( - airflow_instance=instance, - defs=Definitions( - assets=[ - create_asset_for_task("one", "first", "first_one"), - create_asset_for_task("two", "first", "first_two", dep_keys=["second_one"]), - create_asset_for_task("three", "first", "first_three", dep_keys=["second_two"]), - create_asset_for_task("one", "second", "second_one", dep_keys=["first_one"]), - create_asset_for_task("two", "second", "second_two"), - ] - ), - migration_state_override=None, - poll_interval=1, - ) - - defs = Definitions(assets=[cacheable_assets]) - repository_def = defs.get_repository_def() - assets_defs = repository_def.assets_defs_by_key - assert len(assets_defs) == 7 # 5 tasks + 2 dags - assert AssetKey(["airflow_instance", "dag", "first"]) in assets_defs - dag_def = assets_defs[AssetKey(["airflow_instance", "dag", "first"])] - spec = next(iter(dag_def.specs)) - deps_keys = {dep.asset_key for dep in spec.deps} - assert deps_keys == {AssetKey(["first_two"]), AssetKey(["first_three"])} - assert AssetKey(["airflow_instance", "dag", "second"]) in assets_defs - dag_def = assets_defs[AssetKey(["airflow_instance", "dag", "second"])] - spec = next(iter(dag_def.specs)) - deps_keys = {dep.asset_key for dep in spec.deps} - assert deps_keys == {AssetKey(["second_one"]), AssetKey(["second_two"])} diff --git a/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/transitive_asset_deps_dags/first.py b/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/transitive_asset_deps_dags/first.py deleted file mode 100644 index f2899c86d4e59..0000000000000 --- a/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/transitive_asset_deps_dags/first.py +++ /dev/null @@ -1,19 +0,0 @@ -from datetime import datetime - -from airflow import DAG -from airflow.operators.python import PythonOperator - -default_args = { - "owner": "airflow", - "depends_on_past": False, - "start_date": datetime(2023, 1, 1), - "retries": 1, -} - -dag = DAG("first", default_args=default_args, schedule_interval=None, is_paused_upon_creation=False) -# This task will have a downstream to second.upstream_on_first -one = PythonOperator(task_id="one", python_callable=lambda: None, dag=dag) -# This task will have an upstream on second.only. Meaning first.one should not be considered a "leaf". -two = PythonOperator(task_id="two", python_callable=lambda: None, dag=dag) -# This task will have an upstream on second.only. Meaning first.one should not be considered a "leaf". -three = PythonOperator(task_id="three", python_callable=lambda: None, dag=dag) diff --git a/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/transitive_asset_deps_dags/second.py b/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/transitive_asset_deps_dags/second.py deleted file mode 100644 index 8d9c7c424a270..0000000000000 --- a/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/transitive_asset_deps_dags/second.py +++ /dev/null @@ -1,20 +0,0 @@ -from datetime import datetime - -from airflow import DAG -from airflow.operators.python import PythonOperator - -default_args = { - "owner": "airflow", - "depends_on_past": False, - "start_date": datetime(2023, 1, 1), - "retries": 1, -} - -dag = DAG( - "second", default_args=default_args, schedule_interval=None, is_paused_upon_creation=False -) -# Neither of these tasks have downstreams within the dag, so they should be considered "leaf" tasks. -# This task will have a downstream to first.two -one = PythonOperator(task_id="one", python_callable=lambda: None, dag=dag) -# This task will have a downstream to first.three -two = PythonOperator(task_id="two", python_callable=lambda: None, dag=dag) diff --git a/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/test_build_defs.py b/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/test_build_defs.py index afa99bc8777dd..8a0dc3cc56d34 100644 --- a/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/test_build_defs.py +++ b/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/test_build_defs.py @@ -15,6 +15,8 @@ from dagster_airlift.core import build_defs_from_airflow_instance, dag_defs, task_defs from dagster_airlift.test import make_instance +from dagster_airlift_tests.unit_tests.conftest import build_defs_from_airflow_asset_graph + @executor def nonstandard_executor(init_context): @@ -181,3 +183,50 @@ def test_unique_node_names_from_specs() -> None: defg_def.node_def.name == f"airflow_task_mapped_{unique_id_from_asset_and_check_keys([defg])}" ) + + +def test_transitive_asset_deps() -> None: + """Test that cross-dag transitive asset dependencies rae correctly generated.""" + # Asset graph is a -> b -> c where a and c are in different dags, and b isn't in any dag. + repo_def = build_defs_from_airflow_asset_graph( + assets_per_task={ + "dag1": {"task": [("a", [])]}, + "dag2": {"task": [("c", ["b"])]}, + }, + additional_defs=Definitions(assets=[AssetSpec(key="b", deps=["a"])]), + ) + repo_def.load_all_definitions() + dag1_key = AssetKey(["airflow_instance", "dag", "dag1"]) + dag2_key = AssetKey(["airflow_instance", "dag", "dag2"]) + a_key = AssetKey(["a"]) + b_key = AssetKey(["b"]) + c_key = AssetKey(["c"]) + assert len(repo_def.assets_defs_by_key) == 5 + assert set(repo_def.assets_defs_by_key.keys()) == { + dag1_key, + dag2_key, + a_key, + b_key, + c_key, + } + dag1_asset = repo_def.assets_defs_by_key[dag1_key] + assert [dep.asset_key for dep in next(iter(dag1_asset.specs)).deps] == [a_key] + dag2_asset = repo_def.assets_defs_by_key[dag2_key] + assert [dep.asset_key for dep in next(iter(dag2_asset.specs)).deps] == [c_key] + a_asset = repo_def.assets_defs_by_key[a_key] + assert [dep.asset_key for dep in next(iter(a_asset.specs)).deps] == [] + assert "airlift/dag_id" in next(iter(a_asset.specs)).tags + assert next(iter(a_asset.specs)).tags["airlift/dag_id"] == "dag1" + assert "airlift/task_id" in next(iter(a_asset.specs)).tags + assert next(iter(a_asset.specs)).tags["airlift/task_id"] == "task" + + b_asset = repo_def.assets_defs_by_key[b_key] + assert [dep.asset_key for dep in next(iter(b_asset.specs)).deps] == [a_key] + assert "airlift/dag_id" not in next(iter(b_asset.specs)).tags + assert "airlift/task_id" not in next(iter(b_asset.specs)).tags + c_asset = repo_def.assets_defs_by_key[c_key] + assert [dep.asset_key for dep in next(iter(c_asset.specs)).deps] == [b_key] + assert "airlift/dag_id" in next(iter(c_asset.specs)).tags + assert next(iter(c_asset.specs)).tags["airlift/dag_id"] == "dag2" + assert "airlift/task_id" in next(iter(c_asset.specs)).tags + assert next(iter(c_asset.specs)).tags["airlift/task_id"] == "task"