 from wodpy import wod
-import pickle, sys, os, calendar, time
+import pickle, sys, os, calendar, time, ast, getopt
 import numpy as np
 import util.main as main
 from multiprocessing import Pool
@@ -12,21 +12,20 @@ def run(test, profiles, parameters):
     verbose = []
     exec('from qctests import ' + test)
     for profile in profiles:
-        exec('result = ' + test + '.test(profile, parameters)')
-        verbose.append(result)
+        exec('verbose.append(' + test + '.test(profile, parameters))')
 
     return verbose
 
-def process_row(uid, logdir):
+def process_row(uid, logdir, table='iquod', targetdb='iquod.db'):
     '''run all tests on the indicated database row'''
-
+
     # reroute stdout, stderr to separate files for each profile to preserve logs
     sys.stdout = open(logdir + "/" + str(uid) + ".stdout", "w")
     sys.stderr = open(logdir + "/" + str(uid) + ".stderr", "w")
 
     # extract profile
-    profile = main.get_profile_from_db(uid)
-
+    profile = main.get_profile_from_db(uid, table, targetdb)
+
     # mask out error codes in temperature data
     main.catchFlags(profile)
 
@@ -35,70 +34,96 @@ def process_row(uid, logdir):
         try:
             result = run(test, [profile], parameterStore)[0]
         except:
-            print test, 'exception', sys.exc_info()
+            print(test, 'exception', sys.exc_info())
             result = np.zeros(1, dtype=bool)
 
         try:
-            query = "UPDATE " + sys.argv[1] + " SET " + test + "=? WHERE uid=" + str(profile.uid()) + ";"
-            main.dbinteract(query, [main.pack_array(result)])
+            query = "UPDATE " + table + " SET " + test + "=? WHERE uid=" + str(profile.uid()) + ";"
+            main.dbinteract(query, [main.pack_array(result)], targetdb=targetdb)
         except:
-            print 'db exception', sys.exc_info()
+            print('db exception', sys.exc_info())
 
 
 ########################################
 # main
 ########################################
 
-if len(sys.argv)>2:
-
-    # Identify and import tests
-    testNames = main.importQC('qctests')
-    testNames.sort()
-    print('{} quality control checks have been found'.format(len(testNames)))
-    testNames = main.checkQCTestRequirements(testNames)
-    print('{} quality control checks are able to be run:'.format(len(testNames)))
-    for testName in testNames:
-        print(' {}'.format(testName))
-
-    # set up a directory for logging
-    logdir = "autoqc-logs-" + str(calendar.timegm(time.gmtime()))
-    os.makedirs(logdir)
-
-    # Parallel processing.
-    print('\n Please wait while QC is performed\n')
-
-    # set up global parameter store
-    parameterStore = {
-        "table": sys.argv[1]
-    }
-    for test in testNames:
-        exec('from qctests import ' + test)
-        try:
-            exec(test + '.loadParameters(parameterStore)')
-        except:
-            print 'No parameters to load for', test
-
-    # connect to database & fetch list of all uids
-    query = 'SELECT uid FROM ' + sys.argv[1] + ' ORDER BY uid;'
-    uids = main.dbinteract(query)
-
-    # launch async processes
-    if len(sys.argv) > 4:
-        batchnumber = int(sys.argv[3])
-        nperbatch = int(sys.argv[4])
-        startindex = batchnumber*nperbatch
-        endindex = min((batchnumber+1)*nperbatch,len(uids))
-    else:
-        startindex = 0
-        endindex = len(uids)
-    pool = Pool(processes=int(sys.argv[2]))
-    for i in range(startindex, endindex):
-        pool.apply_async(process_row, (uids[i][0], logdir))
-    pool.close()
-    pool.join()
-
+# parse options
+options, remainder = getopt.getopt(sys.argv[1:], 't:d:b:n:p:l:h')
+cores = 1
+targetdb = 'iquod.db'
+dbtable = 'iquod'
+logdir = '/AutoQClogs'
+batchnumber = None
+nperbatch = None
+for opt, arg in options:
+    if opt == '-b':
+        batchnumber = ast.literal_eval(arg)
+    if opt == '-d':
+        dbtable = arg
+    if opt == '-l':
+        logdir = arg
+    if opt == '-n':
+        cores = ast.literal_eval(arg)
+    if opt == '-p':
+        nperbatch = ast.literal_eval(arg)
+    if opt == '-t':
+        targetdb = arg
+    if opt == '-h':
+        print('usage:')
+        print('-b <batch number to process>')
+        print('-d <db table name to create and write to>')
+        print('-l <directory to write logfiles to>')
+        print('-n <number of cores to use>')
+        print('-p <how many profiles to process per batch>')
+        print('-t <name of db file>')
+        print('-h print this help message and quit')
+
+# Identify and import tests
+testNames = main.importQC('qctests')
+testNames.sort()
+print('{} quality control checks have been found'.format(len(testNames)))
+testNames = main.checkQCTestRequirements(testNames)
+print('{} quality control checks are able to be run:'.format(len(testNames)))
+for testName in testNames:
+    print(' {}'.format(testName))
+
+# set up a directory for logging
+logdir = logdir + "/autoqc-logs-" + str(calendar.timegm(time.gmtime()))
+os.makedirs(logdir)
+
+# Parallel processing.
+print('\n Please wait while QC is performed\n')
+
+# set up global parameter store
+parameterStore = {
+    "table": dbtable,
+    "db": targetdb
+}
+for test in testNames:
+    exec('from qctests import ' + test)
+    try:
+        exec(test + '.loadParameters(parameterStore)')
+    except:
+        print('No parameters to load for', test)
+
+# connect to database & fetch list of all uids
+query = 'SELECT uid FROM ' + dbtable + ' ORDER BY uid;'
+uids = main.dbinteract(query, targetdb=targetdb)
+
+# launch async processes
+if batchnumber is not None and nperbatch is not None:
+    batchnumber = int(batchnumber)
+    nperbatch = int(nperbatch)
+    startindex = batchnumber*nperbatch
+    endindex = min((batchnumber+1)*nperbatch,len(uids))
 else:
-    print 'Please add command line arguments to name your output file and set parallelization:'
-    print 'python AutoQC <database results table> <number of processes> <batch> <number of processes per batch> [<batchnumber> <number per batch>]'
-    print 'will use <database results table> to log QC results in the database, and run the calculation parallelized over <number of processes>. By default all profiles will be processed, but optionally the processing can be done in batches of size <number per batch>.'
+    startindex = 0
+    endindex = len(uids)
+pool = Pool(processes=int(cores))
+for i in range(startindex, endindex):
+    pool.apply_async(process_row, (uids[i][0], logdir, dbtable, targetdb))
+pool.close()
+pool.join()
+
 
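For reference, a minimal standalone sketch of how the new getopt-based options and the batch slicing behave. The flag letters, defaults, and slicing logic mirror the diff above; the sample argv values and the uid count are made up purely for illustration.

import ast, getopt

# hypothetical invocation: 4 workers, table 'iquod' in iquod.db, batch 2 of 1000 profiles
argv = ['-n', '4', '-d', 'iquod', '-t', 'iquod.db', '-b', '2', '-p', '1000']
options, remainder = getopt.getopt(argv, 't:d:b:n:p:l:h')
# options == [('-n', '4'), ('-d', 'iquod'), ('-t', 'iquod.db'), ('-b', '2'), ('-p', '1000')]

cores, dbtable, targetdb = 1, 'iquod', 'iquod.db'
batchnumber = nperbatch = None
for opt, arg in options:
    if opt == '-n':
        cores = ast.literal_eval(arg)        # 4
    if opt == '-b':
        batchnumber = ast.literal_eval(arg)  # 2
    if opt == '-p':
        nperbatch = ast.literal_eval(arg)    # 1000

# batch slicing as in the new main block: batch 2 of 1000 covers uids[2000:3000],
# clamped to however many uids the query returned (2500 here, an invented stand-in)
uids = list(range(2500))
if batchnumber is not None and nperbatch is not None:
    startindex = batchnumber * nperbatch
    endindex = min((batchnumber + 1) * nperbatch, len(uids))
else:
    startindex, endindex = 0, len(uids)
print(startindex, endindex)  # 2000 2500

Only profiles in uids[startindex:endindex] are then dispatched to the worker pool, so separate batch invocations partition the table without overlap.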