Skip to content

Commit

Permalink
HC-205: improve job revocation mechanism and bug fixes (#17)
Browse files Browse the repository at this point in the history
* refactor revoke to include recording of revoked task id; fix bug

* retrieve uuid correctly

* remove sleeps and utilize exponential backoff instead

* initialize redis pool
  • Loading branch information
pymonger authored Apr 30, 2020
1 parent 5f22b0b commit 92e2e5b
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 26 deletions.
7 changes: 5 additions & 2 deletions purge.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
from hysds.celery import app
from hysds_commons.elasticsearch_utils import ElasticsearchUtility

from utils import revoke


# Log to purge.log in append mode at DEBUG level.
LOG_FILE_NAME = 'purge.log'
logging.basicConfig(filename=LOG_FILE_NAME, filemode='a', level=logging.DEBUG)
# The logging module itself is used as the logger, so logger.info(...) calls
# go through the module-level logging functions (root logger).
logger = logging
Expand Down Expand Up @@ -73,7 +76,7 @@ def purge_products(query, component, operation):
if state in ["RETRY", "STARTED"] or (state == "PENDING" and not purge):
if not purge:
logger.info('Revoking %s\n', uuid)
app.control.revoke(uuid, terminate=True)
revoke(uuid, state)
else:
logger.info('Cannot remove active job %s\n', uuid)
continue
Expand All @@ -84,7 +87,7 @@ def purge_products(query, component, operation):
# Safety net to revoke job if in PENDING state
if state == "PENDING":
logger.info('Revoking %s\n', uuid)
app.control.revoke(uuid, terminate=True)
revoke(uuid, state)

# Both associated task and job from ES
logger.info('Removing document from index %s for %s', index, payload_id)
Expand Down
55 changes: 31 additions & 24 deletions retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import json
import time
import traceback
import backoff
from random import randint, uniform
from datetime import datetime
from celery import uuid
Expand All @@ -12,6 +13,9 @@
from hysds.log_utils import log_job_status
from hysds_commons.elasticsearch_utils import ElasticsearchUtility

from utils import revoke


# Elasticsearch URL and job-status alias are read from app.conf.
JOBS_ES_URL = app.conf["JOBS_ES_URL"]
STATUS_ALIAS = app.conf["STATUS_ALIAS"]
# Module-wide Elasticsearch client used by the search/delete helpers below.
es = ElasticsearchUtility(JOBS_ES_URL)
Expand All @@ -23,6 +27,10 @@ def read_context():
return cxt


@backoff.on_exception(backoff.expo,
Exception,
max_tries=10,
max_value=64)
def query_es(job_id):
query_json = {
"query": {
Expand All @@ -33,14 +41,15 @@ def query_es(job_id):
}
}
}
doc = es.search(STATUS_ALIAS, query_json)
if doc['hits']['total']['value'] == 0:
raise LookupError('job id %s not found in Elasticsearch' % job_id)
return doc
return es.search(STATUS_ALIAS, query_json)


def rand_sleep(sleep_min=0.1, sleep_max=1):
    """Sleep for a random duration drawn uniformly between the two bounds.

    :param sleep_min: lower bound of the sleep time, in seconds
    :param sleep_max: upper bound of the sleep time, in seconds
    """
    delay = uniform(sleep_min, sleep_max)
    time.sleep(delay)
@backoff.on_exception(backoff.expo, Exception, max_tries=10, max_value=64)
def delete_by_id(index, id):
    """Delete a single document from Elasticsearch.

    The call is retried with exponential backoff (up to 10 tries) whenever
    any exception is raised.

    :param index: index that holds the document
    :param id: id of the document to delete
    """
    es.delete_by_id(index, id)


