Count memory usage for grandchildren in job_dispatch
job_dispatch/job_runner only picked up the memory usage of its direct
child. When the direct child is only a wrapper around some potentially
memory-hungry processes, the memory usage reported by job_dispatch is
essentially garbage.

This PR lets the job runner scan through all of its descendants
(children, grandchildren, and deeper) when estimating memory usage for a
started forward model.

NB: This counts RSS (resident set size). When summing over multiple
processes, shared memory pages are counted once per process, so the sum
is likely an overestimate of the actual memory load. This is probably a
minor problem, as the memory consumption numbers of interest stem from
big datasets which are not shared among processes.
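
Should the double counting ever matter, one alternative (not part of this commit) would be to sum PSS (proportional set size) instead, which charges each shared page proportionally to the processes mapping it. A minimal sketch with psutil, assuming Linux and read access to /proc/<pid>/smaps; tree_pss is a hypothetical helper, not ert code:

import psutil

def tree_pss(pid: int) -> int:
    """Sum PSS over a process and all of its descendants (Linux-only)."""
    root = psutil.Process(pid)
    total = 0
    for proc in [root, *root.children(recursive=True)]:
        try:
            # memory_full_info() parses /proc/<pid>/smaps; .pss splits
            # each shared page among the processes sharing it.
            total += proc.memory_full_info().pss
        except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
            continue  # processes may vanish between listing and inspection
    return total

Summing PSS is slower than RSS, since parsing smaps is expensive, which is one reason a high-frequency polling loop would stick with RSS.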
berland committed Sep 26, 2023
1 parent d702363 commit 5c24b07
Showing 2 changed files with 45 additions and 1 deletion.
5 changes: 4 additions & 1 deletion src/_ert_job_runner/job.py
@@ -117,7 +117,10 @@ def ensure_file_handles_closed():
         max_memory_usage = 0
         while exit_code is None:
             try:
-                memory = process.memory_info().rss
+                memory = process.memory_info().rss + sum(
+                    child.memory_info().rss
+                    for child in process.children(recursive=True)
+                )
             except (NoSuchProcess, AccessDenied, ZombieProcess):
                 # In case of a process that has died and is in some transitional
                 # state, we ignore any failures. Only seen on OSX thus far.
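
Read outside the diff context, the polling loop amounts to roughly the following self-contained sketch; max_tree_rss, the Popen argument, and poll_period are illustrative stand-ins, and the real job.py carries more state around this loop:

import time
import psutil
from psutil import AccessDenied, NoSuchProcess, ZombieProcess

def max_tree_rss(popen, poll_period: float = 0.1) -> int:
    """Poll summed RSS for popen's process tree until it exits; return the peak."""
    process = psutil.Process(popen.pid)
    max_memory_usage = 0
    while popen.poll() is None:  # exit code stays None while still running
        try:
            memory = process.memory_info().rss + sum(
                child.memory_info().rss
                for child in process.children(recursive=True)
            )
        except (NoSuchProcess, AccessDenied, ZombieProcess):
            # A process can vanish between children() and memory_info();
            # skip this sample instead of crashing the poll loop.
            memory = 0
        max_memory_usage = max(max_memory_usage, memory)
        time.sleep(poll_period)
    return max_memory_usage

For instance, with subprocess imported, max_tree_rss(subprocess.Popen(["bash", "-c", "python -c 'list(range(10**6))' & wait"]), 0.01) would catch the memory of the backgrounded grandchild that a plain process.memory_info().rss on the bash wrapper would miss.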
41 changes: 41 additions & 0 deletions tests/unit_tests/job_runner/test_job.py
@@ -33,6 +33,47 @@ def test_run_with_process_failing(
         next(run)
 
 
+@pytest.mark.usefixtures("use_tmpdir")
+def test_memory_usage_counts_grandchildren():
+    scriptname = "recursive_memory_hog.py"
+    with open(scriptname, "w", encoding="utf-8") as script:
+        script.write(
+            """#!/usr/bin/env python
+import os
+import sys
+import time
+counter = int(sys.argv[1])
+numbers = list(range(int(1e6)))
+if counter > 0:
+    parent = os.fork()
+    if not parent:
+        os.execv(sys.argv[0], [sys.argv[0], str(counter - 1)])
+time.sleep(0.3)"""  # Too low sleep will make the test faster but flaky
+        )
+    executable = os.path.realpath(scriptname)
+    os.chmod(scriptname, stat.S_IRWXU | stat.S_IRWXO | stat.S_IRWXG)
+
+    def max_memory_per_subprocess_layer(layers: int) -> int:
+        job = Job(
+            {
+                "executable": executable,
+                "argList": [str(layers)],
+            },
+            0,
+        )
+        job.MEMORY_POLL_PERIOD = 0.01
+        max_seen = 0
+        for status in job.run():
+            if isinstance(status, Running):
+                max_seen = max(max_seen, status.max_memory_usage)
+        return max_seen
+
+    max_seens = [max_memory_per_subprocess_layer(layers) for layers in range(3)]
+    assert max_seens[0] < max_seens[1]
+    assert max_seens[1] < max_seens[2]
+
+
 @pytest.mark.usefixtures("use_tmpdir")
 def test_run_fails_using_exit_bash_builtin():
     job = Job(
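
Assuming a development install of ert with pytest available, the new test can be run in isolation via its node id (POSIX-only, since the helper script relies on os.fork):

pytest tests/unit_tests/job_runner/test_job.py::test_memory_usage_counts_grandchildren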
