UPDATE the SQL statement used in the Nucleus pds-nucleus-datasync-completion lambda code to make sure the product table and the product_data_file_mapping table are updated in a consistent way (i.e., both tables must have been updated before a product is considered complete).

Refer to task #54
ramesh-maddegoda committed Feb 7, 2024
1 parent 19b85a3 commit 845745b
Showing 1 changed file with 32 additions and 25 deletions.
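The substance of the change is the SQL in process_completed_products(). As a minimal sketch of why the extra IN clause matters — an editor's illustration using sqlite3 and invented sample URLs, not code from this repository — the old query marks a product COMPLETE even when its product_data_file_mapping rows have not been written yet, because an empty mapping set passes the NOT IN test vacuously:

import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE product (s3_url_of_product_label TEXT, processing_status TEXT);
    CREATE TABLE product_data_file_mapping (s3_url_of_product_label TEXT, s3_url_of_data_file TEXT);
    CREATE TABLE data_file (s3_url_of_data_file TEXT);

    -- Product A: mapping rows written and the data file has arrived.
    INSERT INTO product VALUES ('s3://staging/A.xml', 'INCOMPLETE');
    INSERT INTO product_data_file_mapping VALUES ('s3://staging/A.xml', 's3://staging/A.dat');
    INSERT INTO data_file VALUES ('s3://staging/A.dat');

    -- Product B: product row exists but its mapping rows are not written yet.
    INSERT INTO product VALUES ('s3://staging/B.xml', 'INCOMPLETE');
""")

old_sql = """
    SELECT DISTINCT s3_url_of_product_label FROM product
    WHERE processing_status = 'INCOMPLETE' AND s3_url_of_product_label
    NOT IN (SELECT s3_url_of_product_label FROM product_data_file_mapping
            WHERE s3_url_of_data_file NOT IN (SELECT s3_url_of_data_file FROM data_file))
"""
new_sql = old_sql + """
    AND s3_url_of_product_label IN (SELECT s3_url_of_product_label FROM product_data_file_mapping)
"""

print(conn.execute(old_sql).fetchall())  # A and B -- B is falsely "complete"
print(conn.execute(new_sql).fetchall())  # A only

The new query's trailing IN clause requires at least one mapping row per product, so a product can only be marked complete once both tables have been populated consistently — exactly the guarantee the commit message asks for.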
src/pds/ingress/pds-nucleus-product-completion-checker.py (32 additions & 25 deletions)
@@ -1,7 +1,7 @@
 """
-==============================================
-pds-nucleus-product-completion-checker-batch.py
-==============================================
+============================================================
+pds-nucleus-product-completion-checker.py (batch processing)
+============================================================
 Lambda function to check if the staging S3 bucket has received a complete product
 with all required files. This lambda function is triggered periodically.
@@ -18,6 +18,7 @@
 import http.client
 import base64
 import ast
+import uuid
 
 from xml.dom import minidom

@@ -29,10 +30,10 @@
 rds_data = boto3.client('rds-data')
 
 mwaa_env_name = 'PDS-Nucleus-Airflow-Env'
-dag_name = 'PDS_Registry_Use_Case_61_Messenger_Batch-logs'
 mwaa_cli_command = 'dags trigger'
 
 # Read environment variables from lambda configurations
+dag_name = os.environ.get('AIRFLOW_DAG_NAME')
 node_name = os.environ.get('NODE_NAME')
 es_url = os.environ.get('ES_URL')
 replace_prefix_with = os.environ.get('REPLACE_PREFIX_WITH')
@@ -47,7 +48,7 @@
 def lambda_handler(event, context):
     """ Main lambda handler """
 
-    logger.setLevel(logging.INFO)
+    logger.setLevel(logging.DEBUG)
     logger.addHandler(logging.StreamHandler())
 
     logger.info(f"Lambda Request ID: {context.aws_request_id}")
@@ -65,18 +66,20 @@ def process_completed_products():
 
     logger.debug("Checking completed products...")
 
-    sql = """
-        select distinct s3_url_of_product_label from product where processing_status = 'INCOMPLETE' and s3_url_of_product_label
-        NOT IN (select distinct s3_url_of_product_label from product_data_file_mapping
-        where s3_url_of_data_file
-        NOT IN (select s3_url_of_data_file from data_file));
+    sql = """
+        SELECT DISTINCT s3_url_of_product_label from product
+        WHERE processing_status = 'INCOMPLETE' and s3_url_of_product_label
+        NOT IN (SELECT s3_url_of_product_label from product_data_file_mapping
+        where s3_url_of_data_file
+        NOT IN (SELECT s3_url_of_data_file from data_file)) and s3_url_of_product_label
+        IN (SELECT s3_url_of_product_label from product_data_file_mapping);
     """
 
     response = rds_data.execute_statement(
-        resourceArn=db_clust_arn,
-        secretArn=db_secret_arn,
-        database='pds_nucleus',
-        sql=sql)
+        resourceArn = db_clust_arn,
+        secretArn = db_secret_arn,
+        database = 'pds_nucleus',
+        sql = sql)
 
     logger.debug(f"Number of completed product labels : {str(len(response['records']))}")
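A note on the response shape, since it drives the nested loops in the next hunk: per the RDS Data API's documented format, execute_statement returns records as a list of rows, each row a list of one {type_hint: value} dict per selected column. A minimal sketch with invented URLs:

# Shape of response['records'] as returned by the rds-data execute_statement call:
response = {
    'records': [
        [{'stringValue': 's3://staging/A.xml'}],
        [{'stringValue': 's3://staging/C.xml'}],
    ]
}
for record in response['records']:
    for data_dict in record:
        for data_type, s3_url_of_product_label in data_dict.items():
            print(data_type, s3_url_of_product_label)  # e.g. stringValue s3://staging/A.xml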
@@ -90,9 +93,10 @@ def process_completed_products():
 
             for data_dict in record:
                 for data_type, s3_url_of_product_label in data_dict.items():
-                    update_product_processing_status_in_database(s3_url_of_product_label, 'COMPLETE')
+                    update_product_processing_status_in_database(s3_url_of_product_label,'COMPLETE')
                     list_of_product_labels_to_process.append(s3_url_of_product_label)
 
+
         if count == n:
             submit_data_to_nucleus(list_of_product_labels_to_process)
             count = 0
@@ -114,16 +118,15 @@ def update_product_processing_status_in_database(s3_url_of_product_label, processing_status):
     processing_status_param = {'name': 'processing_status_param', 'value': {'stringValue': processing_status}}
     last_updated_epoch_time_param = {'name': 'last_updated_epoch_time_param',
                                      'value': {'longValue': round(time.time() * 1000)}}
-    s3_url_of_product_label_param = {'name': 's3_url_of_product_label_param',
-                                     'value': {'stringValue': s3_url_of_product_label}}
+    s3_url_of_product_label_param = {'name': 's3_url_of_product_label_param', 'value': {'stringValue': s3_url_of_product_label}}
 
     param_set = [processing_status_param, last_updated_epoch_time_param, s3_url_of_product_label_param]
 
     response = rds_data.execute_statement(
-        resourceArn=db_clust_arn,
-        secretArn=db_secret_arn,
-        database='pds_nucleus',
-        sql=sql,
+        resourceArn = db_clust_arn,
+        secretArn = db_secret_arn,
+        database = 'pds_nucleus',
+        sql = sql,
         parameters=param_set)
 
     logger.debug(f"Response for update_product_processing_status_in_database: {str(response)}")
@@ -144,7 +147,7 @@ def create_harvest_config_xml_and_trigger_nucleus(list_of_product_labels_to_process):
 
     harvest_config_dir = efs_mount_path + '/harvest-configs'
 
-    file_name = os.path.basename(list_of_product_labels_to_process[0].replace("s3:/", efs_mount_path, 1))
+    file_name = os.path.basename(list_of_product_labels_to_process[0].replace("s3:/", efs_mount_path, 1) )
 
     harvest_manifest_content = ""
    list_of_product_labels_to_process_with_file_paths = []
@@ -154,10 +157,13 @@ def create_harvest_config_xml_and_trigger_nucleus(list_of_product_labels_to_process):
         harvest_manifest_content = harvest_manifest_content + efs_product_label_file_location + '\n'
         list_of_product_labels_to_process_with_file_paths.append(efs_product_label_file_location)
 
+    # Generate a random suffix for harvest config file name and manifest file name to avoid conflicting duplicate file names
+    random_suffix = uuid.uuid4().hex
+
     try:
         os.makedirs(harvest_config_dir, exist_ok=True)
-        harvest_config_file_path = harvest_config_dir + '/harvest_' + file_name + '.cfg'
-        harvest_manifest_file_path = harvest_config_dir + '/harvest_manifest_' + file_name + '.txt'
+        harvest_config_file_path = harvest_config_dir + '/harvest_' + file_name + '_' + random_suffix + '.cfg'
+        harvest_manifest_file_path = harvest_config_dir + '/harvest_manifest_' + file_name + '_' + random_suffix + '.txt'
 
         logger.debug(f"Manifest content: {str(harvest_manifest_content)}")
@@ -184,11 +190,12 @@ def create_harvest_config_xml_and_trigger_nucleus(list_of_product_labels_to_process):
        """
 
        with open(harvest_config_file_path, "w") as f:
-                f.write(harvest_config_xml_content)
+            f.write(harvest_config_xml_content)
 
        logger.info(f"Created harvest config XML file: {harvest_config_file_path}")
    except Exception as e:
        logger.error(f"Error creating harvest config files in : {harvest_config_dir}. Exception: {str(e)}")
+        return
 
    trigger_nucleus_workflow(harvest_manifest_file_path, harvest_config_file_path,
                             list_of_product_labels_to_process_with_file_paths)
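trigger_nucleus_workflow itself is not shown in this diff. Given the module-level mwaa_env_name and mwaa_cli_command variables and the http.client/base64 imports, it plausibly follows the standard MWAA CLI-token pattern sketched below; everything here beyond those names is an assumption, not the repository's code:

import base64
import http.client
import json

import boto3

def trigger_dag_sketch(dag_name):
    # Exchange AWS credentials for a short-lived Airflow CLI token.
    mwaa = boto3.client('mwaa')
    token = mwaa.create_cli_token(Name='PDS-Nucleus-Airflow-Env')

    # POST the CLI command ("dags trigger <dag>") to the MWAA web server.
    conn = http.client.HTTPSConnection(token['WebServerHostname'])
    conn.request('POST', '/aws_mwaa/cli', 'dags trigger ' + dag_name,
                 headers={'Authorization': 'Bearer ' + token['CliToken'],
                          'Content-Type': 'text/plain'})
    result = json.loads(conn.getresponse().read())

    # MWAA returns the command's stdout/stderr base64-encoded.
    print(base64.b64decode(result['stdout']).decode())

The /aws_mwaa/cli endpoint runs the given Airflow CLI command and returns its output base64-encoded, which would explain why the lambda imports base64.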