Skip to content

Commit

Permalink
[dagster-airlift] remove unnecessary integration test (#24163)
Browse files Browse the repository at this point in the history
This integration test is covered by unit tests already, and is not
needed.
  • Loading branch information
dpeng817 authored Sep 3, 2024
1 parent 881046b commit 886a43a
Showing 1 changed file with 2 additions and 46 deletions.
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
import pytest
from dagster import AssetCheckKey, AssetKey, Definitions, asset, asset_check
from dagster._core.errors import DagsterError
from dagster_airlift.core import AirflowInstance, BasicAuthBackend, build_defs_from_airflow_instance
from dagster_airlift.core import AirflowInstance, BasicAuthBackend

from .conftest import assert_link_exists


def test_airflow_instance(airflow_instance: None) -> None:
"""Test AirflowInstance APIs against live-running airflow.
"""Test AirflowInstance APIs against live-running airflow. Ensure that links result in 200s.
Airflow is loaded with one dag (print_dag) which contains two tasks (print_task, downstream_print_task).
"""
Expand Down Expand Up @@ -65,46 +64,3 @@ def test_airflow_instance(airflow_instance: None) -> None:
assert isinstance(task_instance.start_date, float)
assert isinstance(task_instance.end_date, float)
assert isinstance(task_instance.note, str)


def test_orchestrated_defs(airflow_instance: None) -> None:
"""Test allowing assets and asset checks to be passed through orchestrated defs without airflow mapping,
and that they will be present in the ultimately created defs.
"""

@asset
def print_dag__print_task():
pass

@asset
def not_orchestrated_in_airflow():
pass

@asset_check(asset=print_dag__print_task.key)
def check_print_task():
pass

instance = AirflowInstance(
auth_backend=BasicAuthBackend(
webserver_url="http://localhost:8080", username="admin", password="admin"
),
name="airflow_instance",
)

defs = build_defs_from_airflow_instance(
airflow_instance=instance,
defs=Definitions(
assets=[print_dag__print_task, not_orchestrated_in_airflow],
asset_checks=[check_print_task],
),
).get_repository_def()
assert len(defs.assets_defs_by_key) == 3
assert set(defs.assets_defs_by_key.keys()) == {
AssetKey(["airflow_instance", "dag", "print_dag"]),
AssetKey(["print_dag__print_task"]),
AssetKey(["not_orchestrated_in_airflow"]),
}
assert len(defs.asset_checks_defs_by_key) == 1
assert set(defs.asset_checks_defs_by_key.keys()) == {
AssetCheckKey(name="check_print_task", asset_key=AssetKey(["print_dag__print_task"]))
}

0 comments on commit 886a43a

Please sign in to comment.