[SSDS-2703] Parallelize purge operation (#25)
* SSDS-2703:Initial commit

* SSDS-2703:fix

* SSDS-2703:Updated multiprocessing call

* SSDS-2703:debug line

* SSDS-2703:Fix

* SSDS-2703:fix2

* SSDS-2703:ServerProxy

* SSDS-2703:test

Co-authored-by: andrewmz <[email protected]>
azhang57 and andrewmz authored Oct 6, 2022
1 parent 85c2b7e commit b2de775
Showing 1 changed file with 28 additions and 18 deletions.
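In outline, the commit hoists the per-product delete logic out of the loop in purge_products() into a module-level delete_dataset() worker, then fans the work out across processes with multiprocessing.Pool.map. A minimal, runnable sketch of that shape (the hits and dataset names below are hypothetical stand-ins for the Elasticsearch results handled in purge.py):

from multiprocessing import Pool

# Hypothetical stand-ins for the ES query hits handled in purge.py.
hits = [{"_source": {"dataset": "grd"}},
        {"_source": {"dataset": "slc"}},
        {"_source": {"dataset": "grd"}}]


def delete_dataset(hit):
    # The real worker deletes the product and its metadata; this
    # stand-in only returns the dataset type, as the real one does.
    return hit["_source"]["dataset"]


if __name__ == "__main__":
    pool = Pool(processes=2)
    datasets = pool.map(delete_dataset, hits)  # parallel fan-out; results keep input order
    pool.close()
    pool.join()
    print(datasets)  # ['grd', 'slc', 'grd']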
purge.py
@@ -1,24 +1,47 @@
#!/bin/env python
import json
import logging
import psutil

import osaka.main
from hysds.celery import app
from hysds.es_util import get_mozart_es, get_grq_es
from utils import revoke, create_info_message_files

from multiprocessing import Pool

LOG_FILE_NAME = 'purge.log'
logging.basicConfig(filename=LOG_FILE_NAME, filemode='a', level=logging.DEBUG)
logger = logging

tosca_es = get_grq_es()


def read_context():
    with open('_context.json', 'r') as f:
        cxt = json.load(f)
    return cxt


def delete_dataset(es_result):
    ident = es_result["_id"]
    index = es_result["_index"]
    dataset = es_result["_source"]["dataset"]
    # pick a non-HTTP URL to hand to osaka for deletion
    best = None
    for url in es_result["_source"]["urls"]:
        if not url.startswith("http"):
            best = url

    print('parameter being passed to osaka.main.rmall: ', best)  # making osaka call to delete product
    if best is not None:
        osaka.main.rmall(best)

    tosca_es.delete_by_id(index=index, id=ident, ignore=404)  # removing the metadata
    logger.info('Purged %s' % ident)

    return dataset


def purge_products(query, component, operation):
"""
Iterator used to iterate across a query result and submit jobs for every hit
@@ -37,30 +60,17 @@ def purge_products(query, component, operation):
    es_index = app.conf["DATASET_ALIAS"]

    results = es.query(index=es_index, body=query)  # Querying for products
    num_processes = max(1, psutil.cpu_count() - 2)  # leave two cores free, but always at least one worker
    p = Pool(processes=num_processes)
    if component == 'tosca':
        deleted_datasets = dict()
        for result in results:
            ident = result["_id"]
            index = result["_index"]
            dataset = result["_source"]["dataset"]
            # pick a non-HTTP URL for deletion
            best = None
            for url in result["_source"]["urls"]:
                if not url.startswith("http"):
                    best = url

            print('parameter being passed to osaka.main.rmall: ', best)  # making osaka call to delete product
            if best is not None:
                osaka.main.rmall(best)

            es.delete_by_id(index=index, id=ident, ignore=404)  # removing the metadata
            logger.info('Purged %s' % ident)
        updated_datasets = p.map(delete_dataset, results)
        for dataset in updated_datasets:
            if dataset in deleted_datasets:
                count = deleted_datasets[dataset]
                deleted_datasets[dataset] = count + 1
            else:
                deleted_datasets[dataset] = 1

        if len(deleted_datasets) != 0:
            msg_details = "Datasets purged by type:\n\n"
            for ds in deleted_datasets.keys():
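One lifecycle note on the Pool above: the visible hunk creates the pool, but the truncated diff does not show it being shut down. Python's Pool can also be used as a context manager, which terminates the workers on exit; a small sketch with a toy worker (square() and its inputs are illustrative, not part of purge.py):

from multiprocessing import Pool


def square(n):
    # Any picklable module-level function works as a Pool.map worker.
    return n * n


if __name__ == "__main__":
    with Pool(processes=2) as pool:  # __exit__ calls pool.terminate()
        print(pool.map(square, [1, 2, 3]))  # [1, 4, 9]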
