HC-253: Update legacy ElasticSearch Code in Lightweight Jobs to conform to ES 7.1 (#21)

* HC-253: Updated the get functions to ES 7.1 in lightweight jobs

The aws_get and wget functions have been updated to be compliant with ES 7.1. In addition, they now use the ElasticSearch utilities from HySDS commons instead of the requests library (a sketch of the new scroll pattern follows the change summary below).

* open the tar file as binary

* fixed an issue where the first set of results doesn't get added to the script

* fixed an issue where the first set of results isn't getting captured.

* read as binary

* properly get the count

* clear out scroll_ids after use

Co-authored-by: Mike Cayanan <[email protected]>
mcayanan and Mike Cayanan authored Sep 3, 2020
1 parent bb5877c commit 12f58a5
Showing 2 changed files with 62 additions and 109 deletions.
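
For orientation, here is a minimal sketch of the ES 7.1 scroll pattern that the diffs below adopt. It assumes, as the changed code suggests, that hysds.es_util.get_grq_es() returns a wrapper exposing an elasticsearch-py client at .es; the match_all query body is a hypothetical stand-in for the dataset query passed in by the job.

    # Minimal sketch of the ES 7.1 scroll flow used by the updated jobs.
    # Assumptions: get_grq_es() exposes an elasticsearch-py client at .es
    # (as in the diffs below); the query body here is a hypothetical example.
    from hysds.celery import app
    from hysds.es_util import get_grq_es

    grq_es = get_grq_es()
    index = app.conf["DATASET_ALIAS"]
    query = {"query": {"match_all": {}}}  # hypothetical dataset query

    scroll_ids = set()
    paged_result = grq_es.es.search(body=query, index=index, size=100, scroll="10m")
    count = paged_result["hits"]["total"]["value"]  # ES 7.x: total is an object, not a plain int
    scroll_ids.add(paged_result["_scroll_id"])

    while paged_result["hits"]["hits"]:
        for hit in paged_result["hits"]["hits"]:
            for url in hit["_source"].get("urls", []):
                print(url)  # the real jobs emit wget / "aws s3 sync" lines here
        paged_result = grq_es.es.scroll(scroll_id=paged_result["_scroll_id"], scroll="10m")
        scroll_ids.add(paged_result["_scroll_id"])

    # release the scroll contexts once the script has been written
    for sid in scroll_ids:
        grq_es.es.clear_scroll(scroll_id=sid)
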
68 changes: 25 additions & 43 deletions aws_get.py
@@ -1,17 +1,13 @@
import json
import requests
import types
import re
import getpass
import sys
import os
from pprint import pformat
import logging
import tarfile
import notify_by_email
from hysds.celery import app
import boto3
from urllib.parse import urlparse
from hysds.es_util import get_grq_es

# TODO: Setup logger for this job here. Should log to STDOUT or STDERR as this is a job
logging.basicConfig(level=logging.DEBUG)
@@ -20,34 +16,19 @@

def aws_get_script(dataset=None):
"""Return AWS get script."""

# query
es_url = app.conf["GRQ_ES_URL"]
grq_es = get_grq_es()
index = app.conf["DATASET_ALIAS"]
#facetview_url = app.conf["GRQ_URL"]
print(('%s/%s/_search?search_type=scan&scroll=10m&size=100' % (es_url, index)))
logging.debug(
'%s/%s/_search?search_type=scan&scroll=10m&size=100' % (es_url, index))
print(json.dumps(dataset))
logging.debug(json.dumps(dataset))

r = requests.post('%s/%s/_search?search_type=scan&scroll=10m&size=100' %
(es_url, index), json.dumps(dataset))
if r.status_code != 200:
print(("Failed to query ES. Got status code %d:\n%s" %
(r.status_code, json.dumps(r.json(), indent=2))))
logger.debug("Failed to query ES. Got status code %d:\n%s" %
(r.status_code, json.dumps(r.json(), indent=2)))
r.raise_for_status()
logger.debug("result: %s" % pformat(r.json()))
logger.debug("Dataset: {}".format(json.dumps(dataset, indent=2)))
paged_result = grq_es.es.search(body=dataset, index=index, size=10, scroll="10m")
logger.debug("Paged Result: {}".format(json.dumps(paged_result, indent=2)))

scan_result = r.json()
count = scan_result['hits']['total']
scroll_id = scan_result['_scroll_id']
scroll_ids = set()
count = paged_result["hits"]["total"]["value"]
scroll_id = paged_result["_scroll_id"]
scroll_ids.add(scroll_id)

# stream output a page at a time for better performance and lower memory footprint
def stream_aws_get(scroll_id):
#formatted_source = format_source(source)
def stream_aws_get(scroll_id, paged_result):
yield '#!/bin/bash\n#\n' + \
'# query:\n#\n' + \
'#%s#\n#\n#' % json.dumps(dataset) + \
@@ -56,32 +37,34 @@ def stream_aws_get(scroll_id):
aws_get_cmd = 'aws s3 sync {} {}\n'

while True:
r = requests.post('%s/_search/scroll?scroll=10m' %
es_url, data=scroll_id)
res = r.json()
logger.debug("res: %s" % pformat(res))
scroll_id = res['_scroll_id']
if len(res['hits']['hits']) == 0:
if len(paged_result['hits']['hits']) == 0:
break
# Elastic Search seems like it's returning duplicate urls. Remove duplicates
unique_urls = []
for hit in res['hits']['hits']:
for hit in paged_result['hits']['hits']:
[unique_urls.append(url) for url in hit['_source']['urls']
if url not in unique_urls and url.startswith("s3")]

for url in unique_urls:
logging.debug("urls in unique urls: %s", url)
parsed_url = urlparse(url)
yield 'echo "downloading %s"\n' % os.path.basename(parsed_url.path)
yield aws_get_cmd.format("{}://{}".format(parsed_url.scheme,
parsed_url.path[1:] if parsed_url.path.startswith('/') else parsed_url.path),
os.path.basename(parsed_url.path))
yield aws_get_cmd.format("{}://{}".format(
parsed_url.scheme, parsed_url.path[1:] if parsed_url.path.startswith('/') else parsed_url.path),
os.path.basename(parsed_url.path))
paged_result = grq_es.es.scroll(scroll_id=scroll_id, scroll="10m")
scroll_id = paged_result['_scroll_id']
scroll_ids.add(scroll_id)

# malarout: iterate over each line of stream_aws_get response, and write to a file which is later attached to the email.
# malarout: iterate over each line of stream_aws_get response, and write to a file which is later attached to the
# email.
with open('aws_get_script.sh', 'w') as f:
for i in stream_aws_get(scroll_id):
for i in stream_aws_get(scroll_id, paged_result):
f.write(i)

for sid in scroll_ids:
grq_es.es.clear_scroll(scroll_id=sid)

# for gzip compressed use file extension .tar.gz and modifier "w:gz"
os.rename('aws_get_script.sh', 'aws_get_script.bash')
tar = tarfile.open("aws_get.tar.gz", "w:gz")
@@ -94,7 +77,6 @@ def stream_aws_get(scroll_id):
Main program of aws_get_script
'''
# encoding to a JSON object
query = {}
query = json.loads(sys.argv[1])
emails = sys.argv[2]
rule_name = sys.argv[3]
@@ -110,7 +92,7 @@ def stream_aws_get(scroll_id):
body += "\n\nYou can use this AWS get script attached to download products.\n"
body += "Please rename aws_get_script.bash to aws_get_script.sh before running it."
if os.path.isfile('aws_get.tar.gz'):
aws_get_content = open('aws_get.tar.gz', 'r').read()
aws_get_content = open('aws_get.tar.gz', 'rb').read()
attachments = {'aws_get.tar.gz': aws_get_content}
notify_by_email.send_email(getpass.getuser(
), cc_recipients, bcc_recipients, subject, body, attachments=attachments)
103 changes: 37 additions & 66 deletions wget.py
@@ -1,11 +1,7 @@
import json
import requests
import types
import re
import getpass
import sys
import os
from pprint import pformat
import logging
import tarfile
import notify_by_email
@@ -14,6 +10,8 @@
from urllib.parse import urlparse
import datetime

from hysds.es_util import get_grq_es


PRODUCT_TEMPLATE = "product_downloader-{0}-{1}-{2}"

@@ -26,50 +24,20 @@ def wget_script(dataset=None, glob_dict=None):
"""Return wget script."""

# query
es_url = app.conf["GRQ_ES_URL"]
"""Return AWS get script."""
grq_es = get_grq_es()
index = app.conf["DATASET_ALIAS"]
#facetview_url = app.conf["GRQ_URL"]
print(('%s/%s/_search?search_type=scan&scroll=10m&size=100' % (es_url, index)))
logging.debug(
'%s/%s/_search?search_type=scan&scroll=10m&size=100' % (es_url, index))
print(json.dumps(dataset))
logging.debug(json.dumps(dataset))

r = requests.post('%s/%s/_search?search_type=scan&scroll=10m&size=100' %
(es_url, index), json.dumps(dataset))
if r.status_code != 200:
print(("Failed to query ES. Got status code %d:\n%s" %
(r.status_code, json.dumps(r.json(), indent=2))))
logger.debug("Failed to query ES. Got status code %d:\n%s" %
(r.status_code, json.dumps(r.json(), indent=2)))
r.raise_for_status()
logger.debug("result: %s" % pformat(r.json()))

scan_result = r.json()
count = scan_result['hits']['total']
#size = int(math.ceil(count/10.0))
#print("SIZE : %d" %size)
#scroll_id = scan_result['_scroll_id']
logging.debug('%s/%s/_search?search_type=scan&scroll=10m&size=%s' %
(es_url, index, count))
r = requests.post('%s/%s/_search?search_type=scan&scroll=10m&size=%s' %
(es_url, index, count), json.dumps(dataset))
if r.status_code != 200:
print(("Failed to query ES. Got status code %d:\n%s" %
(r.status_code, json.dumps(r.json(), indent=2))))
logger.debug("Failed to query ES. Got status code %d:\n%s" %
(r.status_code, json.dumps(r.json(), indent=2)))
r.raise_for_status()
logger.debug("result: %s" % pformat(r.json()))

scan_result = r.json()
count = scan_result['hits']['total']

scroll_id = scan_result['_scroll_id']
logger.debug("Dataset: {}".format(json.dumps(dataset, indent=2)))
paged_result = grq_es.es.search(body=dataset, index=index, size=100, scroll="10m")
logger.debug("Paged Result: {}".format(json.dumps(paged_result, indent=2)))

scroll_ids = set()
count = paged_result["hits"]["total"]["value"]
scroll_id = paged_result["_scroll_id"]
scroll_ids.add(scroll_id)

# stream output a page at a time for better performance and lower memory footprint
def stream_wget(scroll_id, glob_dict=None):
#formatted_source = format_source(source)
def stream_wget(scroll_id, paged_result, glob_dict=None):
yield '#!/bin/bash\n#\n' + \
'# query:\n#\n' + \
'%s#\n#\n#' % json.dumps(dataset) + \
@@ -82,16 +50,11 @@ def stream_wget(scroll_id, glob_dict=None):
wget_cmd_password = wget_cmd + ' --user=$user --password=$password'

while True:
r = requests.post('%s/_search/scroll?scroll=10m' %
es_url, data=scroll_id)
res = r.json()
logger.debug("res: %s" % pformat(res))
scroll_id = res['_scroll_id']
if len(res['hits']['hits']) == 0:
if len(paged_result['hits']['hits']) == 0:
break
# Elastic Search seems like it's returning duplicate urls. Remove duplicates
unique_urls = []
for hit in res['hits']['hits']:
for hit in paged_result['hits']['hits']:
[unique_urls.append(url) for url in hit['_source']['urls']
if url not in unique_urls and url.startswith("http")]

@@ -126,14 +89,23 @@ def stream_wget(scroll_id, glob_dict=None):
yield "%s --cut-dirs=%d %s/\n" % (wget_cmd, cut_dirs, url)
break

# malarout: iterate over each line of stream_wget response, and write to a file which is later attached to the email.
with open('wget_script.sh','w') as f:
for i in stream_wget(scroll_id, glob_dict):
f.write(i)
paged_result = grq_es.es.scroll(scroll_id=scroll_id, scroll="10m")
logger.debug("paged result: {}".format(json.dumps(paged_result, indent=2)))
scroll_id = paged_result['_scroll_id']
scroll_ids.add(scroll_id)

# malarout: iterate over each line of stream_wget response, and write to a file which is later attached to the
# email.
with open('wget_script.sh', 'w') as f:
for i in stream_wget(scroll_id, paged_result, glob_dict):
f.write(i)

for sid in scroll_ids:
grq_es.es.clear_scroll(scroll_id=sid)

# for gzip compressed use file extension .tar.gz and modifier "w:gz"
# os.rename('wget_script.sh','wget_script.bash')
#tar = tarfile.open("wget.tar.gz", "w:gz")
# tar = tarfile.open("wget.tar.gz", "w:gz")
# tar.add('wget_script.bash')
# tar.close()

@@ -178,7 +150,7 @@ def email(query, emails, rule_name):
body += "\n\nYou can use this wget script attached to download products.\n"
body += "Please rename wget_script.bash to wget_script.sh before running it."
if os.path.isfile('wget.tar.gz'):
wget_content = open('wget.tar.gz', 'r').read()
wget_content = open('wget.tar.gz', 'rb').read()
attachments = {'wget.tar.gz': wget_content}
notify_by_email.send_email(getpass.getuser(), cc_recipients,
bcc_recipients, subject, body, attachments=attachments)
@@ -199,6 +171,7 @@ def make_product(rule_name, query):
with open("{0}/{0}.dataset.json".format(name), "w") as fp:
json.dump({"id": name, "version": "v0.1"}, fp)


def glob_filter(names, glob_dict):
import fnmatch
files = []
@@ -217,15 +190,14 @@ def glob_filter(names, glob_dict):
if exclude_csv:
pattern_list_exc = [item.strip() for item in exclude_csv.split(',')]


for pat in pattern_list_exc:
matching = fnmatch.filter(names, "*" + pat)
files_exclude.extend(matching)

files_exclude = list(set(files_exclude))
print("Got the following files to exclude: %s" % str(files_exclude))

#unique list
# unique list
files_final = [x for x in files if x not in files_exclude]
retfiles_set = set(files_final)
print("Got the following files: %s" % str(retfiles_set))
@@ -237,7 +209,6 @@ def glob_filter(names, glob_dict):
Main program of wget_script
'''
# encoding to a JSON object
query = {}
query = json.loads(sys.argv[1])
emails = sys.argv[2]
rule_name = sys.argv[3]
@@ -247,18 +218,18 @@ def glob_filter(names, glob_dict):
context_file = '_context.json'
with open(context_file, 'r') as fin:
context = json.load(fin)
except:
raise Exception('unable to parse _context.json from work directory')
except Exception as e:
raise Exception('unable to parse _context.json from work directory: {}'.format(str(e)))

glob_dict = None
if "include_glob" in context and "exclude_glob" in context:
glob_dict = {"include":context["include_glob"], "exclude": context["exclude_glob"]}
glob_dict = {"include": context["include_glob"], "exclude": context["exclude_glob"]}

# getting the script

wget_script(query, glob_dict)
if emails=="unused":
make_product(rule_name, query)
if emails == "unused":
make_product(rule_name, query)
else:
# now email the query
email(query, emails, rule_name)
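
As a usage illustration (not part of the commit): the __main__ blocks above take the dataset query, the e-mail list, and the rule name as positional arguments, and wget.py additionally reads _context.json from the work directory. A hypothetical invocation from another Python process might look like the sketch below; the query body and rule name are made-up examples, and passing "unused" for the e-mail argument takes the make_product() branch instead of sending e-mail.

    # Hypothetical driver, based on the sys.argv parsing shown in the diffs above.
    import json
    import subprocess

    query = {"query": {"match_all": {}}}  # made-up dataset query
    subprocess.run(
        ["python", "wget.py", json.dumps(query), "unused", "example_rule"],
        check=True,  # wget.py also expects a _context.json in the working directory
    )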
