Skip to content

Commit

Permalink
add support for incremental step stats calculations (#26577)
Browse files Browse the repository at this point in the history
## Summary & Motivation
This is the counterpart of
#26550, but for step stats
instead of run stats.

The step stats calculation is a little more complicated because it
relies on intermediate state being held between Dagster events. This
includes the `partial_attempt_start` time for a given step. This PR
introduces a field in the snapshot to carry over this state; the field
should only be used in the incremental calculation.

Another change concerns marker events (e.g. resource init/teardown).
These can be associated with a step but generally occur before the step
has started, so previously step snapshots did not include them if the
step had not yet started. This PR changes that behavior so that every
step that has a marker event gets a snapshot, even if it has not
started. The output for incremental vs. completed run snapshots should
match exactly.

## How I Tested These Changes
BK
  • Loading branch information
prha authored Dec 20, 2024
1 parent e7533d6 commit a14cb87
Show file tree
Hide file tree
Showing 2 changed files with 245 additions and 126 deletions.
316 changes: 192 additions & 124 deletions python_modules/dagster/dagster/_core/execution/stats.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from collections import defaultdict
from enum import Enum
from typing import Any, Dict, Iterable, NamedTuple, Optional, Sequence, cast
from typing import Any, Dict, Iterable, Mapping, Optional, Sequence, cast

import dagster._check as check
from dagster._core.definitions import ExpectationResult
Expand All @@ -12,6 +12,7 @@
)
from dagster._core.events.log import EventLogEntry
from dagster._core.storage.dagster_run import DagsterRunStatsSnapshot
from dagster._record import IHaveNew, record, record_custom
from dagster._serdes import whitelist_for_serdes

