Skip to content

Commit

Permalink
patch trace file
Browse files Browse the repository at this point in the history
  • Loading branch information
abhinavDhulipala committed Aug 16, 2024
1 parent 790e8f7 commit af870af
Showing 1 changed file with 30 additions and 15 deletions.
45 changes: 30 additions & 15 deletions wrappers/proctrac.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
#!/usr/bin/python3

# SPDX-FileCopyrightText: 2023 Rivos Inc.
#
# SPDX-License-Identifier: Apache-2.0
Expand All @@ -8,16 +6,16 @@
import os
import requests
from time import sleep
import dataclasses
from typing import Generator
import argparse as ag
import json
import platform
from datetime import datetime

from dataclasses import dataclass, asdict
from pprint import pprint

# must correlate with trace info struct
@dataclasses.dataclass
@dataclass
class TraceInfo:
pid: int
cpus: float
Expand All @@ -29,6 +27,26 @@ class TraceInfo:
username: str = os.getenv("USER", "")
hostname: str = platform.node()

@classmethod
def from_proc(cls, jobid: int, proc: psutil.Popen) -> 'TraceInfo':
io_counters = proc.io_counters()
return cls(
pid=proc.pid,
cpus=proc.cpu_percent(0.1),
threads=proc.num_threads(),
mem=proc.memory_info().rss,
read_bytes=io_counters.read_bytes,
write_bytes=io_counters.write_bytes,
job_id=jobid,
)

def merge(self, trace_info: 'TraceInfo'):
self.cpus += trace_info.cpus
self.threads += trace_info.threads
self.mem += trace_info.mem
self.read_bytes += trace_info.read_bytes
self.write_bytes += trace_info.write_bytes


class ProcWrapper:
"""thin wrapper to send slurm proc metrics to our exporter"""
Expand All @@ -48,15 +66,12 @@ def __init__(self, cmd=[], sample_rate=0, jobid=0):

def poll_info(self) -> Generator[TraceInfo, None, None]:
while self.proc.poll() is None:
trace = TraceInfo(0, 0, 0, 0, 0, 0, self.jobid)
trace = TraceInfo.from_proc(self.jobid, self.proc)
start = datetime.now()
for p in self.proc.children(True):
try:
io_counters = p.io_counters()
trace.cpus += p.cpu_percent(0.1)
trace.threads += p.num_threads()
trace.write_bytes += io_counters.write_bytes
trace.read_bytes += io_counters.read_bytes
child_trace = TraceInfo.from_proc(self.jobid, p)
trace.merge(child_trace)
except psutil.Error as e:
print(f"failed to poll child process with error {e}")
continue
Expand Down Expand Up @@ -110,11 +125,11 @@ def poll_info(self) -> Generator[TraceInfo, None, None]:
wrapper = ProcWrapper(args.cmd or args.argv, args.sample_rate, args.jobid)

if args.validate:
print(json.dumps(dataclasses.asdict(next(wrapper.poll_info()))))
print(json.dumps(asdict(next(wrapper.poll_info()))))
wrapper.proc.terminate()
elif args.dry_run:
[print(json.dumps(dataclasses.asdict(stat))) for stat in wrapper.poll_info()]
[pprint(asdict(stat)) for stat in wrapper.poll_info()]
else:
for trace in wrapper.poll_info():
resp = requests.post(args.endpoint, json=dataclasses.asdict(trace))
args.verbose and print(dataclasses.asdict(trace), resp)
resp = requests.post(args.endpoint, json=asdict(trace))
args.verbose and print(asdict(trace), resp)

0 comments on commit af870af

Please sign in to comment.