-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathcron-withpds.py
executable file
·140 lines (114 loc) · 4.48 KB
/
cron-withpds.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
#!/usr/bin/env python
"""
Cron script to submit qquery jobs.
"""
from __future__ import print_function
import json
import os
import sys
import requests
from datetime import datetime, timedelta
import argparse
import time
from hysds.celery import app
from hysds_commons.job_utils import submit_mozart_job
from shapely.geometry import Polygon, Point
def outlier(opds_polygon, bbox):
    """Report whether any vertex of ``bbox`` fails to intersect ``opds_polygon``.

    :param opds_polygon: object exposing ``intersects(point)``
        (presumably a shapely ``Polygon`` -- confirm against callers).
    :param bbox: iterable of indexable coordinate pairs; index 0 is read as
        longitude and index 1 as latitude.
    :return: True as soon as one vertex does not intersect the polygon,
        False when every vertex does (or when ``bbox`` is empty).
    """
    # any() stops at the first vertex that falls outside, so no further
    # intersects() calls are made once an outlier is found.
    return any(
        not opds_polygon.intersects(Point(vertex[0], vertex[1]))
        for vertex in bbox
    )
def get_job_params(job_type, starttime, endtime, aoi_name, sling_extract_tag):
    """Build the rule and parameter list used to submit a qquery job.

    :param job_type: job type name; a leading ``job-`` prefix, if present,
        is removed to form the rule name.
    :param starttime: value passed through as the ``starttime`` parameter.
    :param endtime: value passed through as the ``endtime`` parameter.
    :param aoi_name: value passed through as the ``aoi_name`` parameter.
    :param sling_extract_tag: value passed through as the
        ``opds_sling_extract_version`` parameter.
    :return: tuple ``(rule, params)`` where ``rule`` is a dict with
        ``rule_name``, ``queue``, ``priority`` and ``kwargs`` keys and
        ``params`` is a list of ``{"name", "from", "value"}`` dicts.
    """
    prefix = "job-"
    # str.lstrip('job-') removes any leading run of the characters
    # 'j', 'o', 'b', '-' rather than the literal prefix, so it would also eat
    # the first letter of names such as "opendataset_...". Strip the exact
    # prefix instead.
    if job_type.startswith(prefix):
        rule_name = job_type[len(prefix):]
    else:
        rule_name = job_type

    rule = {
        "rule_name": rule_name,
        "queue": "factotum-job_worker-asf_throttled",
        "priority": 5,
        "kwargs": '{}'
    }

    # Every parameter is a literal value; only name and value differ.
    literal_values = [
        ("starttime", starttime),
        ("endtime", endtime),
        ("aoi_name", aoi_name),
        ("asf_ngap_download_queue", "opds-job_worker-small"),
        ("esa_download_queue", "factotum-job_worker-scihub_throttled"),
        ("opds_sling_extract_version", sling_extract_tag),
    ]
    params = [
        {"name": name, "from": "value", "value": value}
        for name, value in literal_values
    ]
    return rule, params
def validate_temporal_input(starttime, endtime, hours_delta):
    """Resolve the query time window from an explicit range or an hours delta.

    Exactly one of ``starttime`` and ``hours_delta`` must be supplied.

    :param starttime: start time string, or None when ``hours_delta`` is used.
    :param endtime: end time string; required when ``starttime`` is given.
    :param hours_delta: number of hours before the current UTC time to start
        from (int or integer-like string), or None when ``starttime`` is used.
    :return: tuple ``(start, end)`` of time strings. When derived from
        ``hours_delta`` both are UTC ISO-8601 timestamps suffixed with ``Z``;
        otherwise ``starttime`` and ``endtime`` are returned unchanged.
    :raises ValueError: if ``hours_delta`` cannot be converted to an integer.
    :raises Exception: if ``endtime`` is missing for an explicit range, or if
        neither or both of ``starttime`` and ``hours_delta`` are specified.
    """
    hours = None
    if hours_delta is not None:
        # The delta usually arrives as a command-line string; reject anything
        # that is not an integer with a clear message instead of rejecting
        # valid integers.
        try:
            hours = int(hours_delta)
        except (TypeError, ValueError):
            raise ValueError("Please make sure the delta specified is a number")

    if starttime is None and hours is not None:
        # Take a single clock reading so the window is exactly `hours` wide.
        now = datetime.utcnow()
        window_start = now - timedelta(hours=hours)
        return "{}Z".format(window_start.isoformat()), "{}Z".format(now.isoformat())

    if starttime is not None and hours is None:
        if not endtime:
            raise Exception("Please specify endtime!")
        return starttime, endtime

    if starttime is None:
        raise Exception("None of the time parameters were specified. Must specify either start time, delta of hours")

    raise Exception("only one of the time parameters should be specified. "
                    "start time: {} delta of hours:{}"
                    .format(starttime, hours_delta))
if __name__ == "__main__":
    # Entry point run by cron: parse the command line, resolve the query time
    # window, then submit a single qquery job.
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
        "--start",
        help="start time for qquery of acuisitions within AOI in YYYY-MM-DDTHH:MM:SSZ",
        required=False,
        default=None,
    )
    parser.add_argument(
        "--end",
        help="end time for qquery of acuisitions within AOI in YYYY-MM-DDTHH:MM:SSZ",
        required=False,
        default=None,
    )
    parser.add_argument(
        "--hours",
        help="number of hoyrs before current time for qquery of acuisitions within AOI",
        required=False,
        default=None,
    )
    parser.add_argument(
        "--aoi_name",
        help="AOI name for bounding box to qquery for acquisitions",
        required=True,
    )
    parser.add_argument(
        "--tag",
        help="Release for Opendataset Acquisition Localizer - Qquery job",
        required=True,
    )
    parser.add_argument(
        "--opds_sling_extract_version",
        help="release for sling-extract job",
        required=True,
    )
    args = parser.parse_args()

    start, end = validate_temporal_input(args.start, args.end, args.hours)

    job_type = "job-opendataset_acquisition_localizer_qquery"
    job_spec = "{}:{}".format(job_type, args.tag)

    # Job name: <spec>-<start>-<end>-<submission time>, with '-' and ':'
    # removed from the window timestamps.
    submitted_at = datetime.utcnow()
    name_parts = [
        job_spec,
        start.replace("-", "").replace(":", ""),
        end.replace("-", "").replace(":", ""),
        submitted_at.strftime("%d_%b_%Y_%H:%M:%S"),
    ]
    # Drop the first four characters, i.e. the "job-" that job_type starts with.
    job_name = "-".join(name_parts)[4:]

    # Setup input arguments here
    rule, params = get_job_params(
        job_type, start, end, args.aoi_name, args.opds_sling_extract_version
    )

    print("submitting job of type {}".format(job_spec))
    hysds_io = {
        "id": "internal-temporary-wiring",
        "params": params,
        "job-specification": job_spec,
    }
    submit_mozart_job({}, rule, hysdsio=hysds_io, job_name=job_name)