def toposort_flatten(specs: List[AssetSpec]) -> List[AssetSpec]:
    """Return ``specs`` ordered so that upstream specs come before their downstreams.

    Only dependencies that point at another spec in ``specs`` constrain the
    ordering. A dependency on a key that is not part of ``specs`` (or on the
    spec's own key) is ignored, so such a spec is treated as a root instead of
    being dropped from the result.

    Args:
        specs: The specs to order. If several specs share a key, only the last
            one is kept (one spec per key).

    Returns:
        A list containing one spec per distinct key. Each spec appears after
        every spec in the list that it depends on. Specs that are otherwise
        unconstrained keep their relative input order. If the dependencies
        contain a cycle, the specs that cannot be ordered are appended at the
        end in input order rather than being omitted.
    """
    # Function-scope import so the block does not rely on a module-level import.
    from collections import deque

    spec_by_key = {spec.key: spec for spec in specs}

    # Each dep is an edge "spec.key depends on dep.asset_key"; invert it so we
    # can walk from an upstream key to the keys that wait on it.
    downstreams_per_key: Dict[AssetKey, List[AssetKey]] = {key: [] for key in spec_by_key}
    unresolved_upstreams: Dict[AssetKey, int] = {}
    for key, spec in spec_by_key.items():
        # A set de-duplicates repeated deps so the counter below stays accurate.
        upstream_keys = {
            dep.asset_key
            for dep in spec.deps
            if dep.asset_key in spec_by_key and dep.asset_key != key
        }
        unresolved_upstreams[key] = len(upstream_keys)
        for upstream_key in upstream_keys:
            downstreams_per_key[upstream_key].append(key)

    # Kahn's algorithm: emit a key once all of its in-list upstreams are emitted.
    # Iterative on purpose, so long dependency chains cannot hit the recursion limit.
    ready = deque(key for key, count in unresolved_upstreams.items() if count == 0)
    ordered_keys = []
    while ready:
        key = ready.popleft()
        ordered_keys.append(key)
        for downstream_key in downstreams_per_key[key]:
            unresolved_upstreams[downstream_key] -= 1
            if unresolved_upstreams[downstream_key] == 0:
                ready.append(downstream_key)

    # Anything left is part of (or downstream of) a cycle. Keep it instead of
    # silently losing specs; input order is the best remaining tie-breaker.
    if len(ordered_keys) < len(spec_by_key):
        emitted = set(ordered_keys)
        ordered_keys.extend(key for key in spec_by_key if key not in emitted)

    return [spec_by_key[key] for key in ordered_keys]
key=AssetKey(["other", "key"]), + ), ], ) - assert len(assets_defs) == 2 + assert len(assets_defs) == 3 dag_def = [ # noqa assets_def for assets_def in assets_defs @@ -82,7 +87,7 @@ def test_dag_peering( sensor_context = build_sensor_context(instance=instance) sensor_result = sensor_def(sensor_context) assert isinstance(sensor_result, SensorResult) - assert len(sensor_result.asset_events) == 2 + assert len(sensor_result.asset_events) == 3 dag_mat = [ # noqa asset_mat for asset_mat in sensor_result.asset_events @@ -111,3 +116,16 @@ def test_dag_peering( assert task_mat.metadata["Link to Run"] == MarkdownMetadataValue( f"[View Run](http://localhost:8080/dags/print_dag/grid?dag_run_id={run_id}&tab=details)" ) + + other_mat = [ # noqa + asset_mat + for asset_mat in sensor_result.asset_events + if asset_mat.asset_key == AssetKey(["other", "key"]) + ][0] + + assert other_mat + + assert ( + other_mat.metadata["Creation Timestamp"].value + >= task_mat.metadata["Creation Timestamp"].value + )