Skip to content

Commit

Permalink
[dagster-airlift] move transitive asset dep testing to unit (#24161)
Browse files Browse the repository at this point in the history
This testing can be achieved more efficiently with unit tests.
  • Loading branch information
dpeng817 authored Sep 3, 2024
1 parent f79a36b commit 8721e8c
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 110 deletions.

This file was deleted.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
from dagster_airlift.core import build_defs_from_airflow_instance, dag_defs, task_defs
from dagster_airlift.test import make_instance

from dagster_airlift_tests.unit_tests.conftest import build_defs_from_airflow_asset_graph


@executor
def nonstandard_executor(init_context):
Expand Down Expand Up @@ -181,3 +183,50 @@ def test_unique_node_names_from_specs() -> None:
defg_def.node_def.name
== f"airflow_task_mapped_{unique_id_from_asset_and_check_keys([defg])}"
)


def test_transitive_asset_deps() -> None:
"""Test that cross-dag transitive asset dependencies rae correctly generated."""
# Asset graph is a -> b -> c where a and c are in different dags, and b isn't in any dag.
repo_def = build_defs_from_airflow_asset_graph(
assets_per_task={
"dag1": {"task": [("a", [])]},
"dag2": {"task": [("c", ["b"])]},
},
additional_defs=Definitions(assets=[AssetSpec(key="b", deps=["a"])]),
)
repo_def.load_all_definitions()
dag1_key = AssetKey(["airflow_instance", "dag", "dag1"])
dag2_key = AssetKey(["airflow_instance", "dag", "dag2"])
a_key = AssetKey(["a"])
b_key = AssetKey(["b"])
c_key = AssetKey(["c"])
assert len(repo_def.assets_defs_by_key) == 5
assert set(repo_def.assets_defs_by_key.keys()) == {
dag1_key,
dag2_key,
a_key,
b_key,
c_key,
}
dag1_asset = repo_def.assets_defs_by_key[dag1_key]
assert [dep.asset_key for dep in next(iter(dag1_asset.specs)).deps] == [a_key]
dag2_asset = repo_def.assets_defs_by_key[dag2_key]
assert [dep.asset_key for dep in next(iter(dag2_asset.specs)).deps] == [c_key]
a_asset = repo_def.assets_defs_by_key[a_key]
assert [dep.asset_key for dep in next(iter(a_asset.specs)).deps] == []
assert "airlift/dag_id" in next(iter(a_asset.specs)).tags
assert next(iter(a_asset.specs)).tags["airlift/dag_id"] == "dag1"
assert "airlift/task_id" in next(iter(a_asset.specs)).tags
assert next(iter(a_asset.specs)).tags["airlift/task_id"] == "task"

b_asset = repo_def.assets_defs_by_key[b_key]
assert [dep.asset_key for dep in next(iter(b_asset.specs)).deps] == [a_key]
assert "airlift/dag_id" not in next(iter(b_asset.specs)).tags
assert "airlift/task_id" not in next(iter(b_asset.specs)).tags
c_asset = repo_def.assets_defs_by_key[c_key]
assert [dep.asset_key for dep in next(iter(c_asset.specs)).deps] == [b_key]
assert "airlift/dag_id" in next(iter(c_asset.specs)).tags
assert next(iter(c_asset.specs)).tags["airlift/dag_id"] == "dag2"
assert "airlift/task_id" in next(iter(c_asset.specs)).tags
assert next(iter(c_asset.specs)).tags["airlift/task_id"] == "task"

0 comments on commit 8721e8c

Please sign in to comment.