diff --git a/examples/experimental/dagster-airlift/create_airflow_cfg.sh b/examples/experimental/dagster-airlift/airflow_setup.sh
similarity index 70%
rename from examples/experimental/dagster-airlift/create_airflow_cfg.sh
rename to examples/experimental/dagster-airlift/airflow_setup.sh
index 29097fab0e76b..0312e5ddd5d82 100755
--- a/examples/experimental/dagster-airlift/create_airflow_cfg.sh
+++ b/examples/experimental/dagster-airlift/airflow_setup.sh
@@ -24,4 +24,14 @@ auth_backend = airflow.api.auth.backend.basic_auth
 
 [webserver]
 expose_config = True
-EOL
\ No newline at end of file
+EOL
+
+# Migrate the Airflow database and create the default admin user
+airflow db migrate && \
+airflow users create \
+  --username admin \
+  --password admin \
+  --firstname Peter \
+  --lastname Parker \
+  --role Admin \
+  --email spiderman@superhero.org
\ No newline at end of file
diff --git a/examples/experimental/dagster-airlift/conftest.py b/examples/experimental/dagster-airlift/conftest.py
new file mode 100644
index 0000000000000..c6cfe053045be
--- /dev/null
+++ b/examples/experimental/dagster-airlift/conftest.py
@@ -0,0 +1 @@
+pytest_plugins = ["dagster_airlift.shared_fixtures"]
diff --git a/examples/experimental/dagster-airlift/dagster_airlift/airflow_utils.py b/examples/experimental/dagster-airlift/dagster_airlift/airflow_utils.py
index 7bf73ac34d9bc..d23881a00f72e 100644
--- a/examples/experimental/dagster-airlift/dagster_airlift/airflow_utils.py
+++ b/examples/experimental/dagster-airlift/dagster_airlift/airflow_utils.py
@@ -19,7 +19,8 @@
     multi_asset,
     sensor,
 )
-from dagster._time import datetime_from_timestamp, get_current_datetime
+from dagster._core.utils import toposort_flatten
+from dagster._time import datetime_from_timestamp, get_current_datetime, get_current_timestamp
 from dagster_dbt import build_dbt_asset_specs
 from pydantic import BaseModel
 
@@ -217,9 +218,12 @@ def airflow_dag_sensor(context: SensorEvaluationContext) -> SensorResult:
                         "Link to Run": MarkdownMetadataValue(
                             f"[View Run]({airflow_webserver_url}/dags/{dag_id}/grid?dag_run_id={dag_run['dag_run_id']}&tab=details)"
                         ),
+                        "Creation Timestamp": TimestampMetadataValue(
+                            get_current_timestamp()
+                        ),
                     },
                 )
-                for spec in specs
+                for spec in toposort_specs(specs)
             ]
         )
         context.update_cursor(str(current_date.timestamp()))
@@ -304,3 +308,14 @@ def get_leaf_specs(specs: List[AssetSpec]) -> List[AssetSpec]:
         if key not in downstreams_per_key
     ]
     return leaf_specs
+
+
+def toposort_specs(specs: List[AssetSpec]) -> List[AssetSpec]:
+    """Return the given specs topologically sorted, so each spec appears after its dependencies."""
+    spec_per_key = {spec.key: spec for spec in specs}
+    return [
+        spec_per_key[key]
+        for key in toposort_flatten(
+            {spec.key: {dep.asset_key for dep in spec.deps} for spec in specs}
+        )
+    ]
diff --git a/examples/experimental/dagster-airlift/dagster_airlift/shared_fixtures.py b/examples/experimental/dagster-airlift/dagster_airlift/shared_fixtures.py
new file mode 100644
index 0000000000000..476eb0ca22b3c
--- /dev/null
+++ b/examples/experimental/dagster-airlift/dagster_airlift/shared_fixtures.py
@@ -0,0 +1,43 @@
+import os
+import signal
+import subprocess
+import time
+from typing import Any, Generator
+
+import pytest
+import requests
+from dagster._time import get_current_timestamp
+
+
+def airflow_is_ready() -> bool:
+    try:
+        response = requests.get("http://localhost:8080")
+        return response.status_code == 200
+    except requests.exceptions.RequestException:
+        return False
+
+
+# The `setup` fixture is expected to have set the AIRFLOW_HOME env var.
+@pytest.fixture(name="airflow_instance")
+def airflow_instance_fixture(setup: None) -> Generator[Any, None, None]:
+    process = subprocess.Popen(
["airflow", "standalone"], + env=os.environ, # since we have some temp vars in the env right now + shell=False, + preexec_fn=os.setsid, # noqa # fuck it we ball + ) + # Give airflow a second to stand up + time.sleep(5) + initial_time = get_current_timestamp() + + airflow_ready = False + while get_current_timestamp() - initial_time < 30: + if airflow_is_ready(): + airflow_ready = True + break + time.sleep(1) + + assert airflow_ready, "Airflow did not start within 30 seconds..." + yield process + # Kill process group, since process.kill and process.terminate do not work. + os.killpg(process.pid, signal.SIGKILL) diff --git a/examples/experimental/dagster-airlift/dagster_airlift_tests/airflow_project/dags/simple_dag.py b/examples/experimental/dagster-airlift/dagster_airlift_tests/airflow_project/dags/simple_dag.py index d0d90ab1c4bdf..0f880463334ad 100644 --- a/examples/experimental/dagster-airlift/dagster_airlift_tests/airflow_project/dags/simple_dag.py +++ b/examples/experimental/dagster-airlift/dagster_airlift_tests/airflow_project/dags/simple_dag.py @@ -18,4 +18,7 @@ def print_hello(): dag = DAG( "print_dag", default_args=default_args, schedule_interval=None, is_paused_upon_creation=False ) -load_iris = PythonOperator(task_id="print_task", python_callable=print_hello, dag=dag) +print_op = PythonOperator(task_id="print_task", python_callable=print_hello, dag=dag) +downstream_print_op = PythonOperator( + task_id="downstream_print_task", python_callable=print_hello, dag=dag +) diff --git a/examples/experimental/dagster-airlift/dagster_airlift_tests/conftest.py b/examples/experimental/dagster-airlift/dagster_airlift_tests/conftest.py index c4f471201ea68..6ab2d0c7fc9e5 100644 --- a/examples/experimental/dagster-airlift/dagster_airlift_tests/conftest.py +++ b/examples/experimental/dagster-airlift/dagster_airlift_tests/conftest.py @@ -1,65 +1,24 @@ import os -import signal import subprocess from tempfile import TemporaryDirectory -from typing import Any, Generator +from typing import Generator import pytest +from dagster._core.test_utils import environ -@pytest.fixture(name="airflow_home_dir") -def airflow_home_dir_fixture() -> Generator[str, None, None]: +@pytest.fixture(name="setup") +def setup_fixture() -> Generator[str, None, None]: with TemporaryDirectory() as tmpdir: # run chmod +x create_airflow_cfg.sh and then run create_airflow_cfg.sh tmpdir temp_env = {**os.environ.copy(), "AIRFLOW_HOME": tmpdir} # go up one directory from current - path_to_script = os.path.join(os.path.dirname(__file__), "..", "create_airflow_cfg.sh") + path_to_script = os.path.join(os.path.dirname(__file__), "..", "airflow_setup.sh") path_to_dags = os.path.join(os.path.dirname(__file__), "airflow_project", "dags") subprocess.run(["chmod", "+x", path_to_script], check=True, env=temp_env) subprocess.run([path_to_script, path_to_dags], check=True, env=temp_env) - yield tmpdir - - -@pytest.fixture(name="airflow_instance") -def airflow_instance_fixture(airflow_home_dir: str) -> Generator[Any, None, None]: - temp_env = {**os.environ.copy(), "AIRFLOW_HOME": airflow_home_dir} - subprocess.run(["airflow", "db", "migrate"], check=True, env=temp_env) - subprocess.run( - [ - "airflow", - "users", - "create", - "--username", - "admin", - "--firstname", - "admin", - "--lastname", - "admin", - "--role", - "Admin", - "--email", - "foo@bar.com", - "--password", - "admin", - ], - check=True, - env=temp_env, - ) - process = subprocess.Popen( - ["airflow", "standalone"], - env=temp_env, - shell=False, - stdout=subprocess.PIPE, - 
-        preexec_fn=os.setsid,  # noqa # fuck it we ball
-    )
-    assert process.stdout is not None
-    for line in process.stdout:
-        if "Airflow is ready" in line.decode():
-            break
-    yield process
-    # Kill process group, since process.kill and process.terminate do not work.
-    os.killpg(process.pid, signal.SIGKILL)
-    process.wait()
+        with environ({"AIRFLOW_HOME": tmpdir}):
+            yield tmpdir
 
 
 @pytest.fixture(name="dbt_project_dir")
diff --git a/examples/experimental/dagster-airlift/dagster_airlift_tests/test_airflow_utils.py b/examples/experimental/dagster-airlift/dagster_airlift_tests/test_airflow_utils.py
index 22c50c665c4f6..c20ea10e81cd9 100644
--- a/examples/experimental/dagster-airlift/dagster_airlift_tests/test_airflow_utils.py
+++ b/examples/experimental/dagster-airlift/dagster_airlift_tests/test_airflow_utils.py
@@ -2,7 +2,13 @@
 from typing import cast
 
 import requests
-from dagster import JsonMetadataValue, MarkdownMetadataValue, SensorResult, build_sensor_context
+from dagster import (
+    AssetDep,
+    JsonMetadataValue,
+    MarkdownMetadataValue,
+    SensorResult,
+    build_sensor_context,
+)
 from dagster._core.definitions.asset_key import AssetKey
 from dagster._core.test_utils import instance_for_test
 from dagster_airlift import (
@@ -14,7 +20,6 @@
 
 def test_dag_peering(
     airflow_instance: None,
-    airflow_home_dir: str,
 ) -> None:
     """Test that dags can be correctly peered from airflow, and certain metadata properties are retained."""
     assets_defs = assets_defs_from_airflow_instance(
@@ -26,10 +31,16 @@ def test_dag_peering(
                 dag_id="print_dag",
                 task_id="print_task",
                 key=AssetKey(["some", "key"]),
-            )
+            ),
+            TaskMapping(
+                dag_id="print_dag",
+                task_id="downstream_print_task",
+                key=AssetKey(["other", "key"]),
+                deps=[AssetDep(AssetKey(["some", "key"]))],
+            ),
         ],
     )
-    assert len(assets_defs) == 2
+    assert len(assets_defs) == 3
     dag_def = [  # noqa
         assets_def
         for assets_def in assets_defs
@@ -83,7 +94,7 @@ def test_dag_peering(
     sensor_context = build_sensor_context(instance=instance)
     sensor_result = sensor_def(sensor_context)
     assert isinstance(sensor_result, SensorResult)
-    assert len(sensor_result.asset_events) == 2
+    assert len(sensor_result.asset_events) == 3
     dag_mat = [  # noqa
         asset_mat
         for asset_mat in sensor_result.asset_events
@@ -112,3 +123,16 @@ def test_dag_peering(
     assert task_mat.metadata["Link to Run"] == MarkdownMetadataValue(
         f"[View Run](http://localhost:8080/dags/print_dag/grid?dag_run_id={run_id}&tab=details)"
     )
+
+    other_mat = [  # noqa
+        asset_mat
+        for asset_mat in sensor_result.asset_events
+        if asset_mat.asset_key == AssetKey(["other", "key"])
+    ][0]
+
+    assert other_mat
+    # other_mat is downstream of task_mat, so its materialization must be created no earlier
+    assert (  # type: ignore
+        other_mat.metadata["Creation Timestamp"].value
+        >= task_mat.metadata["Creation Timestamp"].value
+    )
diff --git a/examples/experimental/dagster-airlift/examples/peering-with-dbt/Makefile b/examples/experimental/dagster-airlift/examples/peering-with-dbt/Makefile
index f9ea2edc823af..11739a11e03d0 100644
--- a/examples/experimental/dagster-airlift/examples/peering-with-dbt/Makefile
+++ b/examples/experimental/dagster-airlift/examples/peering-with-dbt/Makefile
@@ -27,16 +27,8 @@ setup_local_env:
	make wipe && \
	mkdir -p $$AIRFLOW_HOME && \
	mkdir -p $$DAGSTER_HOME && \
-	chmod +x ../../create_airflow_cfg.sh && \
-	../../create_airflow_cfg.sh $(MAKEFILE_DIR)/peering_with_dbt/airflow_dags && \
-	airflow db migrate && \
-	airflow users create \
-	--username admin \
-	--password admin \
-	--firstname Peter \
-	--lastname Parker \
-	--role Admin \
-	--email spiderman@superhero.org && \
+	chmod +x ../../airflow_setup.sh && \
+	../../airflow_setup.sh $(MAKEFILE_DIR)/peering_with_dbt/airflow_dags && \
	make dbt_setup
 
 run_airflow:
diff --git a/examples/experimental/dagster-airlift/examples/peering-with-dbt/conftest.py b/examples/experimental/dagster-airlift/examples/peering-with-dbt/conftest.py
new file mode 100644
index 0000000000000..c6cfe053045be
--- /dev/null
+++ b/examples/experimental/dagster-airlift/examples/peering-with-dbt/conftest.py
@@ -0,0 +1 @@
+pytest_plugins = ["dagster_airlift.shared_fixtures"]
diff --git a/examples/experimental/dagster-airlift/examples/peering-with-dbt/peering_with_dbt_tests/conftest.py b/examples/experimental/dagster-airlift/examples/peering-with-dbt/peering_with_dbt_tests/conftest.py
index 42d8bdea6f3a7..934c1176688c1 100644
--- a/examples/experimental/dagster-airlift/examples/peering-with-dbt/peering_with_dbt_tests/conftest.py
+++ b/examples/experimental/dagster-airlift/examples/peering-with-dbt/peering_with_dbt_tests/conftest.py
@@ -1,7 +1,6 @@
 import os
-import signal
 import subprocess
-from typing import Any, Generator
+from typing import Generator
 
 import pytest
 from dagster._core.test_utils import environ
@@ -19,21 +18,3 @@ def setup_fixture() -> Generator[None, None, None]:
     ):
         yield
     subprocess.run(["make", "wipe"], check=True)
-
-
-@pytest.fixture(name="airflow_instance")
-def airflow_instance_fixture(setup: None) -> Generator[Any, None, None]:
-    process = subprocess.Popen(
-        ["airflow", "standalone"],
-        env=os.environ,  # since we have some temp vars in the env right now
-        shell=False,
-        stdout=subprocess.PIPE,
-        preexec_fn=os.setsid,  # noqa # fuck it we ball
-    )
-    assert process.stdout is not None
-    for line in process.stdout:
-        if "Airflow is ready" in line.decode():
-            break
-    yield process
-    # Kill process group, since process.kill and process.terminate do not work.
-    os.killpg(process.pid, signal.SIGKILL)
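
Reviewer note on the ordering change: airflow_dag_sensor now iterates over toposort_specs(specs), so the materialization for an upstream asset is always created before (and therefore timestamped no later than) any of its downstreams; that is the invariant the new "Creation Timestamp" assertion in test_dag_peering checks. Below is a minimal, standard-library sketch of the same guarantee; the keys are hypothetical stand-ins for the asset keys in this PR, and the sketch is not part of the change itself.

# toposort_sketch.py: illustrates the ordering invariant only.
from graphlib import TopologicalSorter

# Map each asset key to the set of keys it depends on, mirroring
# {spec.key: {dep.asset_key for dep in spec.deps} for spec in specs}.
deps_by_key = {
    "some/key": set(),                      # upstream task asset
    "other/key": {"some/key"},              # downstream task asset (the new TaskMapping)
    "print_dag_asset": {"some/key", "other/key"},  # hypothetical dag-level asset
}

# static_order() yields each node only after all of its dependencies,
# so upstream keys always appear before their downstreams.
ordered = list(TopologicalSorter(deps_by_key).static_order())
assert ordered.index("some/key") < ordered.index("other/key")
print(ordered)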
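Reviewer note on the shared fixture's teardown: airflow standalone forks child processes, which is why process.kill and process.terminate do not suffice; launching it with preexec_fn=os.setsid puts it in a fresh session whose process-group id equals the child's pid, so os.killpg can signal the whole tree. A self-contained sketch of the pattern, using "sleep" as a hypothetical stand-in for the Airflow command:

# killpg_sketch.py: illustrates the teardown pattern only.
import os
import signal
import subprocess
import time

process = subprocess.Popen(
    ["sleep", "60"],
    preexec_fn=os.setsid,  # new session: the child's pid doubles as its process-group id
)
time.sleep(1)  # stand-in for the fixture's readiness-polling loop

# Signalling the group also catches forked grandchildren that process.kill would miss.
os.killpg(process.pid, signal.SIGKILL)
process.wait()  # reap the child so it does not linger as a zombie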