From cab39c8b1d80149ab59437f5638cfbbc382471d3 Mon Sep 17 00:00:00 2001
From: Dustin Lo
Date: Mon, 30 Mar 2020 17:51:55 -0700
Subject: [PATCH] Hc 158 refactor (#13) (#14)

* Hc 158 refactor (#13)
* refactored retry job, added gitignore
* purge.py refactor, TODO: not sure what this line of code means query_obj["query"]
* refactored purge job to use context instead of positional and cleaned up code
* refactored retry.py, removed retry.sh, moved job_spec destinations to context, cleaned up code
* refactored notify by email job, added more logging to purge job
* deleting by index and id in retry.py, fixed bugs in retry.py, refactored job_spec for revoke json to be context not positional
* added setup.sh and hysds_commons in Dockerfile
* refactored reprioritize jobs PGE
* using specific branch for hysds_commons in setup.sh
* fixed pip command in dockerfile
* fixed pip command in dockerfile
* fixed pip command in dockerfile
* fixed Dockerfile copy command
* fixed Dockerfile copy command
* fixed Dockerfile copy command
* fixed Dockerfile copy command
* fixed Dockerfile copy command
* fixed Dockerfile copy command
* fixed Dockerfile copy command
* fixed Dockerfile copy command
* fixed Dockerfile copy command
* fixed Dockerfile copy command
* fixed command in job_spec from .sh to .py
* fixed command job_spec in purge
* fixed command job_spec in purge
* fixed command job_spec in purge
* fixed command job_spec in purge
* edited job_spec commands for retry and purge
* removed sudo from Dockerfile to maybe get retry job to work properly; error: "ImportError: /lib64/libstdc++.so.6: version `CXXABI_1.3.8' not found"
* . ~/.bash_profiles to activate conda virtual environment
* . ~/.bash_profiles to activate conda virtual environment
* . ~/.bash_profiles to activate conda virtual environment
* . ~/.bash_profiles to activate conda virtual environment
* . ~/.bash_profiles to activate conda virtual environment
* .
~/.bash_profiles to activate conda virtual environment * wrapped retry.py in a shell script instead * wrapped retry.py in a shell script instead * fixed command in job_specs * changed branch to develop-es7 in setup.sh * removed setup.sh --- .gitignore | 160 +++++++++++++ docker/Dockerfile | 13 +- docker/hysds-io.json.lw-mozart-reprioritize | 12 +- docker/hysds-io.json.lw-mozart-retry | 16 +- docker/hysds-io.json.lw-mozart-revoke | 10 +- docker/hysds-io.json.lw-tosca-purge | 10 +- .../job-spec.json.lw-mozart-notify-by-email | 10 +- docker/job-spec.json.lw-mozart-purge | 8 +- docker/job-spec.json.lw-mozart-reprioritize | 14 +- docker/job-spec.json.lw-mozart-retry | 14 +- docker/job-spec.json.lw-mozart-revoke | 21 +- docker/job-spec.json.lw-tosca-notify-by-email | 10 +- docker/job-spec.json.lw-tosca-purge | 8 +- notify_by_email.py | 212 ++++++++++-------- purge.py | 107 ++++----- purge.sh | 32 --- retry.py | 148 ++++++------ retry.sh | 9 +- 18 files changed, 488 insertions(+), 326 deletions(-) create mode 100644 .gitignore delete mode 100755 purge.sh diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..7289ff4 --- /dev/null +++ b/.gitignore @@ -0,0 +1,160 @@ +*.pyc +build +*.egg-info + +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +pip-wheel-metadata/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +.python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. 
+#Pipfile.lock + +# celery beat schedule file +celerybeat-schedule + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +.idea +.idea/ + +# General +.DS_Store +.AppleDouble +.LSOverride + +# Icon must end with two \r +Icon + + +# Thumbnails +._* + +# Files that might appear in the root of a volume +.DocumentRevisions-V100 +.fseventsd +.Spotlight-V100 +.TemporaryItems +.Trashes +.VolumeIcon.icns +.com.apple.timemachine.donotpresent + +# Directories potentially created on remote AFP share +.AppleDB +.AppleDesktop +Network Trash Folder +Temporary Items +.apdisk diff --git a/docker/Dockerfile b/docker/Dockerfile index 57f3a20..a380891 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -5,10 +5,21 @@ LABEL description="Lightweight System Jobs" # provision lightweight-jobs PGE USER ops -COPY . /home/ops/verdi/ops/lightweight-jobs +COPY . /home/ops/lightweight-jobs + +RUN sudo mv /home/ops/verdi/ops/lightweight-jobs /home/ops/verdi/ops/lightweight-jobs.orig + +RUN unlink /home/ops/verdi/ops/hysds_commons +RUN sudo mv /home/ops/verdi/ops/hysds_commons-0.2.4 /home/ops/verdi/ops/hysds_commons-0.2.4.orig +RUN ln -s /home/ops/verdi/ops/hysds_commons /home/ops/verdi/ops/hysds_commons-0.2.4 + +RUN sudo mv /home/ops/lightweight-jobs/hysds_commons /home/ops/verdi/ops/ + +RUN ~/verdi/bin/pip install 'elasticsearch>=7.0.0,<8.0.0' # set entrypoint ENTRYPOINT ["/entrypoint-pge-with-stats.sh"] WORKDIR /home/ops + CMD ["/bin/bash", "--login"] diff --git a/docker/hysds-io.json.lw-mozart-reprioritize b/docker/hysds-io.json.lw-mozart-reprioritize index c465903..75ac31a 100644 --- a/docker/hysds-io.json.lw-mozart-reprioritize +++ b/docker/hysds-io.json.lw-mozart-reprioritize @@ -1,9 +1,9 @@ { - "label":"Reprioritize Jobs/Tasks", - "component":"figaro", - "submission_type":"individual", + "label": "Reprioritize Jobs/Tasks", + "component": "figaro", + "submission_type": "individual", "enable_dedup": false, - "params" : [ + "params": [ { "name": "retry_job_id", "type": "text", @@ -23,10 +23,10 @@ }, { "name": "new_job_priority", - "from": "submitter", "type": "text", + "from": "submitter", "default": "0", "lambda": "lambda x: int(x)" } ] -} +} \ No newline at end of file diff --git a/docker/hysds-io.json.lw-mozart-retry b/docker/hysds-io.json.lw-mozart-retry index b3ecbcc..5d624b0 100644 --- a/docker/hysds-io.json.lw-mozart-retry +++ b/docker/hysds-io.json.lw-mozart-retry @@ -1,9 +1,9 @@ { - "label":"Retry Jobs/Tasks", - "component":"figaro", - "submission_type":"individual", + "label": "Retry Jobs/Tasks", + "component": "figaro", + "submission_type": "individual", "enable_dedup": false, - "params" : [ + "params": [ { "name": "retry_job_id", "type": "text", @@ -25,9 +25,13 @@ "name": "job_priority_increment", "type": "enum", "from": "submitter", - "enumerables": ["-1","0","+1"], + "enumerables": [ + "-1", + "0", + "+1" + ], "default": "0", "lambda": "lambda x: int(x)" } ] -} +} \ No newline at end of file diff --git a/docker/hysds-io.json.lw-mozart-revoke b/docker/hysds-io.json.lw-mozart-revoke index 982335f..148261e 100644 --- a/docker/hysds-io.json.lw-mozart-revoke +++ b/docker/hysds-io.json.lw-mozart-revoke @@ -1,8 +1,8 @@ { - "label":"_Revoke / _Stop Job", - "component":"figaro", - "submission_type":"individual", - "params" : [ + "label": 
"_Revoke / _Stop Job", + "component": "figaro", + "submission_type": "individual", + "params": [ { "name": "query", "type": "text", @@ -21,4 +21,4 @@ "value": "revoke" } ] -} +} \ No newline at end of file diff --git a/docker/hysds-io.json.lw-tosca-purge b/docker/hysds-io.json.lw-tosca-purge index f614469..9082719 100644 --- a/docker/hysds-io.json.lw-tosca-purge +++ b/docker/hysds-io.json.lw-tosca-purge @@ -1,8 +1,8 @@ { - "label":"Purge datasets", - "component":"tosca", - "submission_type":"individual", - "params" : [ + "label": "Purge datasets", + "component": "tosca", + "submission_type": "individual", + "params": [ { "name": "query", "type": "text", @@ -21,4 +21,4 @@ "value": "purge" } ] -} +} \ No newline at end of file diff --git a/docker/job-spec.json.lw-mozart-notify-by-email b/docker/job-spec.json.lw-mozart-notify-by-email index 3ffd29a..2fbd24d 100644 --- a/docker/job-spec.json.lw-mozart-notify-by-email +++ b/docker/job-spec.json.lw-mozart-notify-by-email @@ -7,23 +7,23 @@ "params" : [ { "name": "id", - "destination": "positional" + "destination": "context" }, { "name": "url", - "destination": "positional" + "destination": "context" }, { "name": "emails", - "destination": "positional" + "destination": "context" }, { "name": "name", - "destination": "positional" + "destination": "context" }, { "name": "component", - "destination": "positional" + "destination": "context" } ] } diff --git a/docker/job-spec.json.lw-mozart-purge b/docker/job-spec.json.lw-mozart-purge index fb34ebb..03a02eb 100644 --- a/docker/job-spec.json.lw-mozart-purge +++ b/docker/job-spec.json.lw-mozart-purge @@ -1,21 +1,21 @@ { "required_queues":["system-jobs-queue"], - "command":"/home/ops/verdi/ops/lightweight-jobs/purge.sh", + "command":"python /home/ops/lightweight-jobs/purge.py", "disk_usage":"3GB", "soft_time_limit": 86400, "time_limit": 86700, "params" : [ { "name": "query", - "destination": "positional" + "destination": "context" }, { "name": "component", - "destination": "positional" + "destination": "context" }, { "name": "operation", - "destination": "positional" + "destination": "context" } ] } diff --git a/docker/job-spec.json.lw-mozart-reprioritize b/docker/job-spec.json.lw-mozart-reprioritize index fa84603..d6ce145 100644 --- a/docker/job-spec.json.lw-mozart-reprioritize +++ b/docker/job-spec.json.lw-mozart-reprioritize @@ -1,17 +1,19 @@ { - "required_queues":["system-jobs-queue"], - "command":"/home/ops/verdi/ops/lightweight-jobs/retry.sh", - "disk_usage":"3GB", + "required_queues": [ + "system-jobs-queue" + ], + "command": "/home/ops/lightweight-jobs/retry.sh", + "disk_usage": "3GB", "soft_time_limit": 86400, "time_limit": 86700, - "params" : [ + "params": [ { "name": "retry_job_id", "destination": "context" }, { "name": "type", - "destination": "positional" + "destination": "context" }, { "name": "retry_count_max", @@ -22,4 +24,4 @@ "destination": "context" } ] -} +} \ No newline at end of file diff --git a/docker/job-spec.json.lw-mozart-retry b/docker/job-spec.json.lw-mozart-retry index 3c1be25..b2af219 100644 --- a/docker/job-spec.json.lw-mozart-retry +++ b/docker/job-spec.json.lw-mozart-retry @@ -1,17 +1,19 @@ { - "required_queues":["system-jobs-queue"], - "command":"/home/ops/verdi/ops/lightweight-jobs/retry.sh", - "disk_usage":"3GB", + "required_queues": [ + "system-jobs-queue" + ], + "command": "/home/ops/lightweight-jobs/retry.sh", + "disk_usage": "3GB", "soft_time_limit": 86400, "time_limit": 86700, - "params" : [ + "params": [ { "name": "retry_job_id", "destination": "context" }, { 
"name": "type", - "destination": "positional" + "destination": "context" }, { "name": "retry_count_max", @@ -22,4 +24,4 @@ "destination": "context" } ] -} +} \ No newline at end of file diff --git a/docker/job-spec.json.lw-mozart-revoke b/docker/job-spec.json.lw-mozart-revoke index 2a76b8b..d7d73ff 100644 --- a/docker/job-spec.json.lw-mozart-revoke +++ b/docker/job-spec.json.lw-mozart-revoke @@ -1,22 +1,23 @@ { - "required_queues":["system-jobs-queue"], - "command":"/home/ops/verdi/ops/lightweight-jobs/purge.sh", - "disk_usage":"3GB", + "required_queues": [ + "system-jobs-queue" + ], + "command": "python /home/ops/lightweight-jobs/purge.py", + "disk_usage": "3GB", "soft_time_limit": 86400, "time_limit": 86700, - "params" : [ + "params": [ { "name": "query", - "destination": "positional" + "destination": "context" }, { "name": "component", - "destination": "positional" + "destination": "context" }, { "name": "operation", - "destination": "positional" + "destination": "context" } - ] -} - + ] +} \ No newline at end of file diff --git a/docker/job-spec.json.lw-tosca-notify-by-email b/docker/job-spec.json.lw-tosca-notify-by-email index 3ffd29a..2fbd24d 100644 --- a/docker/job-spec.json.lw-tosca-notify-by-email +++ b/docker/job-spec.json.lw-tosca-notify-by-email @@ -7,23 +7,23 @@ "params" : [ { "name": "id", - "destination": "positional" + "destination": "context" }, { "name": "url", - "destination": "positional" + "destination": "context" }, { "name": "emails", - "destination": "positional" + "destination": "context" }, { "name": "name", - "destination": "positional" + "destination": "context" }, { "name": "component", - "destination": "positional" + "destination": "context" } ] } diff --git a/docker/job-spec.json.lw-tosca-purge b/docker/job-spec.json.lw-tosca-purge index 540c2ff..255e17a 100644 --- a/docker/job-spec.json.lw-tosca-purge +++ b/docker/job-spec.json.lw-tosca-purge @@ -1,6 +1,6 @@ { "required_queues":["system-jobs-queue"], - "command":"/home/ops/verdi/ops/lightweight-jobs/purge.sh", + "command":"python /home/ops/lightweight-jobs/purge.py", "imported_worker_files":{ "$HOME/.aws":"/home/ops/.aws", "$HOME/.azure": "/home/ops/.azure" @@ -11,15 +11,15 @@ "params" : [ { "name": "query", - "destination": "positional" + "destination": "context" }, { "name": "component", - "destination": "positional" + "destination": "context" }, { "name": "operation", - "destination": "positional" + "destination": "context" } ] } diff --git a/notify_by_email.py b/notify_by_email.py index ff34cf0..0c97d4e 100644 --- a/notify_by_email.py +++ b/notify_by_email.py @@ -14,27 +14,36 @@ from email.Header import Header from email.Utils import parseaddr, formataddr, COMMASPACE, formatdate from email import Encoders + from hysds.celery import app from hysds_commons.net_utils import get_container_host_ip +from hysds_commons.elasticsearch_utils import ElasticsearchUtility + + +def read_context(): + with open('_context.json', 'r') as f: + cxt = json.load(f) + return cxt def get_hostname(): """Get hostname.""" - - # get hostname try: return socket.getfqdn() - except: - # get IP - try: - return socket.gethostbyname(socket.gethostname()) - except: - raise RuntimeError( - "Failed to resolve hostname for full email address. Check system.") + except Exception as e: + print(e) + print('socket.getfqdn() failed, passing...') + pass + try: + return socket.gethostbyname(socket.gethostname()) + except Exception as e: + print(e) + raise RuntimeError("Failed to resolve hostname for full email address. 
Check system.") -def send_email(sender, cc_recipients, bcc_recipients, subject, body, attachments=None): - """Send an email. +def send_email(sender, cc, bcc, subject, body, attachments=None): + """ + Send an email. All arguments should be Unicode strings (plain ASCII works as well). @@ -48,8 +57,7 @@ def send_email(sender, cc_recipients, bcc_recipients, subject, body, attachments and UTF-8 that can represent all the characters occurring in the email. """ - # combined recipients - recipients = cc_recipients + bcc_recipients + recipients = cc + bcc # combined recipients # Header class is smart enough to try US-ASCII, then the charset we # provide, then fall back to UTF-8. @@ -66,35 +74,33 @@ def send_email(sender, cc_recipients, bcc_recipients, subject, body, attachments # Split real name (which is optional) and email address parts sender_name, sender_addr = parseaddr(sender) - parsed_cc_recipients = [parseaddr(rec) for rec in cc_recipients] - parsed_bcc_recipients = [parseaddr(rec) for rec in bcc_recipients] - #recipient_name, recipient_addr = parseaddr(recipient) + parsed_cc = [parseaddr(rec) for rec in cc] + parsed_bcc = [parseaddr(rec) for rec in bcc] # We must always pass Unicode strings to Header, otherwise it will # use RFC 2047 encoding even on plain ASCII strings. - sender_name = str(Header(str(sender_name), header_charset)) - unicode_parsed_cc_recipients = [] - for recipient_name, recipient_addr in parsed_cc_recipients: + unicode_parsed_cc = [] + for recipient_name, recipient_addr in parsed_cc: recipient_name = str(Header(str(recipient_name), header_charset)) + # Make sure email addresses do not contain non-ASCII characters recipient_addr = recipient_addr.encode('ascii') - unicode_parsed_cc_recipients.append((recipient_name, recipient_addr)) - unicode_parsed_bcc_recipients = [] - for recipient_name, recipient_addr in parsed_bcc_recipients: + unicode_parsed_cc.append((recipient_name, recipient_addr)) + + unicode_parsed_bcc = [] + for recipient_name, recipient_addr in parsed_bcc: recipient_name = str(Header(str(recipient_name), header_charset)) + # Make sure email addresses do not contain non-ASCII characters recipient_addr = recipient_addr.encode('ascii') - unicode_parsed_bcc_recipients.append((recipient_name, recipient_addr)) - - # Make sure email addresses do not contain non-ASCII characters - sender_addr = sender_addr.encode('ascii') + unicode_parsed_bcc.append((recipient_name, recipient_addr)) # Create the message ('plain' stands for Content-Type: text/plain) msg = MIMEMultipart() msg['CC'] = COMMASPACE.join([formataddr((recipient_name, recipient_addr)) - for recipient_name, recipient_addr in unicode_parsed_cc_recipients]) + for recipient_name, recipient_addr in unicode_parsed_cc]) msg['BCC'] = COMMASPACE.join([formataddr((recipient_name, recipient_addr)) - for recipient_name, recipient_addr in unicode_parsed_bcc_recipients]) + for recipient_name, recipient_addr in unicode_parsed_bcc]) msg['Subject'] = Header(str(subject), header_charset) msg['FROM'] = "no-reply@jpl.nasa.gov" msg.attach(MIMEText(body.encode(body_charset), 'plain', body_charset)) @@ -109,9 +115,6 @@ def send_email(sender, cc_recipients, bcc_recipients, subject, body, attachments 'attachment; filename="%s"' % fname) msg.attach(part) - # print "#" * 80 - # print msg.as_string() - # Send the message via SMTP to docker host smtp_url = "smtp://%s:25" % get_container_host_ip() print("smtp_url : %s", smtp_url) @@ -120,9 +123,9 @@ def send_email(sender, cc_recipients, bcc_recipients, subject, body, attachments 
smtp.quit() -def get_source(es_url, query_idx, objectid): - """Return source metadata for objectid.""" - +def get_source(es_host, idx, _id): + """Return source metadata for object_id.""" + es = ElasticsearchUtility(es_host) query = { "sort": { "_timestamp": { @@ -131,17 +134,14 @@ def get_source(es_url, query_idx, objectid): }, "query": { "term": { - "_id": objectid + "_id": _id } } } - print('get_source debug:', '%s/%s/_search', es_url, - " ", query_idx, ' ', json.dumps(query)) - r = requests.post('%s/%s/_search' % - (es_url, query_idx), data=json.dumps(query)) - r.raise_for_status() - result = r.json() - if result['hits']['total'] == 0: + print('get_source debug %s from index: %s' % (_id, idx)) + print('query: %s' % json.dumps(query, indent=2)) + result = es.search(idx, query) + if result['hits']['total']['value'] == 0: return None else: return result['hits']['hits'][0]['_source'] @@ -180,84 +180,104 @@ def get_metadata_snippet(src, snippet_cfg): if val is not None: body += "%s: %s\n" % (label, val) body += "location type: %s\n" % src.get('location', {}).get('type', None) - body += "location coordinates: %s\n" % src.get( - 'location', {}).get('coordinates', []) + body += "location coordinates: %s\n" % src.get('location', {}).get('coordinates', []) cities = get_cities(src) - body += "Closest cities: %s" % "\n ".join(cities) + body += "Closest cities: %s" % "\n\t\t".join(cities) return body -def get_facetview_link(facetview_url, objectid, system_version=None): - """Return link to objectid in FacetView interface.""" - - if system_version is None: - b64 = base64.urlsafe_b64encode( - '{"query":{"query_string":{"query":"_id:%s"}}}' % objectid) +def get_facetview_link(link, _id, version=None): + """ + Return link to object_id in FacetView interface. + :param link: str + :param _id: str, _id for elasticsearch document + :param version: str + :return: constructed URL for facetview + """ + if version is None: + query = { + 'query': { + 'query_string': { + 'query': '_id:%s' % _id + } + } + } + b64 = base64.urlsafe_b64encode(json.dumps(query)) else: - b64 = base64.urlsafe_b64encode( - '{"query":{"query_string":{"query":"_id:%s AND system_version:%s"}}}' % (objectid, system_version)) - if facetview_url.endswith('/'): - facetview_url = facetview_url[:-1] - return '%s/?base64=%s' % (facetview_url, b64) + query = { + 'query': { + 'query_string': { + 'query': '_id:%s AND system_versions:%s' % (_id, version) + } + } + } + b64 = base64.urlsafe_b64encode(json.dumps(query)) + if link.endswith('/'): + link = link[:-1] + return '%s/?base64=%s' % (link, b64) if __name__ == "__main__": - settings_file = os.path.normpath( - os.path.join( - os.path.dirname(os.path.realpath(__file__)), - 'settings.json') - ) + cwd = os.getcwd() + settings_file = os.path.join(cwd, 'settings.json') + settings_file = os.path.normpath(settings_file) # normalizing the path settings = json.load(open(settings_file)) - objectid = sys.argv[1] - url = sys.argv[2] - emails = sys.argv[3] - rule_name = sys.argv[4] - component = sys.argv[5] + context = read_context() + + object_id = context['objectid'] + url = context['url'] + emails = context['emails'] + rule_name = context['rule_name'] + component = context['component'] if component == "mozart" or component == "figaro": es_url = app.conf["JOBS_ES_URL"] - query_idx = app.conf["STATUS_ALIAS"] + index = app.conf["STATUS_ALIAS"] facetview_url = app.conf["MOZART_URL"] - elif component == "tosca": + else: # "tosca" es_url = app.conf["GRQ_ES_URL"] - query_idx = app.conf["DATASET_ALIAS"] - 
#facetview_url = app.conf["TOSCA_URL"] - # updating facetview_url with updated aria-search-beta hostname - facetview_url = "https://aria-search-beta.jpl.nasa.gov/search" + index = app.conf["DATASET_ALIAS"] + facetview_url = "https://aria-search-beta.jpl.nasa.gov/search" # TODO: why is it hard coded cc_recipients = [i.strip() for i in emails.split(',')] bcc_recipients = [] - subject = "[monitor] (notify_by_email:%s) %s" % (rule_name, objectid) - body = "Product with id %s was ingested." % objectid - attachments = None - src = get_source(es_url, query_idx, objectid) - if src is not None: - # attach metadata json - body += "\n\n%s" % get_metadata_snippet(src, settings['SNIPPET_CFG']) - body += "\n\nThe entire metadata json for this product has been attached for your convenience.\n\n" - attachments = {'metadata.json': json.dumps(src, indent=2)} + email_subject = "[monitor] (notify_by_email:%s) %s" % (rule_name, object_id) + email_body = "Product with id %s was ingested." % object_id + email_attachments = None + + doc = get_source(es_url, index, object_id) + if doc is not None: + email_body += "\n\n%s" % get_metadata_snippet(doc, settings['SNIPPET_CFG']) + email_body += "\n\nThe entire metadata json for this product has been attached for your convenience.\n\n" + email_attachments = { + 'metadata.json': json.dumps(doc, indent=2) # attach metadata json + } # attach browse images - if len(src['browse_urls']) > 0: - browse_url = src['browse_urls'][0] - if len(src['images']) > 0: - body += "Browse images have been attached as well.\n\n" - for i in src['images']: + if len(doc['browse_urls']) > 0: + browse_url = doc['browse_urls'][0] + if len(doc['images']) > 0: + email_body += "Browse images have been attached as well.\n\n" + for i in doc['images']: small_img = i['small_img'] small_img_url = os.path.join(browse_url, small_img) r = requests.get(small_img_url) if r.status_code != 200: continue - attachments[small_img] = r.content + email_attachments[small_img] = r.content else: - body += "\n\n" - body += "You may access the product here:\n\n%s" % url - facet_url = get_facetview_link( - facetview_url, objectid, None if src is None else src.get('system_version', None)) + email_body += "\n\n" + + email_body += "You may access the product here:\n\n%s" % url + + system_version = None if doc is None else doc.get('system_version') + facet_url = get_facetview_link(facetview_url, object_id, system_version) + if facet_url is not None: - body += "\n\nYou may view this product in FacetView here:\n\n%s" % facet_url - body += "\n\nNOTE: You may have to cut and paste the FacetView link into your " - body += "browser's address bar to prevent your email client from escaping the curly brackets." - send_email("%s@%s" % (getpass.getuser(), get_hostname()), cc_recipients, - bcc_recipients, subject, body, attachments=attachments) + email_body += "\n\nYou may view this product in FacetView here:\n\n%s" % facet_url + email_body += "\n\nNOTE: You may have to cut and paste the FacetView link into your " + email_body += "browser's address bar to prevent your email client from escaping the curly brackets." 
+ + username_email = "%s@%s" % (getpass.getuser(), get_hostname()) + send_email(username_email, bcc_recipients, email_subject, email_body, attachments=email_attachments) diff --git a/purge.py b/purge.py index 2b0b36a..51ef89d 100644 --- a/purge.py +++ b/purge.py @@ -1,54 +1,51 @@ #!/bin/env python import json import logging -import sys -import hysds_commons.request_utils -import hysds_commons.metadata_rest_utils import osaka.main from hysds.celery import app +from hysds_commons.elasticsearch_utils import ElasticsearchUtility -# TODO: Setup logger for this job here. Should log to STDOUT or STDERR as this is a job -logging.basicConfig(level=logging.DEBUG) -logger = logging.getLogger("hysds") +LOG_FILE_NAME = 'purge.log' +logging.basicConfig(filename=LOG_FILE_NAME, filemode='a', level=logging.DEBUG) +logger = logging + + +def read_context(): + with open('_context.json', 'r') as f: + cxt = json.load(f) + return cxt def purge_products(query, component, operation): - ''' + """ Iterator used to iterate across a query result and submit jobs for every hit - @param es_url - ElasticSearch URL to hit with query - @param es_index - Index in ElasticSearch to hit with query (usually an alias) - @param username - name of the user owning this submission - @param query - query to post to ElasticSearch and whose result will be iterated, JSON sting enc - @param kwargs - key-word args to match to HySDS IO - ''' - logger.debug("Doing %s for %s with query: %s", operation, component, query) + :param query: query to post to ElasticSearch and whose result will be iterated, JSON sting enc + :param component: tosca || figaro + :param operation: purge or something else + """ + logger.debug("action: %s for %s", operation, component) + logger.debug("query: %s" % json.dumps(query, indent=2)) if component == "mozart" or component == "figaro": es_url = app.conf["JOBS_ES_URL"] es_index = app.conf["STATUS_ALIAS"] - facetview_url = app.conf["MOZART_URL"] - elif component == "tosca": + else: # "tosca" es_url = app.conf["GRQ_ES_URL"] es_index = app.conf["DATASET_ALIAS"] - facetview_url = app.conf["GRQ_URL"] - # Querying for products - start_url = "{0}/{1}/_search".format(es_url, es_index) - scroll_url = "{0}/_search".format(es_url, es_index) + es = ElasticsearchUtility(es_url, logger=logger) - results = hysds_commons.request_utils.post_scrolled_json_responses( - start_url, scroll_url, generator=True, data=json.dumps(query), logger=logger) - # print results + results = es.query(es_index, query) # Querying for products if component == 'tosca': for result in results: - es_type = result["_type"] ident = result["_id"] index = result["_index"] + # find the Best URL first best = None for url in result["_source"]["urls"]: - if best is None or not url.startswith("http"): + if not url.startswith("http"): best = url # making osaka call to delete product @@ -57,26 +54,22 @@ def purge_products(query, component, operation): osaka.main.rmall(best) # removing the metadata - hysds_commons.metadata_rest_utils.remove_metadata( - es_url, index, es_type, ident, logger) + es.delete_by_id(index, ident) + logger.info('Purged %s' % ident) else: - if operation == 'purge': - purge = True - else: - purge = False - # purge job from index + purge = True if operation == 'purge' else False # purge job from index for result in results: uuid = result["_source"]['uuid'] payload_id = result["_source"]['payload_id'] index = result["_index"] - es_type = result['_type'] + # Always grab latest state (not state from query result) task = app.AsyncResult(uuid) - state = 
task.state - # Active states may only revoke - logger.info("Job state: %s\n", state) + state = task.state # Active states may only revoke + logger.info("\nJob state: %s\n", state) + if state in ["RETRY", "STARTED"] or (state == "PENDING" and not purge): if not purge: logger.info('Revoking %s\n', uuid) @@ -87,36 +80,30 @@ def purge_products(query, component, operation): elif not purge: logger.info('Cannot stop inactive job: %s\n', uuid) continue - # Saftey net to revoke job if in PENDING state + + # Safety net to revoke job if in PENDING state if state == "PENDING": logger.info('Revoking %s\n', uuid) app.control.revoke(uuid, terminate=True) # Both associated task and job from ES - logger.info('Removing ES for %s:%s', es_type, payload_id) - r = hysds_commons.metadata_rest_utils.remove_metadata( - es_url, index, es_type, payload_id, logger) - # r.raise_for_status() #not req - # res = r.json() #not req - logger.info('done.\n') - logger.info('Finished\n') + logger.info('Removing document from index %s for %s', index, payload_id) + es.delete_by_id(index, payload_id) + logger.info('Removed %s from index: %s', payload_id, index) + logger.info('Finished.') if __name__ == "__main__": - ''' - Main program of purge_products - ''' - # encoding to a JSON object - #decoded_string = sys.argv[1].decode('string_escape') - #dec = decoded_string.replace('u""','"') - #decoded_inp = dec.replace('""','"') - decoded_inp = sys.argv[1] - print(decoded_inp) - if decoded_inp.startswith('{"query"') or decoded_inp.startswith("{u'query'") or decoded_inp.startswith("{'query'"): - query_obj = json.loads(decoded_inp) - else: - query_obj["query"] = json.loads(decoded_inp) + """Main program of purge_products""" + context = read_context() # reading the context file + + component_val = context['component'] + operation_val = context['operation'] + + query_obj = context['query'] + try: + query_obj = json.loads(query_obj) + except TypeError as e: + logger.warning(e) - component = sys.argv[2] - operation = sys.argv[3] - purge_products(query_obj, component, operation) + purge_products(query_obj, component_val, operation_val) diff --git a/purge.sh b/purge.sh deleted file mode 100755 index 61c31a9..0000000 --- a/purge.sh +++ /dev/null @@ -1,32 +0,0 @@ -#!/bin/bash - -source ~/.bash_profile - -BASE_PATH=$(dirname "${BASH_SOURCE}") - -# check args -if [ "$#" -eq 3 ]; then - query=$1 - component=$2 - operation=$3 -else - echo "Invalid number or arguments ($#) $*" 1>&2 - exit 1 -fi - -# purge products -echo "##########################################" 1>&2 -echo -n "Purge/Stop/Revoke products: " 1>&2 -date 1>&2 -python $BASE_PATH/purge.py "$query" "$component" "$operation" > purge.log 2>&1 -STATUS=$? -echo -n "Finished purging/revoking: " 1>&2 -date 1>&2 -if [ $STATUS -ne 0 ]; then - echo "Failed to purge/revoke." 
1>&2 - cat purge.log 1>&2 - echo "{}" - exit $STATUS -fi - -exit 0 diff --git a/retry.py b/retry.py index 9ee8d05..2907190 100644 --- a/retry.py +++ b/retry.py @@ -1,27 +1,42 @@ #!/usr/bin/env python import sys import json -import requests import time import traceback from random import randint, uniform from datetime import datetime from celery import uuid + from hysds.celery import app from hysds.orchestrator import run_job from hysds.log_utils import log_job_status +from hysds_commons.elasticsearch_utils import ElasticsearchUtility + +JOBS_ES_URL = app.conf["JOBS_ES_URL"] +STATUS_ALIAS = app.conf["STATUS_ALIAS"] +es = ElasticsearchUtility(JOBS_ES_URL) + + +def read_context(): + with open('_context.json', 'r') as f: + cxt = json.load(f) + return cxt -def query_ES(job_id): - # get the ES_URL - es_url = app.conf["JOBS_ES_URL"] - index = app.conf["STATUS_ALIAS"] +def query_es(job_id): query_json = { - "query": {"bool": {"must": [{"term": {"job.job_info.id": "job_id"}}]}}} - query_json["query"]["bool"]["must"][0]["term"]["job.job_info.id"] = job_id - r = requests.post('%s/%s/_search?' % - (es_url, index), json.dumps(query_json)) - return r + "query": { + "bool": { + "must": [ + {"term": {"job.job_info.id": job_id}} + ] + } + } + } + doc = es.search(STATUS_ALIAS, query_json) + if doc['hits']['total']['value'] == 0: + raise LookupError('job id %s not found in Elasticsearch' % job_id) + return doc def rand_sleep(sleep_min=0.1, sleep_max=1): time.sleep( @@ -32,47 +47,50 @@ def get_new_job_priority(old_priority, increment_by, new_priority): if increment_by is not None: priority = int(old_priority) + int(increment_by) if priority == 0 or priority == 9: - print(("Not applying {} on previous priority of {} as it needs to be in range 0 to 8".format(increment_by, - old_priority))) + print("Not applying {} on previous priority of {}") + print("Priority must be between 0 and 8".format(increment_by, old_priority)) priority = int(old_priority) else: priority = int(new_priority) return priority -def resubmit_jobs(): - es_url = app.conf["JOBS_ES_URL"] +def resubmit_jobs(context): + """ + logic to resubmit the job + :param context: contents from _context.json + """ # random sleep to prevent from getting ElasticSearch errors: # 429 Client Error: Too Many Requests time.sleep(randint(1, 5)) - # can call submit_job # iterate through job ids and query to get the job json - with open('_context.json') as f: - ctx = json.load(f) - increment_by = None new_priority = None - if "job_priority_increment" in ctx: - increment_by = ctx["job_priority_increment"] + if "job_priority_increment" in context: + increment_by = context["job_priority_increment"] + else: + new_priority = context["new_job_priority"] + + retry_count_max = context['retry_count_max'] + + if isinstance(context['retry_job_id'], list): + retry_job_ids = context['retry_job_id'] else: - new_priority = ctx["new_job_priority"] + retry_job_ids = [context['retry_job_id']] - retry_count_max = ctx['retry_count_max'] - retry_job_ids = ctx['retry_job_id'] if isinstance( - ctx['retry_job_id'], list) else [ctx['retry_job_id']] for job_id in retry_job_ids: - print(("Retrying job: {}".format(job_id))) + print(("Validating retry job: {}".format(job_id))) try: # get job json for ES rand_sleep() - response = query_ES(job_id) - if response.status_code != 200: - print(("Failed to query ES. 
Got status code %d:\n%s" % (response.status_code, json.dumps(response.json(), - indent=2)))) - response.raise_for_status() - resp_json = response.json() - job_json = resp_json["hits"]["hits"][0]["_source"]["job"] + + doc = query_es(job_id) + doc = doc["hits"]["hits"][0] + + job_json = doc["_source"]["job"] + index = doc["_index"] + _id = doc["_id"] # don't retry a retry if job_json['type'].startswith('job-lw-mozart-retry'): @@ -90,16 +108,15 @@ def resubmit_jobs(): else: job_json['retry_count'] = 1 job_json["job_info"]["dedup"] = False + # clean up job execution info - for i in ('duration', 'execute_node', 'facts', 'job_dir', 'job_url', - 'metrics', 'pid', 'public_ip', 'status', 'stderr', - 'stdout', 'time_end', 'time_queued', 'time_start'): + for i in ('duration', 'execute_node', 'facts', 'job_dir', 'job_url', 'metrics', 'pid', 'public_ip', + 'status', 'stderr', 'stdout', 'time_end', 'time_queued', 'time_start'): if i in job_json.get('job_info', {}): del job_json['job_info'][i] # set queue time - job_json['job_info']['time_queued'] = datetime.utcnow( - ).isoformat() + 'Z' + job_json['job_info']['time_queued'] = datetime.utcnow().isoformat() + 'Z' # reset priority old_priority = job_json['priority'] @@ -108,54 +125,51 @@ def resubmit_jobs(): # revoke original job rand_sleep() + + job_id = job_json['job_id'] try: - app.control.revoke(job_json['job_id'], terminate=True) - print("revoked original job: %s" % job_json['job_id']) - except Exception as e: - print("Got error issuing revoke on job %s: %s" % - (job_json['job_id'], traceback.format_exc())) + app.control.revoke(job_id, terminate=True) + print("revoked original job: %s" % job_id) + except: + print("Got error issuing revoke on job %s: %s" % (job_id, traceback.format_exc())) print("Continuing.") # generate celery task id - job_json['task_id'] = uuid() + task_id = uuid() + job_json['task_id'] = task_id # delete old job status rand_sleep() - try: - r = requests.delete("%s/%s/job/_query?q=_id:%s" % - (es_url, query_idx, job_json['job_id'])) - r.raise_for_status() - print("deleted original job status: %s" % job_json['job_id']) - except Exception as e: - print("Got error deleting job status %s: %s" % - (job_json['job_id'], traceback.format_exc())) - print("Continuing.") + es.delete_by_id(index, _id) # log queued status rand_sleep() - job_status_json = {'uuid': job_json['task_id'], - 'job_id': job_json['job_id'], - 'payload_id': job_json['job_info']['job_payload']['payload_task_id'], - 'status': 'job-queued', - 'job': job_json} + job_status_json = { + 'uuid': task_id, + 'job_id': job_id, + 'payload_id': job_json['job_info']['job_payload']['payload_task_id'], + 'status': 'job-queued', + 'job': job_json + } log_job_status(job_status_json) # submit job queue = job_json['job_info']['job_queue'] - res = run_job.apply_async((job_json,), queue=queue, - time_limit=job_json['job_info']['time_limit'], - soft_time_limit=job_json['job_info']['soft_time_limit'], - priority=job_json['priority'], - task_id=job_json['task_id']) + run_job.apply_async((job_json,), queue=queue, + time_limit=job_json['job_info']['time_limit'], + soft_time_limit=job_json['job_info']['soft_time_limit'], + priority=job_json['priority'], + task_id=task_id) except Exception as ex: - print("[ERROR] Exception occured {0}:{1} {2}".format( - type(ex), ex, traceback.format_exc()), file=sys.stderr) + print("[ERROR] Exception occurred {0}:{1} {2}".format(type(ex), ex, traceback.format_exc()), + file=sys.stderr) if __name__ == "__main__": - query_idx = app.conf['STATUS_ALIAS'] - input_type = 
sys.argv[1] + ctx = read_context() + + input_type = ctx['type'] if input_type != "worker": - resubmit_jobs() + resubmit_jobs(ctx) else: print("Cannot retry a worker.") diff --git a/retry.sh b/retry.sh index a45c051..bf478cb 100755 --- a/retry.sh +++ b/retry.sh @@ -4,18 +4,11 @@ source ~/.bash_profile BASE_PATH=$(dirname "${BASH_SOURCE}") -if [ "$#" -eq 1 ]; then - types=$1 -else - echo "Invalid number or arguments ($#) $*" 1>&2 - exit 1 -fi - # retry job echo "##########################################" 1>&2 echo -n "Retrying job: " 1>&2 date 1>&2 -python $BASE_PATH/retry.py "$types" > retry.log 2>&1 +python $BASE_PATH/retry.py > retry.log 2>&1 STATUS=$? echo -n "Finished retrying job: " 1>&2
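
Note: the common thread in this refactor is that each lightweight job now reads its parameters from `_context.json` (populated from the job-spec params whose `"destination"` is `"context"`) instead of positional command-line arguments. Below is a minimal sketch of that pattern, mirroring the `read_context()` helper added in purge.py/retry.py and assuming the usual HySDS behavior of writing `_context.json` into the job's working directory; the key names shown are the ones used by the purge job-spec above.

```python
# Sketch of the context-read pattern adopted in this patch (not part of the diff).
# Assumes _context.json exists in the current working directory, as it does
# inside a HySDS job work dir.
import json


def read_context(path='_context.json'):
    """Load the job parameters HySDS wrote for this job."""
    with open(path, 'r') as f:
        return json.load(f)


if __name__ == '__main__':
    ctx = read_context()
    query = ctx['query']          # ES query; may arrive as a JSON string
    component = ctx['component']  # "tosca" or "mozart"/"figaro"
    operation = ctx['operation']  # e.g. "purge" or "revoke"
    print(component, operation, json.dumps(query)[:200])
```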