RUN_STATS_EVENT_TYPES = {
Expand Down Expand Up @@ -47,8 +48,8 @@ def build_run_stats_from_events(
raise check.ParameterCheckError(
"Invariant violation for parameter 'records'. Description: Expected iterable."
) from exc
for i, record in enumerate(entries):
check.inst_param(record, f"records[{i}]", EventLogEntry)
for i, entry in enumerate(entries):
check.inst_param(entry, f"entries[{i}]", EventLogEntry)

if previous_stats:
steps_succeeded = previous_stats.steps_succeeded
Expand Down Expand Up @@ -115,14 +116,154 @@ class StepEventStatus(Enum):
IN_PROGRESS = "IN_PROGRESS"


@whitelist_for_serdes
@record_custom
class RunStepMarker(IHaveNew):
    """A time span recorded for a step, optionally identified by a key.

    All three fields are optional and default to ``None``; a marker may carry
    only a start, only an end, or neither.
    """

    start_time: Optional[float]
    end_time: Optional[float]
    key: Optional[str]

    def __new__(
        cls,
        start_time: Optional[float] = None,
        end_time: Optional[float] = None,
        key: Optional[str] = None,
    ):
        # Validate in declaration order so the first offending argument is
        # the one reported.
        checked_start = check.opt_float_param(start_time, "start_time")
        checked_end = check.opt_float_param(end_time, "end_time")
        checked_key = check.opt_str_param(key, "key")
        return super().__new__(
            cls,
            start_time=checked_start,
            end_time=checked_end,
            key=checked_key,
        )


@whitelist_for_serdes
@record_custom
class RunStepKeyStatsSnapshot(IHaveNew):
    """Stats for a single step (identified by ``step_key``) within a run.

    Only ``run_id`` and ``step_key`` are required. Every other field defaults
    to ``None`` and is passed through the matching optional ``check`` helper
    before the record is constructed.
    """

    run_id: str
    step_key: str
    status: Optional[StepEventStatus]
    start_time: Optional[float]
    end_time: Optional[float]
    materialization_events: Sequence[EventLogEntry]
    expectation_results: Sequence[ExpectationResult]
    attempts: Optional[int]
    attempts_list: Sequence[RunStepMarker]
    markers: Sequence[RunStepMarker]
    partial_attempt_start: Optional[float]

    def __new__(
        cls,
        run_id: str,
        step_key: str,
        status: Optional[StepEventStatus] = None,
        start_time: Optional[float] = None,
        end_time: Optional[float] = None,
        materialization_events: Optional[Sequence[EventLogEntry]] = None,
        expectation_results: Optional[Sequence[ExpectationResult]] = None,
        attempts: Optional[int] = None,
        attempts_list: Optional[Sequence[RunStepMarker]] = None,
        markers: Optional[Sequence[RunStepMarker]] = None,
        partial_attempt_start: Optional[float] = None,
    ):
        # Checks run in field-declaration order so the first invalid argument
        # is the one that raises.
        checked_run_id = check.str_param(run_id, "run_id")
        checked_step_key = check.str_param(step_key, "step_key")
        checked_status = check.opt_inst_param(status, "status", StepEventStatus)
        checked_start = check.opt_float_param(start_time, "start_time")
        checked_end = check.opt_float_param(end_time, "end_time")
        checked_materializations = check.opt_sequence_param(
            materialization_events,
            "materialization_events",
            EventLogEntry,
        )
        checked_expectations = check.opt_sequence_param(
            expectation_results, "expectation_results", ExpectationResult
        )
        checked_attempts = check.opt_int_param(attempts, "attempts")
        checked_attempts_list = check.opt_sequence_param(
            attempts_list, "attempts_list", RunStepMarker
        )
        checked_markers = check.opt_sequence_param(markers, "markers", RunStepMarker)
        # used to calculate incremental step stats using batches of event logs
        checked_partial_start = check.opt_float_param(
            partial_attempt_start, "partial_attempt_start"
        )

        return super().__new__(
            cls,
            run_id=checked_run_id,
            step_key=checked_step_key,
            status=checked_status,
            start_time=checked_start,
            end_time=checked_end,
            materialization_events=checked_materializations,
            expectation_results=checked_expectations,
            attempts=checked_attempts,
            attempts_list=checked_attempts_list,
            markers=checked_markers,
            partial_attempt_start=checked_partial_start,
        )


@whitelist_for_serdes
@record
class RunStepStatsSnapshot:
    # Step stats for one run. Returned by
    # build_run_step_stats_snapshot_from_events and accepted back by it as
    # `previous_snapshot` to continue the calculation from a later batch of
    # event log entries.

    # Run these stats belong to.
    run_id: str
    # Per-step stats, one RunStepKeyStatsSnapshot per step key.
    step_key_stats: Sequence[RunStepKeyStatsSnapshot]
    # Markers, grouped by step key, for steps that have no entry in
    # step_key_stats; may be None.
    partial_markers: Optional[Mapping[str, Sequence[RunStepMarker]]]


def build_run_step_stats_from_events(
    run_id: str,
    entries: Iterable[EventLogEntry],
) -> Sequence[RunStepKeyStatsSnapshot]:
    """Return the per-step stats computed from a run's event log entries.

    Thin wrapper over ``build_run_step_stats_snapshot_from_events``: it builds
    the snapshot with no previous snapshot and returns only its
    ``step_key_stats``.

    Args:
        run_id: Id of the run the entries belong to.
        entries: Event log entries to compute the stats from.

    Returns:
        One ``RunStepKeyStatsSnapshot`` per step key found in the snapshot.
    """
    snapshot = build_run_step_stats_snapshot_from_events(run_id, entries)
    return snapshot.step_key_stats


def build_run_step_stats_snapshot_from_events(
run_id: str,
entries: Iterable[EventLogEntry],
previous_snapshot: Optional["RunStepStatsSnapshot"] = None,
) -> "RunStepStatsSnapshot":
by_step_key: Dict[str, Dict[str, Any]] = defaultdict(dict)
attempts = defaultdict(list)
attempt_events = defaultdict(list)
markers: Dict[str, Dict[str, Any]] = defaultdict(dict)
for event in records:

if previous_snapshot:
for step_stats in previous_snapshot.step_key_stats:
check.invariant(step_stats.run_id == run_id)
by_step_key[step_stats.step_key] = {
"start_time": step_stats.start_time,
"end_time": step_stats.end_time,
"status": step_stats.status,
"materialization_events": step_stats.materialization_events,
"expectation_results": step_stats.expectation_results,
"attempts": step_stats.attempts,
"partial_attempt_start": step_stats.partial_attempt_start,
}
for attempt in step_stats.attempts_list:
attempts[step_stats.step_key].append(attempt)

for marker in step_stats.markers:
assert marker.key
markers[step_stats.step_key][marker.key] = {
"key": marker.key,
"start": marker.start_time,
"end": marker.end_time,
}

# handle the partial markers
if previous_snapshot.partial_markers:
for step_key, partial_markers in previous_snapshot.partial_markers.items():
for marker in partial_markers:
assert marker.key
markers[step_key][marker.key] = {
"key": marker.key,
"start": marker.start_time,
"end": marker.end_time,
}

def _open_attempt(step_key: str, event: EventLogEntry) -> None:
by_step_key[step_key]["attempts"] = int(by_step_key[step_key].get("attempts") or 0) + 1
by_step_key[step_key]["partial_attempt_start"] = event.timestamp

def _close_attempt(step_key: str, event: EventLogEntry) -> None:
attempts[step_key].append(
RunStepMarker(
start_time=by_step_key[step_key].get("partial_attempt_start"),
end_time=event.timestamp,
)
)
by_step_key[step_key]["partial_attempt_start"] = None

for event in entries:
if not event.is_dagster_event:
continue
dagster_event = event.get_dagster_event()
Expand All @@ -135,19 +276,25 @@ def build_run_step_stats_from_events(
continue

if dagster_event.event_type == DagsterEventType.STEP_START:
by_step_key[step_key]["status"] = StepEventStatus.IN_PROGRESS
by_step_key[step_key]["start_time"] = event.timestamp
by_step_key[step_key]["attempts"] = 1
_open_attempt(step_key, event)
if dagster_event.event_type == DagsterEventType.STEP_RESTARTED:
_open_attempt(step_key, event)
if dagster_event.event_type == DagsterEventType.STEP_UP_FOR_RETRY:
_close_attempt(step_key, event)
if dagster_event.event_type == DagsterEventType.STEP_FAILURE:
by_step_key[step_key]["end_time"] = event.timestamp
by_step_key[step_key]["status"] = StepEventStatus.FAILURE
if dagster_event.event_type == DagsterEventType.STEP_RESTARTED:
by_step_key[step_key]["attempts"] = int(by_step_key[step_key].get("attempts") or 0) + 1
_close_attempt(step_key, event)
if dagster_event.event_type == DagsterEventType.STEP_SUCCESS:
by_step_key[step_key]["end_time"] = event.timestamp
by_step_key[step_key]["status"] = StepEventStatus.SUCCESS
_close_attempt(step_key, event)
if dagster_event.event_type == DagsterEventType.STEP_SKIPPED:
by_step_key[step_key]["end_time"] = event.timestamp
by_step_key[step_key]["status"] = StepEventStatus.SKIPPED
_close_attempt(step_key, event)
if dagster_event.event_type == DagsterEventType.ASSET_MATERIALIZATION:
materialization_events = by_step_key[step_key].get("materialization_events", [])
materialization_events.append(event)
Expand All @@ -158,129 +305,50 @@ def build_run_step_stats_from_events(
step_expectation_results = by_step_key[step_key].get("expectation_results", [])
step_expectation_results.append(expectation_result)
by_step_key[step_key]["expectation_results"] = step_expectation_results
if dagster_event.event_type in (
DagsterEventType.STEP_UP_FOR_RETRY,
DagsterEventType.STEP_RESTARTED,
):
attempt_events[step_key].append(event)

if dagster_event.event_type in MARKER_EVENTS:
if dagster_event.engine_event_data.marker_start:
key = dagster_event.engine_event_data.marker_start
if key not in markers[step_key]:
markers[step_key][key] = {"key": key, "start": event.timestamp}
marker_key = dagster_event.engine_event_data.marker_start
if marker_key not in markers[step_key]:
markers[step_key][marker_key] = {"key": marker_key, "start": event.timestamp}
else:
markers[step_key][key]["start"] = event.timestamp
markers[step_key][marker_key]["start"] = event.timestamp

if dagster_event.engine_event_data.marker_end:
key = dagster_event.engine_event_data.marker_end
if key not in markers[step_key]:
markers[step_key][key] = {"key": key, "end": event.timestamp}
marker_key = dagster_event.engine_event_data.marker_end
if marker_key not in markers[step_key]:
markers[step_key][marker_key] = {"key": marker_key, "end": event.timestamp}
else:
markers[step_key][key]["end"] = event.timestamp
markers[step_key][marker_key]["end"] = event.timestamp

snapshots = []
for step_key, step_stats in by_step_key.items():
events = attempt_events[step_key]
step_attempts = []
attempt_start = step_stats.get("start_time")

for event in events:
if not event.dagster_event:
continue
if event.dagster_event.event_type == DagsterEventType.STEP_UP_FOR_RETRY:
step_attempts.append(
RunStepMarker(start_time=attempt_start, end_time=event.timestamp)
)
elif event.dagster_event.event_type == DagsterEventType.STEP_RESTARTED:
attempt_start = event.timestamp
if step_stats.get("end_time"):
step_attempts.append(
RunStepMarker(start_time=attempt_start, end_time=step_stats["end_time"])
snapshots.append(
RunStepKeyStatsSnapshot(
run_id=run_id,
step_key=step_key,
**step_stats,
markers=[
RunStepMarker(
start_time=marker.get("start"),
end_time=marker.get("end"),
key=marker.get("key"),
)
for marker in markers[step_key].values()
],
attempts_list=attempts[step_key],
)
else:
by_step_key[step_key]["status"] = StepEventStatus.IN_PROGRESS
attempts[step_key] = step_attempts

return [
RunStepKeyStatsSnapshot(
run_id=run_id,
step_key=step_key,
attempts_list=attempts[step_key],
markers=[
RunStepMarker(start_time=marker.get("start"), end_time=marker.get("end"))
for marker in markers[step_key].values()
],
**value,
)
for step_key, value in by_step_key.items()
]


@whitelist_for_serdes
class RunStepMarker(
NamedTuple(
"_RunStepMarker",
[("start_time", Optional[float]), ("end_time", Optional[float])],
)
):
def __new__(
cls,
start_time: Optional[float] = None,
end_time: Optional[float] = None,
):
return super(RunStepMarker, cls).__new__(
cls,
start_time=check.opt_float_param(start_time, "start_time"),
end_time=check.opt_float_param(end_time, "end_time"),
)


@whitelist_for_serdes
class RunStepKeyStatsSnapshot(
NamedTuple(
"_RunStepKeyStatsSnapshot",
[
("run_id", str),
("step_key", str),
("status", Optional[StepEventStatus]),
("start_time", Optional[float]),
("end_time", Optional[float]),
("materialization_events", Sequence[EventLogEntry]),
("expectation_results", Sequence[ExpectationResult]),
("attempts", Optional[int]),
("attempts_list", Sequence[RunStepMarker]),
("markers", Sequence[RunStepMarker]),
],
return RunStepStatsSnapshot(
run_id=run_id,
step_key_stats=snapshots,
partial_markers={
step_key: [
RunStepMarker(start_time=marker.get("start"), end_time=marker.get("end"), key=key)
for key, marker in markers.items()
]
for step_key, markers in markers.items()
if step_key not in by_step_key
},
)
):
def __new__(
cls,
run_id: str,
step_key: str,
status: Optional[StepEventStatus] = None,
start_time: Optional[float] = None,
end_time: Optional[float] = None,
materialization_events: Optional[Sequence[EventLogEntry]] = None,
expectation_results: Optional[Sequence[ExpectationResult]] = None,
attempts: Optional[int] = None,
attempts_list: Optional[Sequence[RunStepMarker]] = None,
markers: Optional[Sequence[RunStepMarker]] = None,
):
return super(RunStepKeyStatsSnapshot, cls).__new__(
cls,
run_id=check.str_param(run_id, "run_id"),
step_key=check.str_param(step_key, "step_key"),
status=check.opt_inst_param(status, "status", StepEventStatus),
start_time=check.opt_float_param(start_time, "start_time"),
end_time=check.opt_float_param(end_time, "end_time"),
materialization_events=check.opt_sequence_param(
materialization_events,
"materialization_events",
EventLogEntry,
),
expectation_results=check.opt_sequence_param(
expectation_results, "expectation_results", ExpectationResult
),
attempts=check.opt_int_param(attempts, "attempts"),
attempts_list=check.opt_sequence_param(attempts_list, "attempts_list", RunStepMarker),
markers=check.opt_sequence_param(markers, "markers", RunStepMarker),
)
Loading

0 comments on commit a14cb87

Please sign in to comment.