Skip to content

Commit

Permalink
Fix materialization reporting order for tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
dpeng817 committed Jul 29, 2024
1 parent eec0f12 commit b853b76
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
multi_asset,
sensor,
)
from dagster._time import datetime_from_timestamp, get_current_datetime
from dagster._time import datetime_from_timestamp, get_current_datetime, get_current_timestamp
from dagster_dbt import build_dbt_asset_specs
from pydantic import BaseModel

Expand Down Expand Up @@ -217,9 +217,12 @@ def airflow_dag_sensor(context: SensorEvaluationContext) -> SensorResult:
"Link to Run": MarkdownMetadataValue(
f"[View Run]({airflow_webserver_url}/dags/{dag_id}/grid?dag_run_id={dag_run['dag_run_id']}&tab=details)"
),
"Creation Timestamp": TimestampMetadataValue(
get_current_timestamp()
),
},
)
for spec in specs
for spec in toposort_flatten(specs)
]
)
context.update_cursor(str(current_date.timestamp()))
Expand Down Expand Up @@ -304,3 +307,34 @@ def get_leaf_specs(specs: List[AssetSpec]) -> List[AssetSpec]:
if key not in downstreams_per_key
]
return leaf_specs


def toposort_flatten(specs: List[AssetSpec]) -> List[AssetSpec]:
asset_key_to_specs = {spec.key: spec for spec in specs}
downstreams_per_key: Dict[AssetKey, List[AssetKey]] = {}
starting_keys = [] # start with specs that have no deps
for spec in specs:
# each dep represents an upstream from spec.key to dep.key. So reverse the relationship
for dep in spec.deps:
downstreams_per_key[dep.asset_key] = downstreams_per_key.get(dep.asset_key, []) + [
spec.key
]
if not spec.deps:
starting_keys.append(spec.key)

# topological sort
ordered_specs = []
visited = set()

def visit(key):
if key in visited:
return
visited.add(key)
for dep_key in downstreams_per_key.get(key, []):
visit(dep_key)
ordered_specs.append(asset_key_to_specs[key])

for key in starting_keys:
visit(key)

return ordered_specs
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,7 @@ def print_hello():
dag = DAG(
"print_dag", default_args=default_args, schedule_interval=None, is_paused_upon_creation=False
)
load_iris = PythonOperator(task_id="print_task", python_callable=print_hello, dag=dag)
print_op = PythonOperator(task_id="print_task", python_callable=print_hello, dag=dag)
downstream_print_op = PythonOperator(
task_id="downstream_print_task", python_callable=print_hello, dag=dag
)
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,15 @@ def test_dag_peering(
dag_id="print_dag",
task_id="print_task",
key=AssetKey(["some", "key"]),
)
),
TaskMapping(
dag_id="print_dag",
task_id="downstream_print_task",
key=AssetKey(["other", "key"]),
),
],
)
assert len(assets_defs) == 2
assert len(assets_defs) == 3
dag_def = [ # noqa
assets_def
for assets_def in assets_defs
Expand Down Expand Up @@ -82,7 +87,7 @@ def test_dag_peering(
sensor_context = build_sensor_context(instance=instance)
sensor_result = sensor_def(sensor_context)
assert isinstance(sensor_result, SensorResult)
assert len(sensor_result.asset_events) == 2
assert len(sensor_result.asset_events) == 3
dag_mat = [ # noqa
asset_mat
for asset_mat in sensor_result.asset_events
Expand Down Expand Up @@ -111,3 +116,16 @@ def test_dag_peering(
assert task_mat.metadata["Link to Run"] == MarkdownMetadataValue(
f"[View Run](http://localhost:8080/dags/print_dag/grid?dag_run_id={run_id}&tab=details)"
)

other_mat = [ # noqa
asset_mat
for asset_mat in sensor_result.asset_events
if asset_mat.asset_key == AssetKey(["other", "key"])
][0]

assert other_mat

assert (
other_mat.metadata["Creation Timestamp"].value
>= task_mat.metadata["Creation Timestamp"].value
)

0 comments on commit b853b76

Please sign in to comment.