From 845745b0e331ad826252f675eb23e8ebd774320d Mon Sep 17 00:00:00 2001
From: Ramesh Maddegoda <94033485+ramesh-maddegoda@users.noreply.github.com>
Date: Tue, 6 Feb 2024 19:29:04 -0800
Subject: [PATCH] UPDATE the SQL statement used in the Nucleus
 pds-nucleus-datasync-completion lambda code to make sure the product table
 and the product_data_file_mapping table are updated consistently

Refer to task https://github.com/NASA-PDS/nucleus/issues/54
---
 .../pds-nucleus-product-completion-checker.py | 57 +++++++++++--------
 1 file changed, 32 insertions(+), 25 deletions(-)

diff --git a/src/pds/ingress/pds-nucleus-product-completion-checker.py b/src/pds/ingress/pds-nucleus-product-completion-checker.py
index b2b6b6d..c03d52f 100644
--- a/src/pds/ingress/pds-nucleus-product-completion-checker.py
+++ b/src/pds/ingress/pds-nucleus-product-completion-checker.py
@@ -1,7 +1,7 @@
 """
-==============================================
-pds-nucleus-product-completion-checker-batch.py
-==============================================
+============================================================
+pds-nucleus-product-completion-checker.py (batch processing)
+============================================================
 
 Lambda function to check if the staging S3 bucket has received a complete product
 with all required files. This lambda function is triggered periodically.
@@ -18,6 +18,7 @@
 import http.client
 import base64
 import ast
+import uuid
 
 from xml.dom import minidom
 
@@ -29,10 +30,10 @@
 rds_data = boto3.client('rds-data')
 
 mwaa_env_name = 'PDS-Nucleus-Airflow-Env'
-dag_name = 'PDS_Registry_Use_Case_61_Messenger_Batch-logs'
 mwaa_cli_command = 'dags trigger'
 
 # Read environment variables from lambda configurations
+dag_name = os.environ.get('AIRFLOW_DAG_NAME')
 node_name = os.environ.get('NODE_NAME')
 es_url = os.environ.get('ES_URL')
 replace_prefix_with = os.environ.get('REPLACE_PREFIX_WITH')
@@ -47,7 +48,7 @@ def lambda_handler(event, context):
     """ Main lambda handler """
 
-    logger.setLevel(logging.INFO)
+    logger.setLevel(logging.DEBUG)
     logger.addHandler(logging.StreamHandler())
 
     logger.info(f"Lambda Request ID: {context.aws_request_id}")
 
@@ -65,18 +66,20 @@ def process_completed_products():
 
     logger.debug("Checking completed products...")
 
-    sql = """
-        select distinct s3_url_of_product_label from product where processing_status = 'INCOMPLETE' and s3_url_of_product_label
-        NOT IN (select distinct s3_url_of_product_label from product_data_file_mapping
-        where s3_url_of_data_file
-        NOT IN (select s3_url_of_data_file from data_file));
+    sql = """
+        SELECT DISTINCT s3_url_of_product_label from product
+        WHERE processing_status = 'INCOMPLETE' and s3_url_of_product_label
+        NOT IN (SELECT s3_url_of_product_label from product_data_file_mapping
+        where s3_url_of_data_file
+        NOT IN (SELECT s3_url_of_data_file from data_file)) and s3_url_of_product_label
+        IN (SELECT s3_url_of_product_label from product_data_file_mapping);
     """
 
     response = rds_data.execute_statement(
-        resourceArn=db_clust_arn,
-        secretArn=db_secret_arn,
-        database='pds_nucleus',
-        sql=sql)
+        resourceArn = db_clust_arn,
+        secretArn = db_secret_arn,
+        database = 'pds_nucleus',
+        sql = sql)
 
     logger.debug(f"Number of completed product labels : {str(len(response['records']))}")
 
@@ -90,9 +93,10 @@ def process_completed_products():
 
         for data_dict in record:
             for data_type, s3_url_of_product_label in data_dict.items():
-                update_product_processing_status_in_database(s3_url_of_product_label, 'COMPLETE')
+                update_product_processing_status_in_database(s3_url_of_product_label, 'COMPLETE')
                 list_of_product_labels_to_process.append(s3_url_of_product_label)
 
+
         if count == n:
             submit_data_to_nucleus(list_of_product_labels_to_process)
             count = 0
@@ -114,16 +118,15 @@ def update_product_processing_status_in_database(s3_url_of_product_label, proces
     processing_status_param = {'name': 'processing_status_param', 'value': {'stringValue': processing_status}}
     last_updated_epoch_time_param = {'name': 'last_updated_epoch_time_param',
                                      'value': {'longValue': round(time.time() * 1000)}}
-    s3_url_of_product_label_param = {'name': 's3_url_of_product_label_param',
-                                     'value': {'stringValue': s3_url_of_product_label}}
+    s3_url_of_product_label_param = {'name': 's3_url_of_product_label_param', 'value': {'stringValue': s3_url_of_product_label}}
 
     param_set = [processing_status_param, last_updated_epoch_time_param, s3_url_of_product_label_param]
 
     response = rds_data.execute_statement(
-        resourceArn=db_clust_arn,
-        secretArn=db_secret_arn,
-        database='pds_nucleus',
-        sql=sql,
+        resourceArn = db_clust_arn,
+        secretArn = db_secret_arn,
+        database = 'pds_nucleus',
+        sql = sql,
         parameters=param_set)
 
     logger.debug(f"Response for update_product_processing_status_in_database: {str(response)}")
@@ -144,7 +147,7 @@ def create_harvest_config_xml_and_trigger_nucleus(list_of_product_labels_to_proc
 
     harvest_config_dir = efs_mount_path + '/harvest-configs'
 
-    file_name = os.path.basename(list_of_product_labels_to_process[0].replace("s3:/", efs_mount_path, 1))
+    file_name = os.path.basename(list_of_product_labels_to_process[0].replace("s3:/", efs_mount_path, 1) )
 
     harvest_manifest_content = ""
     list_of_product_labels_to_process_with_file_paths = []
@@ -154,10 +157,13 @@ def create_harvest_config_xml_and_trigger_nucleus(list_of_product_labels_to_proc
         harvest_manifest_content = harvest_manifest_content + efs_product_label_file_location + '\n'
         list_of_product_labels_to_process_with_file_paths.append(efs_product_label_file_location)
 
+    # Generate a random suffix for harvest config file name and manifest file name to avoid conflicting duplicate file names
+    random_suffix = uuid.uuid4().hex
+
     try:
         os.makedirs(harvest_config_dir, exist_ok=True)
-        harvest_config_file_path = harvest_config_dir + '/harvest_' + file_name + '.cfg'
-        harvest_manifest_file_path = harvest_config_dir + '/harvest_manifest_' + file_name + '.txt'
+        harvest_config_file_path = harvest_config_dir + '/harvest_' + file_name + '_' + random_suffix + '.cfg'
+        harvest_manifest_file_path = harvest_config_dir + '/harvest_manifest_' + file_name + '_' + random_suffix + '.txt'
 
         logger.debug(f"Manifest content: {str(harvest_manifest_content)}")
 
@@ -184,11 +190,12 @@ def create_harvest_config_xml_and_trigger_nucleus(list_of_product_labels_to_proc
         """
 
         with open(harvest_config_file_path, "w") as f:
-                f.write(harvest_config_xml_content)
+            f.write(harvest_config_xml_content)
 
         logger.info(f"Created harvest config XML file: {harvest_config_file_path}")
 
     except Exception as e:
         logger.error(f"Error creating harvest config files in : {harvest_config_dir}. Exception: {str(e)}")
+        return
 
     trigger_nucleus_workflow(harvest_manifest_file_path, harvest_config_file_path, list_of_product_labels_to_process_with_file_paths)
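
The behavioral fix in process_completed_products() is the added IN clause: a product with no rows yet in product_data_file_mapping previously passed the NOT IN check vacuously, so it was marked COMPLETE and submitted before its mapping rows were written. Below is a minimal sketch of that logic (not part of the patch), using an in-memory SQLite database as a stand-in for the Aurora pds_nucleus database; the table shapes are simplified assumptions, and the real lambda runs the query through the RDS Data API.

import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()

# Simplified, assumed table shapes; the real schema has more columns.
cur.executescript("""
    CREATE TABLE product (s3_url_of_product_label TEXT, processing_status TEXT);
    CREATE TABLE product_data_file_mapping (s3_url_of_product_label TEXT, s3_url_of_data_file TEXT);
    CREATE TABLE data_file (s3_url_of_data_file TEXT);
""")

# Product A: all mapped data files have arrived    -> should be selected.
# Product B: one mapped data file is still missing -> should be skipped.
# Product C: no mapping rows written yet           -> was selected by the old
#            query (vacuous NOT IN); the new IN clause now skips it.
cur.executemany("INSERT INTO product VALUES (?, 'INCOMPLETE')",
                [("s3://staging/A.xml",), ("s3://staging/B.xml",), ("s3://staging/C.xml",)])
cur.executemany("INSERT INTO product_data_file_mapping VALUES (?, ?)",
                [("s3://staging/A.xml", "s3://staging/A.dat"),
                 ("s3://staging/B.xml", "s3://staging/B.dat")])
cur.execute("INSERT INTO data_file VALUES ('s3://staging/A.dat')")

cur.execute("""
    SELECT DISTINCT s3_url_of_product_label from product
    WHERE processing_status = 'INCOMPLETE' and s3_url_of_product_label
    NOT IN (SELECT s3_url_of_product_label from product_data_file_mapping
    where s3_url_of_data_file
    NOT IN (SELECT s3_url_of_data_file from data_file)) and s3_url_of_product_label
    IN (SELECT s3_url_of_product_label from product_data_file_mapping)
""")
print(cur.fetchall())  # [('s3://staging/A.xml',)] -- only the truly complete product

Without the final IN clause the result would also include C.xml, which appears to be the premature COMPLETE transition that issue #54 targets.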
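The uuid change is independent of the SQL fix: harvest config and manifest file names previously derived only from the first product label's basename, so two batches whose first labels share a basename (or two overlapping lambda invocations) could write to the same EFS paths. A small illustration of the new naming scheme follows; harvest_file_paths is a hypothetical helper written for this note, not a function in the lambda, and the directory path is illustrative.

import uuid

def harvest_file_paths(harvest_config_dir, file_name):
    # Mirrors the naming introduced by the patch: <name>_<32-hex-char suffix>.
    random_suffix = uuid.uuid4().hex
    config_path = harvest_config_dir + '/harvest_' + file_name + '_' + random_suffix + '.cfg'
    manifest_path = harvest_config_dir + '/harvest_manifest_' + file_name + '_' + random_suffix + '.txt'
    return config_path, manifest_path

# Two calls with the same basename yield distinct paths, so no clash:
assert harvest_file_paths('/mnt/efs/harvest-configs', 'product.xml') != \
       harvest_file_paths('/mnt/efs/harvest-configs', 'product.xml')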