Commit

ADD Changes in lambda functions to horizontally scale, instead of trying to process a large number of files in one lambda

Refer to task #54
ramesh-maddegoda committed Feb 15, 2024
1 parent 04eb56e commit bcaa1fb
Showing 3 changed files with 101 additions and 37 deletions.
70 changes: 70 additions & 0 deletions src/pds/ingress/pds-nucleus-datasync-completion-trigger.py
@@ -0,0 +1,70 @@
"""
============================================================
pds-nucleus-datasync-completion-trigger.py
============================================================
Lambda function that gets triggered when the PDS Nucleus Staging S3 Bucket to EFS DataSync task is completed.
This function reads the list of data transfer verification reports created by a specific DataSync task
and asynchronously calls pds-nucleus-datasync-completion.py for each data transfer verification
report JSON file.
"""

import boto3
import logging
import json
import os

datasync_reports_s3_bucket_name = "pds-nucleus-datassync-reports"

logger = logging.getLogger("pds-nucleus-datasync-completion-trigger")
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler())

s3 = boto3.resource('s3')
s3_client = boto3.client('s3')

s3_bucket_name = "pds-nucleus-staging"
db_clust_arn = os.environ.get('DB_CLUSTER_ARN')
db_secret_arn = os.environ.get('DB_SECRET_ARN')
efs_mount_path = os.environ.get('EFS_MOUNT_PATH')

rds_data = boto3.client('rds-data')

# Define the client to interact with AWS Lambda
client = boto3.client('lambda')


def lambda_handler(event, context):
""" Lambda Handler """

logger.info(f"Lambda Request ID: {context.aws_request_id}")
logger.info(f"Event: {event}")
resource = event['resources']

resource_list = resource[0].split("/")
task_id = resource_list[1]
exec_id = resource_list[3]

prefix = f"Detailed-Reports/{task_id}/{exec_id}/{exec_id}.files-verified-"

datasync_reports_s3_bucket = s3.Bucket(datasync_reports_s3_bucket_name)

list_of_files = []

# Loop through the list of json files with the prefix files-verified-
for transfer_report in datasync_reports_s3_bucket.objects.filter(Prefix=prefix):
# Define the input parameters that will be passed
# on to the child function
inputParams = {
"s3_key": transfer_report.key
}

logger.debug(f"inputParams: {str(inputParams)}")
response = client.invoke(
FunctionName='arn:aws:lambda:us-west-2:441083951559:function:pds-nucleus-datasync-completion',
InvocationType='Event',
Payload=json.dumps(inputParams)
)

logger.debug(f"List_of_files received: {list_of_files}")
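
For reference, the "resources" entry in the triggering EventBridge event is a DataSync task-execution ARN, and the split("/") above picks out the task ID (index 1) and execution ID (index 3). A minimal local sketch of that parsing, using a made-up region, account, and execution ARN rather than values from this commit:

# Sketch only: the ARN below is a placeholder illustrating the expected shape.
sample_event = {
    "resources": [
        "arn:aws:datasync:us-west-2:123456789012:task/task-0aaa1111bbb22222c/execution/exec-0ddd3333eee44444f"
    ]
}

resource_list = sample_event["resources"][0].split("/")
task_id = resource_list[1]   # "task-0aaa1111bbb22222c"
exec_id = resource_list[3]   # "exec-0ddd3333eee44444f"

# Matches the report prefix built in lambda_handler above.
print(f"Detailed-Reports/{task_id}/{exec_id}/{exec_id}.files-verified-")
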
59 changes: 26 additions & 33 deletions src/pds/ingress/pds-nucleus-datasync-completion.py
@@ -3,12 +3,14 @@
pds-nucleus-datasync-completion.py
==============================================
Lambda function that gets triggered when the PDS Nucleus Staging S3 Bucket to EFS DataSync task is completed.
Lambda function to read each data transfer report JSON file and update the Nucleus database tables
(product table, product_data_file_mapping table and data_file table) based on the list of verified
files found in the data transfer report. This lambda function is asynchronously invoked by
pds-nucleus-datasync-completion-trigger.py.
"""

import boto3
import urllib.parse
import logging
import json
import os
@@ -36,45 +38,36 @@ def lambda_handler(event, context):

logger.info(f"Lambda Request ID: {context.aws_request_id}")
logger.info(f"Event: {event}")
resource = event['resources']

resource_list = resource[0].split("/")
task_id = resource_list[1]
exec_id = resource_list[3]
json_event = json.dumps(event)
transfer_report = json.loads(json_event)
content_object = s3.Object('pds-nucleus-datassync-reports', str(transfer_report["s3_key"]))
transfer_report_file_content = content_object.get()['Body'].read().decode('utf-8')
transfer_report_json_content = json.loads(transfer_report_file_content)
verified_file_obj_list = transfer_report_json_content['Verified']

prefix = f"Detailed-Reports/{task_id}/{exec_id}/{exec_id}.files-verified-"
logger.debug(f"verified_file_obj_list: {verified_file_obj_list}")

datasync_reports_s3_bucket = s3.Bucket(datasync_reports_s3_bucket_name)
# Process each file in verified file list
for file_obj in verified_file_obj_list:

list_of_files = []
obj_name = file_obj['RelativePath']
obj_type = file_obj['SrcMetadata']['Type']

for transfer_report in datasync_reports_s3_bucket.objects.filter(Prefix=prefix):
if obj_type == 'Regular': # Not a directory

transfer_report_file_content = transfer_report.get()['Body'].read().decode('utf-8')
transfer_report_json_content = json.loads(transfer_report_file_content)
verified_file_obj_list = transfer_report_json_content['Verified']
if obj_name.endswith('.fz'):
file_to_extract = f"/mnt/data/{s3_bucket_name}" + obj_name
extract_file(file_to_extract)
obj_name = obj_name.rstrip(",.fz")

logger.debug(f"verified_file_obj_list: {verified_file_obj_list}")
s3_url_of_file = "s3://" + s3_bucket_name + obj_name

for file_obj in verified_file_obj_list:
s3_key = obj_name[1:]

obj_name = file_obj['RelativePath']
obj_type = file_obj['SrcMetadata']['Type']
handle_file_types(s3_url_of_file, s3_bucket_name, s3_key)

if obj_type == 'Regular': # Not a directory

if obj_name.endswith('.fz'):
file_to_extract = f"/mnt/data/{s3_bucket_name}" + obj_name
extract_file(file_to_extract)
obj_name = obj_name.rstrip(",.fz")

s3_url_of_file = "s3://" + s3_bucket_name + obj_name

s3_key = obj_name[1:]

handle_file_types(s3_url_of_file, s3_bucket_name, s3_key)

list_of_files.append(s3_url_of_file)
list_of_files.append(s3_url_of_file)

logger.debug(f"List_of_files received: {list_of_files}")
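
The verification report itself is not shown in this diff; based on the keys the handler reads ("Verified", "RelativePath" and "SrcMetadata"/"Type"), a minimal sketch of its expected shape and of the filtering applied to it, with illustrative paths only:

# Sketch only: field values are placeholders, not taken from a real DataSync report.
sample_report = {
    "Verified": [
        {"RelativePath": "/bundle1/data/image_001.xml",
         "SrcMetadata": {"Type": "Regular"}},
        {"RelativePath": "/bundle1/data",
         "SrcMetadata": {"Type": "Directory"}}   # non-"Regular" entries are skipped
    ]
}

staging_bucket = "pds-nucleus-staging"
for file_obj in sample_report["Verified"]:
    if file_obj["SrcMetadata"]["Type"] == "Regular":
        obj_name = file_obj["RelativePath"]
        print("s3://" + staging_bucket + obj_name)   # s3://pds-nucleus-staging/bundle1/data/image_001.xml
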

@@ -185,8 +178,8 @@ def save_product_processing_status_in_database(s3_url_of_product_label, processi
database='pds_nucleus',
sql=sql,
parameters=param_set)
logger.debug(str(response))

print(str(response))
except Exception as e:
logger.error(f"Error writing to product table. Exception: {str(e)}")
raise e
@@ -222,7 +215,7 @@ def save_data_file_in_database(s3_url_of_data_file):
sql=sql,
parameters=param_set)

logger.info(str(response))
logger.debug(str(response))

except Exception as e:
logger.error(f"Error updating data_file table. Exception: {str(e)}")
9 changes: 5 additions & 4 deletions src/pds/ingress/pds-nucleus-product-completion-checker.py
@@ -8,10 +8,7 @@
"""

import json
import urllib.parse
import logging
import shutil
import boto3
import os
import time
@@ -66,13 +63,17 @@ def process_completed_products():

logger.debug("Checking completed products...")

# The limit 1000 is used in the following query to avoid the error "Database returned more than the allowed response size limit".
# The remaining records are retrieved by subsequent queries.

sql = """
SELECT DISTINCT s3_url_of_product_label from product
WHERE processing_status = 'INCOMPLETE' and s3_url_of_product_label
NOT IN (SELECT s3_url_of_product_label from product_data_file_mapping
where s3_url_of_data_file
NOT IN (SELECT s3_url_of_data_file from data_file)) and s3_url_of_product_label
IN (SELECT s3_url_of_product_label from product_data_file_mapping);
IN (SELECT s3_url_of_product_label from product_data_file_mapping) limit 1000;
"""

response = rds_data.execute_statement(
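
Because the query above is capped at 1000 rows to stay under the Data API response-size limit, leftover completed products are picked up on later runs of the checker. A simplified sketch of issuing such a capped query through the RDS Data API, assuming the same DB_CLUSTER_ARN and DB_SECRET_ARN environment variables used elsewhere in this commit:

import os
import boto3

rds_data = boto3.client('rds-data')

# Sketch only: a cut-down form of the query above, keeping just the LIMIT behaviour.
response = rds_data.execute_statement(
    resourceArn=os.environ.get('DB_CLUSTER_ARN'),
    secretArn=os.environ.get('DB_SECRET_ARN'),
    database='pds_nucleus',
    sql="SELECT DISTINCT s3_url_of_product_label FROM product "
        "WHERE processing_status = 'INCOMPLETE' LIMIT 1000;")

# Each returned record is a list of field dicts; the single column arrives as a stringValue.
for record in response.get('records', []):
    print(record[0].get('stringValue'))
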
