From b2de7755dd043355204303cde1d4e5e1b6282abd Mon Sep 17 00:00:00 2001
From: azhang57
Date: Thu, 6 Oct 2022 08:40:20 -0700
Subject: [PATCH] [SSDS-2703] Parallelize purge operation (#25)

* SSDS-2703:Initial commit
* SSDS-2703:fix
* SSDS-2703:Updated multiprocessing call
* SSDS-2703:debug line
* SSDS-2703:Fix
* SSDS-2703:fix2
* SSDS-2703:ServerProxy
* SSDS-2703:Fix
* SSDS-2703:test
* SSDS-2703:Fix
* SSDS-2703:Fix
* SSDS-2703:Fix
* SSDS-2703:Fix
* SSDS-2703:test
* SSDS-2703:test
* SSDS-2703:test
* SSDS-2703:test
* SSDS-2703:test
* SSDS-2703:test
* SSDS-2703:test
* SSDS-2703:test
* SSDS-2703:test
* SSDS-2703:test
* SSDS-2703:test
* SSDS-2703:test
* SSDS-2703:test
* SSDS-2703:test
* SSDS-2703:test
* SSDS-2703:test
* SSDS-2703:test
* SSDS-2703:test
* SSDS-2703:test
* SSDS-2703:test
* SSDS-2703:test
* SSDS-2703:test

Co-authored-by: andrewmz
---
 purge.py | 46 ++++++++++++++++++++++++++++------------------
 1 file changed, 28 insertions(+), 18 deletions(-)

diff --git a/purge.py b/purge.py
index 8aecc56..45ac6f1 100644
--- a/purge.py
+++ b/purge.py
@@ -1,17 +1,20 @@
 #!/bin/env python
 import json
 import logging
+import psutil
 
 import osaka.main
 from hysds.celery import app
 from hysds.es_util import get_mozart_es, get_grq_es
 from utils import revoke, create_info_message_files
-
+from multiprocessing import Pool
 
 LOG_FILE_NAME = 'purge.log'
 logging.basicConfig(filename=LOG_FILE_NAME, filemode='a', level=logging.DEBUG)
 logger = logging
 
+tosca_es = get_grq_es()
+
 
 def read_context():
     with open('_context.json', 'r') as f:
@@ -19,6 +22,26 @@ def read_context():
     return cxt
 
 
+def delete_dataset(es_result):
+    ident = es_result["_id"]
+    index = es_result["_index"]
+    dataset = es_result["_source"]["dataset"]
+    # find the Best URL first
+    best = None
+    for url in es_result["_source"]["urls"]:
+        if not url.startswith("http"):
+            best = url
+
+    print('paramater being passed to osaka.main.rmall: ', best)  # making osaka call to delete product
+    if best is not None:
+        osaka.main.rmall(best)
+
+    tosca_es.delete_by_id(index=index, id=ident, ignore=404)  # removing the metadata
+    logger.info('Purged %s' % ident)
+
+    return dataset
+
+
 def purge_products(query, component, operation):
     """
     Iterator used to iterate across a query result and submit jobs for every hit
@@ -37,30 +60,17 @@ def purge_products(query, component, operation):
         es_index = app.conf["DATASET_ALIAS"]
 
     results = es.query(index=es_index, body=query)  # Querying for products
+    num_processes = psutil.cpu_count() - 2
+    p = Pool(processes=num_processes)
     if component == 'tosca':
         deleted_datasets = dict()
-        for result in results:
-            ident = result["_id"]
-            index = result["_index"]
-            dataset = result["_source"]["dataset"]
-            # find the Best URL first
-            best = None
-            for url in result["_source"]["urls"]:
-                if not url.startswith("http"):
-                    best = url
-
-            print('paramater being passed to osaka.main.rmall: ', best)  # making osaka call to delete product
-            if best is not None:
-                osaka.main.rmall(best)
-
-            es.delete_by_id(index=index, id=ident, ignore=404)  # removing the metadata
-            logger.info('Purged %s' % ident)
+        updated_datasets = p.map(delete_dataset, results)
+        for dataset in updated_datasets:
             if dataset in deleted_datasets:
                 count = deleted_datasets[dataset]
                 deleted_datasets[dataset] = count + 1
             else:
                 deleted_datasets[dataset] = 1
-
         if len(deleted_datasets) != 0:
             msg_details = "Datasets purged by type:\n\n"
             for ds in deleted_datasets.keys():