diff --git a/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/test_airflow_instance.py b/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/test_airflow_instance.py index 719659383277c..caa1cf5014429 100644 --- a/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/test_airflow_instance.py +++ b/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/test_airflow_instance.py @@ -1,13 +1,12 @@ import pytest -from dagster import AssetCheckKey, AssetKey, Definitions, asset, asset_check from dagster._core.errors import DagsterError -from dagster_airlift.core import AirflowInstance, BasicAuthBackend, build_defs_from_airflow_instance +from dagster_airlift.core import AirflowInstance, BasicAuthBackend from .conftest import assert_link_exists def test_airflow_instance(airflow_instance: None) -> None: - """Test AirflowInstance APIs against live-running airflow. + """Test AirflowInstance APIs against live-running airflow. Ensure that links result in 200s. Airflow is loaded with one dag (print_dag) which contains two tasks (print_task, downstream_print_task). """ @@ -65,46 +64,3 @@ def test_airflow_instance(airflow_instance: None) -> None: assert isinstance(task_instance.start_date, float) assert isinstance(task_instance.end_date, float) assert isinstance(task_instance.note, str) - - -def test_orchestrated_defs(airflow_instance: None) -> None: - """Test allowing assets and asset checks to be passed through orchestrated defs without airflow mapping, - and that they will be present in the ultimately created defs. - """ - - @asset - def print_dag__print_task(): - pass - - @asset - def not_orchestrated_in_airflow(): - pass - - @asset_check(asset=print_dag__print_task.key) - def check_print_task(): - pass - - instance = AirflowInstance( - auth_backend=BasicAuthBackend( - webserver_url="http://localhost:8080", username="admin", password="admin" - ), - name="airflow_instance", - ) - - defs = build_defs_from_airflow_instance( - airflow_instance=instance, - defs=Definitions( - assets=[print_dag__print_task, not_orchestrated_in_airflow], - asset_checks=[check_print_task], - ), - ).get_repository_def() - assert len(defs.assets_defs_by_key) == 3 - assert set(defs.assets_defs_by_key.keys()) == { - AssetKey(["airflow_instance", "dag", "print_dag"]), - AssetKey(["print_dag__print_task"]), - AssetKey(["not_orchestrated_in_airflow"]), - } - assert len(defs.asset_checks_defs_by_key) == 1 - assert set(defs.asset_checks_defs_by_key.keys()) == { - AssetCheckKey(name="check_print_task", asset_key=AssetKey(["print_dag__print_task"])) - }