s02new_jobs.py
""" This script searches the database for files flagged "N"ew or "M"odified.
For each date in the configured range, it checks if other stations are
available and defines the new jobs to be processed. Those are inserted in the
*jobs* table of the database.
To run it from the console:
.. code-block:: sh
$ python s02new_jobs.py
"""

import logging

import numpy as np

from database_tools import *


if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG,
                        filename="./new_jobs.log",
                        format='%(asctime)s [%(levelname)s] %(message)s',
                        datefmt='%Y-%m-%d %H:%M:%S',
                        filemode='w')

    # Mirror every log record to the console in addition to the log file.
    console = logging.StreamHandler()
    console.setLevel(logging.DEBUG)
    formatter = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s')
    console.setFormatter(formatter)
    logging.getLogger('').addHandler(console)

    logging.info('*** Starting: New Jobs ***')

    db = connect()
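    # connect(), get_config(), get_stations(), get_new_files(),
    # get_data_availability(), update_job() and mark_data_availability() are
    # assumed to come from the star-import of database_tools above.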

    # The "autocorr" configuration value decides whether a station is also
    # paired with itself.
    AUTOCORR = get_config(db, name="autocorr") in ('Y', 'y', '1', 1)

    stations_to_analyse = [sta.sta for sta in get_stations(db, all=False)]
    nfs = get_new_files(db)
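    # The day-grouping below assumes get_new_files() returns the files ordered
    # by start time; if it does not, the pairs of a single day may be flushed
    # in several batches.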

    old_day = 0
    day_pairs = []
    jobs = []
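    # day_pairs accumulates the station pairs seen for the day currently being
    # scanned; it is flushed into jobs (as [day, pair, jobtype, flag] rows)
    # whenever the day changes.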
    for nf in nfs:
        # logging.debug('%s.%s will be MASTER for %s-%s' % (nf.net, nf.sta, nf.starttime, nf.endtime))
        if nf.sta in stations_to_analyse:
            day = "%s" % nf.starttime.date()
            if day != old_day:
                # A new day starts: flush the pairs accumulated for the
                # previous day as "CC" jobs flagged "T"odo.
                for pair in np.unique(day_pairs):
                    logging.debug('New Job for: %s - %s' % (old_day, pair))
                    jobs.append([old_day, pair, 'CC', 'T'])
                day_pairs = []
                old_day = day
            # Collect the analysed stations that have data overlapping this
            # file's time span.
            available_stations = []
            for station in get_data_availability(db, starttime=nf.starttime,
                                                 endtime=nf.endtime):
                if station.sta in stations_to_analyse:
                    name = '%s.%s' % (station.net, station.sta)
                    if name not in available_stations:
                        available_stations.append(name)
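            # Pair names are canonical: the two "NET.STA" codes are sorted
            # alphabetically before being joined, so e.g. ("BE.UCC", "BE.MEM")
            # always yields "BE.MEM:BE.UCC" (station codes are examples).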
            nS = '%s.%s' % (nf.net, nf.sta)
            pairs = []
            for aS in available_stations:
                if not AUTOCORR and nS == aS:
                    continue
                pairs.append(':'.join(sorted([nS, aS])))
            for pair in np.unique(pairs):
                day_pairs.append(pair)

    # Flush the pairs accumulated for the very last day of the file list,
    # which the loop above never reaches.
    if len(day_pairs) != 0:
        for pair in np.unique(day_pairs):
            logging.debug('New Job for: %s - %s' % (old_day, pair))
            jobs.append([old_day, pair, 'CC', 'T'])

    count = len(jobs)
    logging.debug("Found %i new jobs to do" % count)
    alljobs = []
    for job in jobs:
        day, pair, jobtype, flag = job
        job = update_job(db, day, pair, jobtype, flag, commit=False,
                         returnjob=True)
        alljobs.append(job)
        if len(alljobs) == 100:
            logging.debug("Committing 100 jobs")
            db.add_all(alljobs)
            db.commit()
            alljobs = []

    if len(alljobs) != 0:
        db.add_all(alljobs)
        db.commit()

    # Update the data availability table and mark all files as "A"rchives so
    # they are not picked up as "N"ew or "M"odified on the next run.
    for sta in get_stations(db, all=True):
        mark_data_availability(db, sta.net, sta.sta, flag='A')

    logging.info('*** Finished: New Jobs ***')