Skip to content

Commit

Permalink
switch usage to hysds.es_util and refactored PGEs to reflect change…
Browse files Browse the repository at this point in the history
…s to ElasticsearchUtility (#19)
  • Loading branch information
DustinKLo authored May 6, 2020
1 parent 92e2e5b commit 9d7add4
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 59 deletions.
38 changes: 7 additions & 31 deletions notify_by_email.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
#!/usr/bin/env python
import os
import sys
import getpass
import requests
import json
import types
import base64
import socket

from smtplib import SMTP
from email.MIMEMultipart import MIMEMultipart
from email.MIMEText import MIMEText
Expand All @@ -16,8 +15,8 @@
from email import Encoders

from hysds.celery import app
from hysds.es_util import get_mozart_es, get_grq_es
from hysds_commons.net_utils import get_container_host_ip
from hysds_commons.elasticsearch_utils import ElasticsearchUtility


def read_context():
Expand Down Expand Up @@ -123,30 +122,6 @@ def send_email(sender, cc, bcc, subject, body, attachments=None):
smtp.quit()


def get_source(es_host, idx, _id):
    """Fetch the ``_source`` of a single document by id.

    Runs a term query on ``_id`` against ``idx`` (sorted newest
    ``_timestamp`` first) via an ElasticsearchUtility client built for
    ``es_host``. Returns the first hit's source dict, or ``None`` when
    no document matched.
    """
    query = {
        "sort": {"_timestamp": {"order": "desc"}},
        "query": {"term": {"_id": _id}},
    }
    print('get_source debug %s from index: %s' % (_id, idx))
    print('query: %s' % json.dumps(query, indent=2))

    es = ElasticsearchUtility(es_host)
    result = es.search(idx, query)

    hits = result['hits']
    if hits['total']['value'] == 0:
        return None
    return hits['hits'][0]['_source']


def get_cities(src):
"""Return list of cities."""

Expand Down Expand Up @@ -232,11 +207,11 @@ def get_facetview_link(link, _id, version=None):
component = context['component']

if component == "mozart" or component == "figaro":
es_url = app.conf["JOBS_ES_URL"]
es = get_mozart_es()
index = app.conf["STATUS_ALIAS"]
facetview_url = app.conf["MOZART_URL"]
else: # "tosca"
es_url = app.conf["GRQ_ES_URL"]
es = get_grq_es()
index = app.conf["DATASET_ALIAS"]
facetview_url = "https://aria-search-beta.jpl.nasa.gov/search" # TODO: why is it hard coded

Expand All @@ -246,8 +221,9 @@ def get_facetview_link(link, _id, version=None):
email_body = "Product with id %s was ingested." % object_id
email_attachments = None

doc = get_source(es_url, index, object_id)
if doc is not None:
doc = es.get_by_id(index=index, id=object_id, ignore=404)

if doc['found'] is True:
email_body += "\n\n%s" % get_metadata_snippet(doc, settings['SNIPPET_CFG'])
email_body += "\n\nThe entire metadata json for this product has been attached for your convenience.\n\n"
email_attachments = {
Expand Down
20 changes: 8 additions & 12 deletions purge.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
#!/bin/env python
import json
import logging

import osaka.main
from hysds.celery import app
from hysds_commons.elasticsearch_utils import ElasticsearchUtility

from hysds.es_util import get_mozart_es, get_grq_es
from utils import revoke


Expand All @@ -30,15 +30,13 @@ def purge_products(query, component, operation):
logger.debug("query: %s" % json.dumps(query, indent=2))

if component == "mozart" or component == "figaro":
es_url = app.conf["JOBS_ES_URL"]
es = get_mozart_es()
es_index = app.conf["STATUS_ALIAS"]
else: # "tosca"
es_url = app.conf["GRQ_ES_URL"]
es = get_grq_es()
es_index = app.conf["DATASET_ALIAS"]

es = ElasticsearchUtility(es_url, logger=logger)

results = es.query(es_index, query) # Querying for products
results = es.query(index=es_index, body=query) # Querying for products

if component == 'tosca':
for result in results:
Expand All @@ -51,13 +49,11 @@ def purge_products(query, component, operation):
if not url.startswith("http"):
best = url

# making osaka call to delete product
print('paramater being passed to osaka.main.rmall: ', best)
print('paramater being passed to osaka.main.rmall: ', best) # making osaka call to delete product
if best is not None:
osaka.main.rmall(best)

# removing the metadata
es.delete_by_id(index, ident)
es.delete_by_id(index=index, id=ident, ignore=404) # removing the metadata
logger.info('Purged %s' % ident)

else:
Expand Down Expand Up @@ -91,7 +87,7 @@ def purge_products(query, component, operation):

# Both associated task and job from ES
logger.info('Removing document from index %s for %s', index, payload_id)
es.delete_by_id(index, payload_id)
es.delete_by_id(index=index, id=payload_id, ignore=404)
logger.info('Removed %s from index: %s', payload_id, index)
logger.info('Finished.')

Expand Down
23 changes: 7 additions & 16 deletions retry.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,21 @@
#!/usr/bin/env python
import sys
import json
import time
import traceback
import backoff
from random import randint, uniform
from datetime import datetime
from celery import uuid

from hysds.celery import app
from hysds.es_util import get_mozart_es
from hysds.orchestrator import run_job
from hysds.log_utils import log_job_status
from hysds_commons.elasticsearch_utils import ElasticsearchUtility

from utils import revoke


JOBS_ES_URL = app.conf["JOBS_ES_URL"]
STATUS_ALIAS = app.conf["STATUS_ALIAS"]
es = ElasticsearchUtility(JOBS_ES_URL)
mozart_es = get_mozart_es()


def read_context():
Expand All @@ -27,10 +24,7 @@ def read_context():
return cxt


@backoff.on_exception(backoff.expo,
Exception,
max_tries=10,
max_value=64)
@backoff.on_exception(backoff.expo, Exception, max_tries=10, max_value=64)
def query_es(job_id):
query_json = {
"query": {
Expand All @@ -41,15 +35,12 @@ def query_es(job_id):
}
}
}
return es.search(STATUS_ALIAS, query_json)
return mozart_es.search(index=STATUS_ALIAS, body=query_json)


@backoff.on_exception(backoff.expo,
Exception,
max_tries=10,
max_value=64)
def delete_by_id(index, id):
es.delete_by_id(index, id)
@backoff.on_exception(backoff.expo, Exception, max_tries=10, max_value=64)
def delete_by_id(index, _id):
    """Delete document ``_id`` from ``index`` on the mozart Elasticsearch.

    Retries with exponential backoff (up to 10 tries, max 64s delay) on
    any exception raised by the delete call.
    """
    mozart_es.delete_by_id(index=index, id=_id)


def get_new_job_priority(old_priority, increment_by, new_priority):
Expand Down

0 comments on commit 9d7add4

Please sign in to comment.