Skip to content

Commit

Permalink
Log runtime for individual forward model steps
Browse files Browse the repository at this point in the history
There are more short forward model time steps than we want the logging
system to handle, thus steps taking less than 2 minutes are skipped.
  • Loading branch information
berland authored and sondreso committed Nov 1, 2024
1 parent 5a220c7 commit dd35db4
Showing 1 changed file with 44 additions and 1 deletion.
45 changes: 44 additions & 1 deletion src/ert/ensemble_evaluator/_ensemble.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,15 @@
Union,
)

from _ert.events import Event, Id, event_from_dict, event_to_json
from _ert.events import (
Event,
FMEvent,
ForwardModelStepFailure,
ForwardModelStepSuccess,
Id,
event_from_dict,
event_to_json,
)
from _ert.forward_model_runner.client import Client
from ert.config import ForwardModelStep, QueueConfig
from ert.run_arg import RunArg
Expand Down Expand Up @@ -142,9 +150,44 @@ def _create_snapshot(self) -> EnsembleSnapshot:
def get_successful_realizations(self) -> List[int]:
return self.snapshot.get_successful_realizations()

def _log_completed_fm_step(
self, event: FMEvent, step_snapshot: Optional[FMStepSnapshot]
) -> None:
if step_snapshot is None:
logger.warning(f"Should log {event}, but there was no step_snapshot")
return
step_name = step_snapshot.get("name", "")
start_time = step_snapshot.get("start_time")
cpu_seconds = step_snapshot.get("cpu_seconds")
current_memory_usage = step_snapshot.get("current_memory_usage")
if start_time is not None and event.time is not None:
walltime = (event.time - start_time).total_seconds()
else:
# We get here if the Running event is in the same event batch as
# the Success event. That means that runtime is close to zero.
walltime = 0

if walltime > 120:
logger.warning(
f"{event.event_type} {step_name} "
f"{walltime=} "
f"{cpu_seconds=} "
f"{current_memory_usage=} "
f"step_index={event.fm_step} "
f"real={event.real} "
f"ensemble={event.ensemble}"
)

def update_snapshot(self, events: Sequence[Event]) -> EnsembleSnapshot:
snapshot_mutate_event = EnsembleSnapshot()
for event in events:
if isinstance(event, (ForwardModelStepSuccess, ForwardModelStepFailure)):
step = (
self.snapshot.reals[event.real]
.get("fm_steps", {})
.get(event.fm_step)
)
self._log_completed_fm_step(event, step)
snapshot_mutate_event = snapshot_mutate_event.update_from_event(
event, source_snapshot=self.snapshot
)
Expand Down

0 comments on commit dd35db4

Please sign in to comment.