From 259d879339552cacb354ae012fd1bc6713bbea16 Mon Sep 17 00:00:00 2001 From: Chris DeCarolis Date: Mon, 29 Jul 2024 16:29:06 -0700 Subject: [PATCH] Fix materialization reporting order for tasks --- .../dagster_airlift/airflow_utils.py | 18 ++++++++-- .../airflow_project/dags/simple_dag.py | 5 ++- .../test_airflow_utils.py | 33 ++++++++++++++++--- 3 files changed, 49 insertions(+), 7 deletions(-) diff --git a/examples/experimental/dagster-airlift/dagster_airlift/airflow_utils.py b/examples/experimental/dagster-airlift/dagster_airlift/airflow_utils.py index 7bf73ac34d9bc..d23881a00f72e 100644 --- a/examples/experimental/dagster-airlift/dagster_airlift/airflow_utils.py +++ b/examples/experimental/dagster-airlift/dagster_airlift/airflow_utils.py @@ -19,7 +19,8 @@ multi_asset, sensor, ) -from dagster._time import datetime_from_timestamp, get_current_datetime +from dagster._core.utils import toposort_flatten +from dagster._time import datetime_from_timestamp, get_current_datetime, get_current_timestamp from dagster_dbt import build_dbt_asset_specs from pydantic import BaseModel @@ -217,9 +218,12 @@ def airflow_dag_sensor(context: SensorEvaluationContext) -> SensorResult: "Link to Run": MarkdownMetadataValue( f"[View Run]({airflow_webserver_url}/dags/{dag_id}/grid?dag_run_id={dag_run['dag_run_id']}&tab=details)" ), + "Creation Timestamp": TimestampMetadataValue( + get_current_timestamp() + ), }, ) - for spec in specs + for spec in toposort_specs(specs) ] ) context.update_cursor(str(current_date.timestamp())) @@ -304,3 +308,13 @@ def get_leaf_specs(specs: List[AssetSpec]) -> List[AssetSpec]: if key not in downstreams_per_key ] return leaf_specs + + +def toposort_specs(specs: List[AssetSpec]) -> List[AssetSpec]: + spec_per_key = {spec.key: spec for spec in specs} + return [ + spec_per_key[key] + for key in toposort_flatten( + {spec.key: {dep.asset_key for dep in spec.deps} for spec in specs} + ) + ] diff --git a/examples/experimental/dagster-airlift/dagster_airlift_tests/airflow_project/dags/simple_dag.py b/examples/experimental/dagster-airlift/dagster_airlift_tests/airflow_project/dags/simple_dag.py index d0d90ab1c4bdf..0f880463334ad 100644 --- a/examples/experimental/dagster-airlift/dagster_airlift_tests/airflow_project/dags/simple_dag.py +++ b/examples/experimental/dagster-airlift/dagster_airlift_tests/airflow_project/dags/simple_dag.py @@ -18,4 +18,7 @@ def print_hello(): dag = DAG( "print_dag", default_args=default_args, schedule_interval=None, is_paused_upon_creation=False ) -load_iris = PythonOperator(task_id="print_task", python_callable=print_hello, dag=dag) +print_op = PythonOperator(task_id="print_task", python_callable=print_hello, dag=dag) +downstream_print_op = PythonOperator( + task_id="downstream_print_task", python_callable=print_hello, dag=dag +) diff --git a/examples/experimental/dagster-airlift/dagster_airlift_tests/test_airflow_utils.py b/examples/experimental/dagster-airlift/dagster_airlift_tests/test_airflow_utils.py index ce47c1853dfc7..c20ea10e81cd9 100644 --- a/examples/experimental/dagster-airlift/dagster_airlift_tests/test_airflow_utils.py +++ b/examples/experimental/dagster-airlift/dagster_airlift_tests/test_airflow_utils.py @@ -2,7 +2,13 @@ from typing import cast import requests -from dagster import JsonMetadataValue, MarkdownMetadataValue, SensorResult, build_sensor_context +from dagster import ( + AssetDep, + JsonMetadataValue, + MarkdownMetadataValue, + SensorResult, + build_sensor_context, +) from dagster._core.definitions.asset_key import AssetKey from dagster._core.test_utils import instance_for_test from dagster_airlift import ( @@ -25,10 +31,16 @@ def test_dag_peering( dag_id="print_dag", task_id="print_task", key=AssetKey(["some", "key"]), - ) + ), + TaskMapping( + dag_id="print_dag", + task_id="downstream_print_task", + key=AssetKey(["other", "key"]), + deps=[AssetDep(AssetKey(["some", "key"]))], + ), ], ) - assert len(assets_defs) == 2 + assert len(assets_defs) == 3 dag_def = [ # noqa assets_def for assets_def in assets_defs @@ -82,7 +94,7 @@ def test_dag_peering( sensor_context = build_sensor_context(instance=instance) sensor_result = sensor_def(sensor_context) assert isinstance(sensor_result, SensorResult) - assert len(sensor_result.asset_events) == 2 + assert len(sensor_result.asset_events) == 3 dag_mat = [ # noqa asset_mat for asset_mat in sensor_result.asset_events @@ -111,3 +123,16 @@ def test_dag_peering( assert task_mat.metadata["Link to Run"] == MarkdownMetadataValue( f"[View Run](http://localhost:8080/dags/print_dag/grid?dag_run_id={run_id}&tab=details)" ) + + other_mat = [ # noqa + asset_mat + for asset_mat in sensor_result.asset_events + if asset_mat.asset_key == AssetKey(["other", "key"]) + ][0] + + assert other_mat + # other mat should be downstream of task mat + assert ( # type: ignore + other_mat.metadata["Creation Timestamp"].value + >= task_mat.metadata["Creation Timestamp"].value + )