def get_new_job_priority(old_priority, increment_by, new_priority):
Expand All @@ -60,9 +69,6 @@ def resubmit_jobs(context):
logic to resubmit the job
:param context: contents from _context.json
"""
# random sleep to prevent from getting ElasticSearch errors:
# 429 Client Error: Too Many Requests
time.sleep(randint(1, 5))

# iterate through job ids and query to get the job json
increment_by = None
Expand All @@ -82,13 +88,14 @@ def resubmit_jobs(context):
for job_id in retry_job_ids:
print(("Validating retry job: {}".format(job_id)))
try:
# get job json for ES
rand_sleep()

doc = query_es(job_id)
if doc['hits']['total']['value'] == 0:
print('job id %s not found in Elasticsearch. Continuing.' % job_id)
continue
doc = doc["hits"]["hits"][0]

job_json = doc["_source"]["job"]
task_id = doc["_source"]["uuid"]
index = doc["_index"]
_id = doc["_id"]

Expand Down Expand Up @@ -123,29 +130,29 @@ def resubmit_jobs(context):
job_json['priority'] = get_new_job_priority(old_priority=old_priority, increment_by=increment_by,
new_priority=new_priority)

# revoke original job
rand_sleep()
# get state
task = app.AsyncResult(task_id)
state = task.state

# revoke
job_id = job_json['job_id']
try:
app.control.revoke(job_id, terminate=True)
print("revoked original job: %s" % job_id)
revoke(task_id, state)
print("revoked original job: %s (%s)" % (job_id, task_id))
except:
print("Got error issuing revoke on job %s: %s" % (job_id, traceback.format_exc()))
print("Got error issuing revoke on job %s (%s): %s" % (job_id, task_id, traceback.format_exc()))
print("Continuing.")

# generate celery task id
task_id = uuid()
job_json['task_id'] = task_id
new_task_id = uuid()
job_json['task_id'] = new_task_id

# delete old job status
rand_sleep()
es.delete_by_id(index, _id)
delete_by_id(index, _id)

# log queued status
rand_sleep()
job_status_json = {
'uuid': task_id,
'uuid': new_task_id,
'job_id': job_id,
'payload_id': job_json['job_info']['job_payload']['payload_task_id'],
'status': 'job-queued',
Expand All @@ -159,7 +166,7 @@ def resubmit_jobs(context):
time_limit=job_json['job_info']['time_limit'],
soft_time_limit=job_json['job_info']['soft_time_limit'],
priority=job_json['priority'],
task_id=task_id)
task_id=new_task_id)
except Exception as ex:
print("[ERROR] Exception occurred {0}:{1} {2}".format(type(ex), ex, traceback.format_exc()),
file=sys.stderr)
Expand Down
39 changes: 39 additions & 0 deletions utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
#!/bin/env python
import backoff
from redis import BlockingConnectionPool, StrictRedis, RedisError

from hysds.celery import app


# Redis connection pool for revoked-task records; stays None until
# set_redis_revoked_task_pool() creates it on first use.
REVOKED_TASK_POOL = None
# Redis key template for a revoked task; formatted with the task id.
REVOKED_TASK_TMPL = "hysds-revoked-task-%s"


def set_redis_revoked_task_pool():
    """Create the shared redis connection pool for revoked-task records.

    The pool is built from ``app.conf.REDIS_JOB_STATUS_URL`` on the first
    call and stored in the module-level ``REVOKED_TASK_POOL``; later calls
    leave the existing pool untouched.
    """
    global REVOKED_TASK_POOL

    # pool already exists -- nothing to do
    if REVOKED_TASK_POOL is not None:
        return

    redis_url = app.conf.REDIS_JOB_STATUS_URL
    REVOKED_TASK_POOL = BlockingConnectionPool.from_url(redis_url)


@backoff.on_exception(backoff.expo, RedisError, max_tries=10, max_value=64)
def revoke(task_id, state):
    """Record a task as revoked in redis, then revoke it through celery.

    The whole function is retried with exponential backoff (up to 10 tries)
    when a ``RedisError`` is raised.

    :param task_id: id of the celery task to revoke
    :param state: value stored under the task's revoked-task key
    """
    # make sure the connection pool exists before it is used
    set_redis_revoked_task_pool()

    # store the state under "hysds-revoked-task-<task_id>" with an expiry of
    # app.conf.HYSDS_JOB_STATUS_EXPIRES
    client = StrictRedis(connection_pool=REVOKED_TASK_POOL)
    key = REVOKED_TASK_TMPL % task_id
    ttl = app.conf.HYSDS_JOB_STATUS_EXPIRES
    client.setex(key, ttl, state)

    # issue the revoke with terminate=True
    app.control.revoke(task_id, terminate=True)

0 comments on commit 92e2e5b

Please sign in to comment.