#!/usr/bin/env python
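"""
ramenya.py -- event loop around the slurp production tools.

Repeatedly invokes kaedama.py to submit the requested workflow rules
(DST_EVENT and/or DST_CALOR) to HTCondor, then prints monitoring
summaries drawn from the production_status table in the FileCatalog
database.  Runs until interrupted, unless --once is given.
"""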
import argparse
import sys
from time import sleep

import sh
# External commands wrapped with sh so they can be called like functions
kaedama  = sh.Command("kaedama.py")   # slurp workflow submission script
condor_q = sh.Command("condor_q")     # HTCondor queue listing
psql     = sh.Command("psql")         # PostgreSQL client, used for the monitoring queries
arg_parser = argparse.ArgumentParser()
arg_parser.add_argument( '--runs',   nargs='+', default=[],
                         help="One argument selects a specific run, two an inclusive range, three or more an explicit list" )
arg_parser.add_argument( '--rules',  nargs='+', default=['all'],
                         help="Workflow rules to submit (DST_EVENT, DST_CALOR, or all)" )
arg_parser.add_argument( '--delay',  type=int, default=600,
                         help="Delay in seconds between loop executions" )
arg_parser.add_argument( '--submit', default=True, action="store_true",
                         help="Submit jobs to condor (default)" )
arg_parser.add_argument( '--no-submit', dest="submit", action="store_false",
                         help="No submission, just print the summary information" )
arg_parser.add_argument( '--outputs', nargs='+', default=['started'],
                         help="Information printed at each loop (condorq, pending, started, clusters, everything)" )
arg_parser.add_argument( '--once',   default=False, action="store_true",
                         help="Break out of the loop after one iteration" )
arg_parser.add_argument( '--config', default='sphenix_auau23.yaml',
                         help="Configuration file used by kaedama to submit workflows" )
args = arg_parser.parse_args()
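# Example invocations (run numbers are hypothetical; kaedama.py and the
# configuration file must be reachable for these to work):
#
#   ramenya.py --once --runs 41989 --rules DST_EVENT
#   ramenya.py --runs 41900 41999 --outputs started clusters --delay 300
#   ramenya.py --no-submit --outputs pending started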
def makeRunCondition( runs ):
    """Build the SQL condition selecting the requested runs (see --runs)."""
    result = ""
    if len(runs)==1 and int(runs[0])>0:
        result = f" and run={runs[0]} "
    if len(runs)==2:
        result = f" and run>={runs[0]} and run<={runs[1]}"
    if len(runs)>=3:
        # three or more arguments form an explicit list; the column name is
        # 'run', consistent with the other branches and the queries below
        result = f" and run in ({','.join(runs)})"
    return result
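# For example (hypothetical run numbers):
#   makeRunCondition(['41989'])          ->  " and run=41989 "
#   makeRunCondition(['41900','41999'])  ->  " and run>=41900 and run<=41999"
#   makeRunCondition(['1','5','9'])      ->  " and run in (1,5,9)"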
def main():
    while True:

        if args.submit:
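            # Note: sh translates keyword arguments into long options, so a call
            # like kaedama(runs='123', rule='DST_EVENT', batch=True) executes
            # approximately "kaedama.py --runs=123 --rule=DST_EVENT --batch".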
            # The two submission blocks differ only in the rule name, so loop
            # over the known rules and submit each one that was requested.
            for rule in [ 'DST_EVENT', 'DST_CALOR' ]:
                if 'all' not in args.rules and rule not in args.rules:
                    continue
                print(f"Running the {rule} rule")
                if len(args.runs)==1:
                    kaedama( runs=args.runs[0], rule=rule, config=args.config, batch=True, _out=sys.stdout )
                elif len(args.runs)==2:
                    kaedama( "--runs", args.runs[0], args.runs[1], rule=rule, config=args.config, batch=True, _out=sys.stdout )
                elif len(args.runs)>2:
                    for r in args.runs:
                        kaedama( "--runs", r, rule=rule, config=args.config, batch=True, _out=sys.stdout )
        # Build the run-selection condition shared by the monitoring queries
        conditions = makeRunCondition( args.runs )

        if 'condorq' in args.outputs:
            # Batched condor_q summary for the sphnxpro account
            condor_q("-batch","sphnxpro",_out=sys.stdout)
        if 'pending' in args.outputs:
            print("Summary of jobs which have not reached status='started'")
            print("------------------------------------------------------")
            psqlquery=f"""
            select dsttype,prod_id,
                   count(run)                     as num_jobs,
                   avg(age(submitted,submitting)) as avg_time_to_submit,
                   min(age(submitted,submitting)) as min_time_to_submit,
                   max(age(submitted,submitting)) as max_time_to_submit
            from   production_status
            where  status<='started' {conditions}
            group by dsttype,prod_id
            order by dsttype desc
            ;
            """
            psql(dbname="FileCatalog",command=psqlquery,_out=sys.stdout)
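        # The status<= / status>= comparisons above and below assume the status
        # column sorts in lifecycle order (e.g. an enum declared in the order
        # submitting, submitted, started, running, ...); with a plain text
        # column the comparison would be lexicographic instead.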
        if 'started' in args.outputs:
            print("Summary of jobs which have reached status='started'")
            print("--------------------------------------------------")
            psqlquery=f"""
            select dsttype,prod_id,
                   count(run)                   as num_jobs,
                   avg(age(started,submitting)) as avg_time_to_start,
                   count( case status when 'submitted' then 1 else null end ) as num_submitted,
                   count( case status when 'running'   then 1 else null end ) as num_running,
                   count( case status when 'finished'  then 1 else null end ) as num_finished,
                   count( case status when 'failed'    then 1 else null end ) as num_failed,
                   avg(age(ended,started)) as avg_job_duration,
                   min(age(ended,started)) as min_job_duration,
                   max(age(ended,started)) as max_job_duration,
                   sum(nevents)            as sum_events
            from   production_status
            where  status>='started' {conditions}
            group by dsttype,prod_id
            order by dsttype desc
            ;
            """
            psql(dbname="FileCatalog",command=psqlquery,_out=sys.stdout)
        if 'clusters' in args.outputs:
            print("Summary of started jobs, grouped by condor cluster")
            print("--------------------------------------------------")
            psqlquery=f"""
            select dsttype,cluster,
                   count(run)                   as num_jobs,
                   avg(age(started,submitting)) as avg_time_to_start,
                   count( case status when 'submitted' then 1 else null end ) as num_submitted,
                   count( case status when 'running'   then 1 else null end ) as num_running,
                   count( case status when 'finished'  then 1 else null end ) as num_finished,
                   count( case status when 'failed'    then 1 else null end ) as num_failed,
                   avg(age(ended,started)) as avg_job_duration,
                   min(age(ended,started)) as min_job_duration,
                   max(age(ended,started)) as max_job_duration,
                   sum(nevents)            as sum_events
            from   production_status
            where  status>='started' {conditions}
            group by dsttype,cluster
            order by dsttype desc
            ;
            """
            psql(dbname="FileCatalog",command=psqlquery,_out=sys.stdout)
        if 'everything' in args.outputs:
            # Full per-job dump of production_status; left disabled
            pass
            #psql(dbname="FileCatalog",
            #     command="select dsttype,run,segment,cluster,process,status,nevents,started,running,ended,exit_code from production_status order by id;", _out=sys.stdout)
        if args.once:
            break
        sleep(args.delay)
if __name__ == '__main__':
main()