Skip to content

Commit

Permalink
reduce the number of logs queried for run stats in base implementation (
Browse files Browse the repository at this point in the history
#26479)

## Summary & Motivation
This reduces the number of events fetched for the base implementation of
`get_run_stats`

## How I Tested These Changes
BK
  • Loading branch information
prha authored Dec 16, 2024
1 parent bf640e8 commit 388ae77
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 17 deletions.
15 changes: 14 additions & 1 deletion python_modules/dagster/dagster/_core/execution/stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,24 @@

import dagster._check as check
from dagster._core.definitions import ExpectationResult
from dagster._core.events import MARKER_EVENTS, DagsterEventType, StepExpectationResultData
from dagster._core.events import (
MARKER_EVENTS,
PIPELINE_EVENTS,
DagsterEventType,
StepExpectationResultData,
)
from dagster._core.events.log import EventLogEntry
from dagster._core.storage.dagster_run import DagsterRunStatsSnapshot
from dagster._serdes import whitelist_for_serdes

RUN_STATS_EVENT_TYPES = {
*PIPELINE_EVENTS,
DagsterEventType.STEP_FAILURE,
DagsterEventType.STEP_SUCCESS,
DagsterEventType.ASSET_MATERIALIZATION,
DagsterEventType.STEP_EXPECTATION_RESULT,
}

STEP_STATS_EVENT_TYPES = {
DagsterEventType.STEP_START,
DagsterEventType.STEP_FAILURE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
)
from dagster._core.events import DagsterEventType
from dagster._core.execution.stats import (
RUN_STATS_EVENT_TYPES,
STEP_STATS_EVENT_TYPES,
RunStepKeyStatsSnapshot,
build_run_stats_from_events,
Expand Down Expand Up @@ -255,7 +256,9 @@ def get_records_for_run(

def get_stats_for_run(self, run_id: str) -> DagsterRunStatsSnapshot:
"""Get a summary of events that have ocurred in a run."""
return build_run_stats_from_events(run_id, self.get_logs_for_run(run_id))
return build_run_stats_from_events(
run_id, self.get_logs_for_run(run_id, of_type=RUN_STATS_EVENT_TYPES)
)

def get_step_stats_for_run(
self, run_id: str, step_keys: Optional[Sequence[str]] = None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,15 @@
ASSET_CHECK_EVENTS,
ASSET_EVENTS,
EVENT_TYPE_TO_PIPELINE_RUN_STATUS,
MARKER_EVENTS,
DagsterEventType,
)
from dagster._core.events.log import EventLogEntry
from dagster._core.execution.stats import RunStepKeyStatsSnapshot, build_run_step_stats_from_events
from dagster._core.execution.stats import (
RUN_STATS_EVENT_TYPES,
STEP_STATS_EVENT_TYPES,
RunStepKeyStatsSnapshot,
build_run_step_stats_from_events,
)
from dagster._core.storage.asset_check_execution_record import (
COMPLETED_ASSET_CHECK_EXECUTION_RECORD_STATUSES,
AssetCheckExecutionRecord,
Expand Down Expand Up @@ -574,7 +578,9 @@ def get_stats_for_run(self, run_id: str) -> DagsterRunStatsSnapshot:
.where(
db.and_(
SqlEventLogStorageTable.c.run_id == run_id,
SqlEventLogStorageTable.c.dagster_event_type != None, # noqa: E711
SqlEventLogStorageTable.c.dagster_event_type.in_(
[event_type.value for event_type in RUN_STATS_EVENT_TYPES]
),
)
)
.group_by("dagster_event_type")
Expand Down Expand Up @@ -647,18 +653,7 @@ def get_step_stats_for_run(
.where(SqlEventLogStorageTable.c.step_key != None) # noqa: E711
.where(
SqlEventLogStorageTable.c.dagster_event_type.in_(
[
DagsterEventType.STEP_START.value,
DagsterEventType.STEP_SUCCESS.value,
DagsterEventType.STEP_SKIPPED.value,
DagsterEventType.STEP_FAILURE.value,
DagsterEventType.STEP_RESTARTED.value,
DagsterEventType.ASSET_MATERIALIZATION.value,
DagsterEventType.STEP_EXPECTATION_RESULT.value,
DagsterEventType.STEP_RESTARTED.value,
DagsterEventType.STEP_UP_FOR_RETRY.value,
]
+ [marker_event.value for marker_event in MARKER_EVENTS]
[event_type.value for event_type in STEP_STATS_EVENT_TYPES]
)
)
.order_by(SqlEventLogStorageTable.c.id.asc())
Expand Down

0 comments on commit 388ae77

Please sign in to comment.