Skip to content

Commit

Permalink
Fix materialization reporting order for tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
dpeng817 committed Jul 31, 2024
1 parent e61e2b4 commit 259d879
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
multi_asset,
sensor,
)
from dagster._time import datetime_from_timestamp, get_current_datetime
from dagster._core.utils import toposort_flatten
from dagster._time import datetime_from_timestamp, get_current_datetime, get_current_timestamp
from dagster_dbt import build_dbt_asset_specs
from pydantic import BaseModel

Expand Down Expand Up @@ -217,9 +218,12 @@ def airflow_dag_sensor(context: SensorEvaluationContext) -> SensorResult:
"Link to Run": MarkdownMetadataValue(
f"[View Run]({airflow_webserver_url}/dags/{dag_id}/grid?dag_run_id={dag_run['dag_run_id']}&tab=details)"
),
"Creation Timestamp": TimestampMetadataValue(
get_current_timestamp()
),
},
)
for spec in specs
for spec in toposort_specs(specs)
]
)
context.update_cursor(str(current_date.timestamp()))
Expand Down Expand Up @@ -304,3 +308,13 @@ def get_leaf_specs(specs: List[AssetSpec]) -> List[AssetSpec]:
if key not in downstreams_per_key
]
return leaf_specs


def toposort_specs(specs: List[AssetSpec]) -> List[AssetSpec]:
spec_per_key = {spec.key: spec for spec in specs}
return [
spec_per_key[key]
for key in toposort_flatten(
{spec.key: {dep.asset_key for dep in spec.deps} for spec in specs}
)
]
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,7 @@ def print_hello():
dag = DAG(
"print_dag", default_args=default_args, schedule_interval=None, is_paused_upon_creation=False
)
load_iris = PythonOperator(task_id="print_task", python_callable=print_hello, dag=dag)
print_op = PythonOperator(task_id="print_task", python_callable=print_hello, dag=dag)
downstream_print_op = PythonOperator(
task_id="downstream_print_task", python_callable=print_hello, dag=dag
)
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,13 @@
from typing import cast

import requests
from dagster import JsonMetadataValue, MarkdownMetadataValue, SensorResult, build_sensor_context
from dagster import (
AssetDep,
JsonMetadataValue,
MarkdownMetadataValue,
SensorResult,
build_sensor_context,
)
from dagster._core.definitions.asset_key import AssetKey
from dagster._core.test_utils import instance_for_test
from dagster_airlift import (
Expand All @@ -25,10 +31,16 @@ def test_dag_peering(
dag_id="print_dag",
task_id="print_task",
key=AssetKey(["some", "key"]),
)
),
TaskMapping(
dag_id="print_dag",
task_id="downstream_print_task",
key=AssetKey(["other", "key"]),
deps=[AssetDep(AssetKey(["some", "key"]))],
),
],
)
assert len(assets_defs) == 2
assert len(assets_defs) == 3
dag_def = [ # noqa
assets_def
for assets_def in assets_defs
Expand Down Expand Up @@ -82,7 +94,7 @@ def test_dag_peering(
sensor_context = build_sensor_context(instance=instance)
sensor_result = sensor_def(sensor_context)
assert isinstance(sensor_result, SensorResult)
assert len(sensor_result.asset_events) == 2
assert len(sensor_result.asset_events) == 3
dag_mat = [ # noqa
asset_mat
for asset_mat in sensor_result.asset_events
Expand Down Expand Up @@ -111,3 +123,16 @@ def test_dag_peering(
assert task_mat.metadata["Link to Run"] == MarkdownMetadataValue(
f"[View Run](http://localhost:8080/dags/print_dag/grid?dag_run_id={run_id}&tab=details)"
)

other_mat = [ # noqa
asset_mat
for asset_mat in sensor_result.asset_events
if asset_mat.asset_key == AssetKey(["other", "key"])
][0]

assert other_mat
# other mat should be downstream of task mat
assert ( # type: ignore
other_mat.metadata["Creation Timestamp"].value
>= task_mat.metadata["Creation Timestamp"].value
)

0 comments on commit 259d879

Please sign in to comment.