From a14cb87a2caa69ce10d27674e7de518721bc5cfd Mon Sep 17 00:00:00 2001
From: prha <1040172+prha@users.noreply.github.com>
Date: Thu, 19 Dec 2024 16:41:32 -0800
Subject: [PATCH] add support for incremental step stats calculations (#26577)

## Summary & Motivation

This is the counterpart of https://github.com/dagster-io/dagster/pull/26550, but for step stats instead of run stats.

The step stats calculation is a little more complicated because it relies on intermediate state that must be held between dagster events, such as the start time of the in-progress attempt for a given step. This PR introduces a `partial_attempt_start` field on the snapshot to carry that state over between batches; it should only be used in the incremental calculation (see the usage sketch after the diff).

Another change made here: marker events (e.g. resource init/teardown) can be associated with a step but generally occur before the step has started, so step snapshots previously omitted these marker events for steps that had not yet started. With this PR, every step that has a marker event gets a snapshot, even if it has not started.

The output of the incremental calculation should exactly match the snapshot computed over the completed run.

## How I Tested These Changes

BK
---
 .../dagster/dagster/_core/execution/stats.py | 316 +++++++++++-------
 .../storage_tests/test_event_log.py          |  55 ++-
 2 files changed, 245 insertions(+), 126 deletions(-)

diff --git a/python_modules/dagster/dagster/_core/execution/stats.py b/python_modules/dagster/dagster/_core/execution/stats.py
index 0cea0fc4a0a6f..bced58da9aa75 100644
--- a/python_modules/dagster/dagster/_core/execution/stats.py
+++ b/python_modules/dagster/dagster/_core/execution/stats.py
@@ -1,6 +1,6 @@
 from collections import defaultdict
 from enum import Enum
-from typing import Any, Dict, Iterable, NamedTuple, Optional, Sequence, cast
+from typing import Any, Dict, Iterable, Mapping, Optional, Sequence, cast
 
 import dagster._check as check
 from dagster._core.definitions import ExpectationResult
@@ -12,6 +12,7 @@
 )
 from dagster._core.events.log import EventLogEntry
 from dagster._core.storage.dagster_run import DagsterRunStatsSnapshot
+from dagster._record import IHaveNew, record, record_custom
 from dagster._serdes import whitelist_for_serdes
 
 RUN_STATS_EVENT_TYPES = {
@@ -47,8 +48,8 @@ def build_run_stats_from_events(
         raise check.ParameterCheckError(
             "Invariant violation for parameter 'records'. Description: Expected iterable."
) from exc - for i, record in enumerate(entries): - check.inst_param(record, f"records[{i}]", EventLogEntry) + for i, entry in enumerate(entries): + check.inst_param(entry, f"entries[{i}]", EventLogEntry) if previous_stats: steps_succeeded = previous_stats.steps_succeeded @@ -115,14 +116,154 @@ class StepEventStatus(Enum): IN_PROGRESS = "IN_PROGRESS" +@whitelist_for_serdes +@record_custom +class RunStepMarker(IHaveNew): + start_time: Optional[float] + end_time: Optional[float] + key: Optional[str] + + def __new__( + cls, + start_time: Optional[float] = None, + end_time: Optional[float] = None, + key: Optional[str] = None, + ): + return super().__new__( + cls, + start_time=check.opt_float_param(start_time, "start_time"), + end_time=check.opt_float_param(end_time, "end_time"), + key=check.opt_str_param(key, "key"), + ) + + +@whitelist_for_serdes +@record_custom +class RunStepKeyStatsSnapshot(IHaveNew): + run_id: str + step_key: str + status: Optional[StepEventStatus] + start_time: Optional[float] + end_time: Optional[float] + materialization_events: Sequence[EventLogEntry] + expectation_results: Sequence[ExpectationResult] + attempts: Optional[int] + attempts_list: Sequence[RunStepMarker] + markers: Sequence[RunStepMarker] + partial_attempt_start: Optional[float] + + def __new__( + cls, + run_id: str, + step_key: str, + status: Optional[StepEventStatus] = None, + start_time: Optional[float] = None, + end_time: Optional[float] = None, + materialization_events: Optional[Sequence[EventLogEntry]] = None, + expectation_results: Optional[Sequence[ExpectationResult]] = None, + attempts: Optional[int] = None, + attempts_list: Optional[Sequence[RunStepMarker]] = None, + markers: Optional[Sequence[RunStepMarker]] = None, + partial_attempt_start: Optional[float] = None, + ): + return super().__new__( + cls, + run_id=check.str_param(run_id, "run_id"), + step_key=check.str_param(step_key, "step_key"), + status=check.opt_inst_param(status, "status", StepEventStatus), + start_time=check.opt_float_param(start_time, "start_time"), + end_time=check.opt_float_param(end_time, "end_time"), + materialization_events=check.opt_sequence_param( + materialization_events, + "materialization_events", + EventLogEntry, + ), + expectation_results=check.opt_sequence_param( + expectation_results, "expectation_results", ExpectationResult + ), + attempts=check.opt_int_param(attempts, "attempts"), + attempts_list=check.opt_sequence_param(attempts_list, "attempts_list", RunStepMarker), + markers=check.opt_sequence_param(markers, "markers", RunStepMarker), + # used to calculate incremental step stats using batches of event logs + partial_attempt_start=check.opt_float_param( + partial_attempt_start, "partial_attempt_start" + ), + ) + + +@whitelist_for_serdes +@record +class RunStepStatsSnapshot: + run_id: str + step_key_stats: Sequence[RunStepKeyStatsSnapshot] + partial_markers: Optional[Mapping[str, Sequence[RunStepMarker]]] + + def build_run_step_stats_from_events( - run_id: str, records: Iterable[EventLogEntry] -) -> Sequence["RunStepKeyStatsSnapshot"]: + run_id: str, + entries: Iterable[EventLogEntry], +) -> Sequence[RunStepKeyStatsSnapshot]: + snapshot = build_run_step_stats_snapshot_from_events(run_id, entries) + return snapshot.step_key_stats + + +def build_run_step_stats_snapshot_from_events( + run_id: str, + entries: Iterable[EventLogEntry], + previous_snapshot: Optional["RunStepStatsSnapshot"] = None, +) -> "RunStepStatsSnapshot": by_step_key: Dict[str, Dict[str, Any]] = defaultdict(dict) attempts = defaultdict(list) - 
attempt_events = defaultdict(list) markers: Dict[str, Dict[str, Any]] = defaultdict(dict) - for event in records: + + if previous_snapshot: + for step_stats in previous_snapshot.step_key_stats: + check.invariant(step_stats.run_id == run_id) + by_step_key[step_stats.step_key] = { + "start_time": step_stats.start_time, + "end_time": step_stats.end_time, + "status": step_stats.status, + "materialization_events": step_stats.materialization_events, + "expectation_results": step_stats.expectation_results, + "attempts": step_stats.attempts, + "partial_attempt_start": step_stats.partial_attempt_start, + } + for attempt in step_stats.attempts_list: + attempts[step_stats.step_key].append(attempt) + + for marker in step_stats.markers: + assert marker.key + markers[step_stats.step_key][marker.key] = { + "key": marker.key, + "start": marker.start_time, + "end": marker.end_time, + } + + # handle the partial markers + if previous_snapshot.partial_markers: + for step_key, partial_markers in previous_snapshot.partial_markers.items(): + for marker in partial_markers: + assert marker.key + markers[step_key][marker.key] = { + "key": marker.key, + "start": marker.start_time, + "end": marker.end_time, + } + + def _open_attempt(step_key: str, event: EventLogEntry) -> None: + by_step_key[step_key]["attempts"] = int(by_step_key[step_key].get("attempts") or 0) + 1 + by_step_key[step_key]["partial_attempt_start"] = event.timestamp + + def _close_attempt(step_key: str, event: EventLogEntry) -> None: + attempts[step_key].append( + RunStepMarker( + start_time=by_step_key[step_key].get("partial_attempt_start"), + end_time=event.timestamp, + ) + ) + by_step_key[step_key]["partial_attempt_start"] = None + + for event in entries: if not event.is_dagster_event: continue dagster_event = event.get_dagster_event() @@ -135,19 +276,25 @@ def build_run_step_stats_from_events( continue if dagster_event.event_type == DagsterEventType.STEP_START: + by_step_key[step_key]["status"] = StepEventStatus.IN_PROGRESS by_step_key[step_key]["start_time"] = event.timestamp - by_step_key[step_key]["attempts"] = 1 + _open_attempt(step_key, event) + if dagster_event.event_type == DagsterEventType.STEP_RESTARTED: + _open_attempt(step_key, event) + if dagster_event.event_type == DagsterEventType.STEP_UP_FOR_RETRY: + _close_attempt(step_key, event) if dagster_event.event_type == DagsterEventType.STEP_FAILURE: by_step_key[step_key]["end_time"] = event.timestamp by_step_key[step_key]["status"] = StepEventStatus.FAILURE - if dagster_event.event_type == DagsterEventType.STEP_RESTARTED: - by_step_key[step_key]["attempts"] = int(by_step_key[step_key].get("attempts") or 0) + 1 + _close_attempt(step_key, event) if dagster_event.event_type == DagsterEventType.STEP_SUCCESS: by_step_key[step_key]["end_time"] = event.timestamp by_step_key[step_key]["status"] = StepEventStatus.SUCCESS + _close_attempt(step_key, event) if dagster_event.event_type == DagsterEventType.STEP_SKIPPED: by_step_key[step_key]["end_time"] = event.timestamp by_step_key[step_key]["status"] = StepEventStatus.SKIPPED + _close_attempt(step_key, event) if dagster_event.event_type == DagsterEventType.ASSET_MATERIALIZATION: materialization_events = by_step_key[step_key].get("materialization_events", []) materialization_events.append(event) @@ -158,129 +305,50 @@ def build_run_step_stats_from_events( step_expectation_results = by_step_key[step_key].get("expectation_results", []) step_expectation_results.append(expectation_result) by_step_key[step_key]["expectation_results"] = 
step_expectation_results - if dagster_event.event_type in ( - DagsterEventType.STEP_UP_FOR_RETRY, - DagsterEventType.STEP_RESTARTED, - ): - attempt_events[step_key].append(event) + if dagster_event.event_type in MARKER_EVENTS: if dagster_event.engine_event_data.marker_start: - key = dagster_event.engine_event_data.marker_start - if key not in markers[step_key]: - markers[step_key][key] = {"key": key, "start": event.timestamp} + marker_key = dagster_event.engine_event_data.marker_start + if marker_key not in markers[step_key]: + markers[step_key][marker_key] = {"key": marker_key, "start": event.timestamp} else: - markers[step_key][key]["start"] = event.timestamp + markers[step_key][marker_key]["start"] = event.timestamp if dagster_event.engine_event_data.marker_end: - key = dagster_event.engine_event_data.marker_end - if key not in markers[step_key]: - markers[step_key][key] = {"key": key, "end": event.timestamp} + marker_key = dagster_event.engine_event_data.marker_end + if marker_key not in markers[step_key]: + markers[step_key][marker_key] = {"key": marker_key, "end": event.timestamp} else: - markers[step_key][key]["end"] = event.timestamp + markers[step_key][marker_key]["end"] = event.timestamp + snapshots = [] for step_key, step_stats in by_step_key.items(): - events = attempt_events[step_key] - step_attempts = [] - attempt_start = step_stats.get("start_time") - - for event in events: - if not event.dagster_event: - continue - if event.dagster_event.event_type == DagsterEventType.STEP_UP_FOR_RETRY: - step_attempts.append( - RunStepMarker(start_time=attempt_start, end_time=event.timestamp) - ) - elif event.dagster_event.event_type == DagsterEventType.STEP_RESTARTED: - attempt_start = event.timestamp - if step_stats.get("end_time"): - step_attempts.append( - RunStepMarker(start_time=attempt_start, end_time=step_stats["end_time"]) + snapshots.append( + RunStepKeyStatsSnapshot( + run_id=run_id, + step_key=step_key, + **step_stats, + markers=[ + RunStepMarker( + start_time=marker.get("start"), + end_time=marker.get("end"), + key=marker.get("key"), + ) + for marker in markers[step_key].values() + ], + attempts_list=attempts[step_key], ) - else: - by_step_key[step_key]["status"] = StepEventStatus.IN_PROGRESS - attempts[step_key] = step_attempts - - return [ - RunStepKeyStatsSnapshot( - run_id=run_id, - step_key=step_key, - attempts_list=attempts[step_key], - markers=[ - RunStepMarker(start_time=marker.get("start"), end_time=marker.get("end")) - for marker in markers[step_key].values() - ], - **value, - ) - for step_key, value in by_step_key.items() - ] - - -@whitelist_for_serdes -class RunStepMarker( - NamedTuple( - "_RunStepMarker", - [("start_time", Optional[float]), ("end_time", Optional[float])], - ) -): - def __new__( - cls, - start_time: Optional[float] = None, - end_time: Optional[float] = None, - ): - return super(RunStepMarker, cls).__new__( - cls, - start_time=check.opt_float_param(start_time, "start_time"), - end_time=check.opt_float_param(end_time, "end_time"), ) - -@whitelist_for_serdes -class RunStepKeyStatsSnapshot( - NamedTuple( - "_RunStepKeyStatsSnapshot", - [ - ("run_id", str), - ("step_key", str), - ("status", Optional[StepEventStatus]), - ("start_time", Optional[float]), - ("end_time", Optional[float]), - ("materialization_events", Sequence[EventLogEntry]), - ("expectation_results", Sequence[ExpectationResult]), - ("attempts", Optional[int]), - ("attempts_list", Sequence[RunStepMarker]), - ("markers", Sequence[RunStepMarker]), - ], + return RunStepStatsSnapshot( + 
run_id=run_id, + step_key_stats=snapshots, + partial_markers={ + step_key: [ + RunStepMarker(start_time=marker.get("start"), end_time=marker.get("end"), key=key) + for key, marker in markers.items() + ] + for step_key, markers in markers.items() + if step_key not in by_step_key + }, ) -): - def __new__( - cls, - run_id: str, - step_key: str, - status: Optional[StepEventStatus] = None, - start_time: Optional[float] = None, - end_time: Optional[float] = None, - materialization_events: Optional[Sequence[EventLogEntry]] = None, - expectation_results: Optional[Sequence[ExpectationResult]] = None, - attempts: Optional[int] = None, - attempts_list: Optional[Sequence[RunStepMarker]] = None, - markers: Optional[Sequence[RunStepMarker]] = None, - ): - return super(RunStepKeyStatsSnapshot, cls).__new__( - cls, - run_id=check.str_param(run_id, "run_id"), - step_key=check.str_param(step_key, "step_key"), - status=check.opt_inst_param(status, "status", StepEventStatus), - start_time=check.opt_float_param(start_time, "start_time"), - end_time=check.opt_float_param(end_time, "end_time"), - materialization_events=check.opt_sequence_param( - materialization_events, - "materialization_events", - EventLogEntry, - ), - expectation_results=check.opt_sequence_param( - expectation_results, "expectation_results", ExpectationResult - ), - attempts=check.opt_int_param(attempts, "attempts"), - attempts_list=check.opt_sequence_param(attempts_list, "attempts_list", RunStepMarker), - markers=check.opt_sequence_param(markers, "markers", RunStepMarker), - ) diff --git a/python_modules/dagster/dagster_tests/storage_tests/test_event_log.py b/python_modules/dagster/dagster_tests/storage_tests/test_event_log.py index 8f1d0cc194756..8db0dc8f8b240 100644 --- a/python_modules/dagster/dagster_tests/storage_tests/test_event_log.py +++ b/python_modules/dagster/dagster_tests/storage_tests/test_event_log.py @@ -9,9 +9,14 @@ import pytest import sqlalchemy import sqlalchemy as db -from dagster import AssetKey, AssetMaterialization, DagsterInstance, Output, op +from dagster import AssetKey, AssetMaterialization, DagsterInstance, Out, Output, RetryRequested, op from dagster._core.errors import DagsterEventLogInvalidForRun -from dagster._core.execution.stats import build_run_stats_from_events +from dagster._core.execution.stats import ( + StepEventStatus, + build_run_stats_from_events, + build_run_step_stats_from_events, + build_run_step_stats_snapshot_from_events, +) from dagster._core.storage.event_log import ( ConsolidatedSqliteEventLogStorage, SqlEventLogStorageMetadata, @@ -332,3 +337,49 @@ def _ops(): ) assert incremental_run_stats == run_stats + + +def test_step_stats(): + @op + def op_success(_): + return 1 + + @op + def asset_op(_): + yield AssetMaterialization(asset_key=AssetKey("asset_1")) + yield Output(1) + + @op(out=Out(str)) + def op_failure(_): + time.sleep(0.001) + raise RetryRequested(max_retries=3) + + def _ops(): + op_success() + asset_op() + op_failure() + + events, result = _synthesize_events(_ops, check_success=False) + + step_stats = build_run_step_stats_from_events(result.run_id, events) + assert len(step_stats) == 3 + assert len([step for step in step_stats if step.status == StepEventStatus.SUCCESS]) == 2 + assert len([step for step in step_stats if step.status == StepEventStatus.FAILURE]) == 1 + assert all([step.run_id == result.run_id for step in step_stats]) + + op_failure_stats = next( + iter([step for step in step_stats if step.step_key == "op_failure"]), None + ) + assert op_failure_stats + assert 
op_failure_stats.attempts == 4
+    assert len(op_failure_stats.attempts_list) == 4
+
+    # build up step stats through incremental events
+    incremental_snapshot = None
+    for event in events:
+        incremental_snapshot = build_run_step_stats_snapshot_from_events(
+            result.run_id, [event], incremental_snapshot
+        )
+
+    assert incremental_snapshot
+    assert incremental_snapshot.step_key_stats == step_stats
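
Usage sketch (illustrative, not part of the patch): the new `build_run_step_stats_snapshot_from_events` entry point can be folded over successive batches of event log entries, carrying the returned `RunStepStatsSnapshot` forward as `previous_snapshot`. The `fold_step_stats` helper and its batching loop below are hypothetical; only the imported names come from this PR.

```python
from typing import Iterable, Optional, Sequence

from dagster._core.events.log import EventLogEntry
from dagster._core.execution.stats import (
    RunStepKeyStatsSnapshot,
    RunStepStatsSnapshot,
    build_run_step_stats_snapshot_from_events,
)


def fold_step_stats(
    run_id: str,
    batches: Iterable[Sequence[EventLogEntry]],
    previous_snapshot: Optional[RunStepStatsSnapshot] = None,
) -> Sequence[RunStepKeyStatsSnapshot]:
    # Hypothetical helper: fold batches of event log entries into a running
    # snapshot. The snapshot carries the intermediate state needed to resume
    # the calculation (partial_attempt_start, partial markers for steps that
    # have not yet started), so each call only needs the new batch of entries.
    snapshot = previous_snapshot
    for batch in batches:
        snapshot = build_run_step_stats_snapshot_from_events(run_id, batch, snapshot)
    return snapshot.step_key_stats if snapshot else []
```

The invariant exercised by the new test is that this fold, applied even one event at a time, produces `step_key_stats` identical to the single-pass `build_run_step_stats_from_events(run_id, all_events)` result.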