From 8f2826884f622527e9d9b3019d58228884a0a5a0 Mon Sep 17 00:00:00 2001 From: Michael Cayanan <42812746+mcayanan@users.noreply.github.com> Date: Thu, 31 Oct 2024 09:22:22 -0700 Subject: [PATCH] HC-556: Update retry and purge jobs to delete all possible instances of a job across multiple indices (#35) * HC-556: Update delete_by_id function to delete by job_status-current * delete the jobs in a different way * wait for job-revoked state to be logged before continuing * use ensure_job_indexed * update how the container is being built * add user ops * revert back to print statements * revert back to print statements * query by correct thing * typo * make wait shorter * add logging * add logging format * should be retry.log * update query * update query * add comment * Revert "add comment" This reverts commit 0babf15b20a1d6a90b448d122014dcb0827e7b51. * update purge to remove all instances of a job * add timestamps to logging; surround with try/except * remove waiting for a revoked status --------- Co-authored-by: Mike Cayanan --- docker/Dockerfile | 10 ++++++++ purge.py | 14 ++++++++---- retry.py | 58 ++++++++++++++++++++++++++++++----------------- 3 files changed, 56 insertions(+), 26 deletions(-) diff --git a/docker/Dockerfile b/docker/Dockerfile index eacdbc1..3870781 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -4,6 +4,16 @@ MAINTAINER malarout "Namrata.Malarout@jpl.nasa.gov" LABEL description="Lightweight System Jobs" # provision lightweight-jobs PGE + +# create ops user and group +RUN useradd -M -u 10005 ops + +# softlink /home/ops +RUN ln -s /root /home/ops + +# Make sure $HOME is set when we run this container +ENV HOME=/home/ops + USER ops COPY . /home/ops/lightweight-jobs diff --git a/purge.py b/purge.py index 6b70d4c..22367bd 100644 --- a/purge.py +++ b/purge.py @@ -11,10 +11,12 @@ from utils import revoke, create_info_message_files LOG_FILE_NAME = 'purge.log' -logging.basicConfig(filename=LOG_FILE_NAME, filemode='a', level=logging.INFO) +log_format = "[%(asctime)s: %(levelname)s/%(funcName)s] %(message)s" +logging.basicConfig(format=log_format, filename=LOG_FILE_NAME, filemode='a', level=logging.INFO) logger = logging tosca_es = get_grq_es() +mozart_es = get_mozart_es() def read_context(): @@ -145,10 +147,12 @@ def purge_products(query, component, operation, delete_from_obj_store=True): logger.info('Revoking %s\n', uuid) revoke(uuid, state) - # Delete job from ES - logger.info('Removing document from index %s for %s', index, payload_id) - es.delete_by_id(index=index, id=payload_id, ignore=404) - logger.info('Removed %s from index: %s', payload_id, index) + # Delete job(s) from ES + results = es.search_by_id(index=index, id=payload_id, return_all=True, ignore=404) + for result in results: + logger.info('Removing document from index %s for %s', result['_id'], result['_index']) + es.delete_by_id(index=result['_index'], id=result['_id']) + logger.info('Removed %s from index: %s', result['_id'], result['_index']) logger.info('Finished.') diff --git a/retry.py b/retry.py index c301f1c..7eb915f 100644 --- a/retry.py +++ b/retry.py @@ -3,6 +3,8 @@ import json import traceback import backoff +import logging + from datetime import datetime from celery import uuid @@ -15,6 +17,13 @@ STATUS_ALIAS = app.conf["STATUS_ALIAS"] +JOB_STATUS_CURRENT = "job_status-current" + +LOG_FILE_NAME = 'retry.log' +log_format = "[%(asctime)s: %(levelname)s/%(funcName)s] %(message)s" +logging.basicConfig(format=log_format, filename=LOG_FILE_NAME, filemode='a', level=logging.INFO) +logger = logging + mozart_es = get_mozart_es() @@ -35,20 +44,23 @@ def query_es(job_id): } } } - return mozart_es.search(index="job_status-current", body=query_json) + return mozart_es.search(index=JOB_STATUS_CURRENT, body=query_json) @backoff.on_exception(backoff.expo, Exception, max_tries=10, max_value=64) def delete_by_id(index, _id): - mozart_es.delete_by_id(index=index, id=_id) + results = mozart_es.search_by_id(index=index, id=_id, return_all=True) + for result in results: + logger.info(f"Deleting job {result['_id']} in {result['_index']}") + mozart_es.delete_by_id(index=result['_index'], id=result['_id']) def get_new_job_priority(old_priority, increment_by, new_priority): if increment_by is not None: priority = int(old_priority) + int(increment_by) if priority == 0 or priority == 9: - print("Not applying {} on previous priority of {}") - print("Priority must be between 0 and 8".format(increment_by, old_priority)) + logger.info("Not applying {} on previous priority of {}") + logger.info("Priority must be between 0 and 8".format(increment_by, old_priority)) priority = int(old_priority) else: priority = int(new_priority) @@ -77,11 +89,11 @@ def resubmit_jobs(context): retry_job_ids = [context['retry_job_id']] for job_id in retry_job_ids: - print(("Validating retry job: {}".format(job_id))) + logger.info("Validating retry job: {}".format(job_id)) try: doc = query_es(job_id) if doc['hits']['total']['value'] == 0: - print('job id %s not found in Elasticsearch. Continuing.' % job_id) + logger.warning('job id %s not found in Elasticsearch. Continuing.' % job_id) continue doc = doc["hits"]["hits"][0] @@ -91,12 +103,12 @@ def resubmit_jobs(context): _id = doc["_id"] if not index.startswith("job"): - print("Cannot retry a worker: %s" % _id) + logger.error("Cannot retry a worker: %s" % _id) continue # don't retry a retry if job_json['type'].startswith('job-lw-mozart-retry'): - print("Cannot retry retry job %s. Skipping" % job_id) + logger.error("Cannot retry retry job %s. Skipping" % job_id) continue # check retry_remaining_count @@ -104,8 +116,8 @@ def resubmit_jobs(context): if job_json['retry_count'] < retry_count_max: job_json['retry_count'] = int(job_json['retry_count']) + 1 else: - print("For job {}, retry_count now is {}, retry_count_max limit of {} reached. Cannot retry again." - .format(job_id, job_json['retry_count'], retry_count_max)) + logger.error("For job {}, retry_count now is {}, retry_count_max limit of {} reached. Cannot retry again." + .format(job_id, job_json['retry_count'], retry_count_max)) continue else: job_json['retry_count'] = 1 @@ -133,32 +145,36 @@ def resubmit_jobs(context): job_id = job_json['job_id'] try: revoke(task_id, state) - print("revoked original job: %s (%s)" % (job_id, task_id)) + logger.info("revoked original job: %s (%s) state=%s" % (job_id, task_id, state)) except: - print("Got error issuing revoke on job %s (%s): %s" % (job_id, task_id, traceback.format_exc())) - print("Continuing.") + logger.error("Got error issuing revoke on job %s (%s): %s" % (job_id, task_id, traceback.format_exc())) + logger.error("Continuing.") # generate celery task id new_task_id = uuid() job_json['task_id'] = new_task_id - # delete old job status - delete_by_id(index, _id) + # delete old job status; we should pass in the job_status-current alias + # instead so that we make sure to properly handle the scenario where + # figaro rules are in place to auto retry jobs that fail due to spot termination. + # This may potentially cause duplicate records across the job_status + # and job_failed indices + delete_by_id(JOB_STATUS_CURRENT, _id) # check if new queues, soft time limit, and time limit values were set new_job_queue = context.get("job_queue", "") if new_job_queue: - print(f"new job queue specified. Sending retry job to {new_job_queue}") + logger.info(f"new job queue specified. Sending retry job to {new_job_queue}") job_json['job_info']['job_queue'] = new_job_queue new_soft_time_limit = context.get("soft_time_limit", "") if new_soft_time_limit: - print(f"new soft time limit specified. Setting new soft time limit to {int(new_soft_time_limit)}") + logger.info(f"new soft time limit specified. Setting new soft time limit to {int(new_soft_time_limit)}") job_json['job_info']['soft_time_limit'] = int(new_soft_time_limit) new_time_limit = context.get("time_limit", "") if new_time_limit: - print(f"new time limit specified. Setting new time limit to {int(new_time_limit)}") + logger.info(f"new time limit specified. Setting new time limit to {int(new_time_limit)}") job_json['job_info']['time_limit'] = int(new_time_limit) # Before re-queueing, check to see if the job was under the job_failed index. If so, need to @@ -183,9 +199,9 @@ def resubmit_jobs(context): soft_time_limit=job_json['job_info']['soft_time_limit'], priority=job_json['priority'], task_id=new_task_id) + logger.info(f"re-submitted job_id={job_id}, payload_id={job_status_json['payload_id']}, task_id={new_task_id}") except Exception as ex: - print("[ERROR] Exception occurred {0}:{1} {2}".format(type(ex), ex, traceback.format_exc()), - file=sys.stderr) + logger.error("[ERROR] Exception occurred {0}:{1} {2}".format(type(ex), ex, traceback.format_exc())) if __name__ == "__main__": @@ -194,4 +210,4 @@ def resubmit_jobs(context): # if input_type == "job": resubmit_jobs(ctx) # else: - # print("Cannot retry a task, worker or event.") + # logger.info("Cannot retry a task, worker or event.")