Skip to content

Commit

Permalink
fix: addig facade for collection creation
Browse files Browse the repository at this point in the history
  • Loading branch information
wphyojpl committed Sep 16, 2022
1 parent d5fa802 commit dc6cfe4
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 61 deletions.
Original file line number Diff line number Diff line change
@@ -1,55 +1,16 @@
import json
import os
from threading import Thread

import pystac

from cumulus_lambda_functions.cumulus_stac.collection_transformer import CollectionTransformer
from cumulus_lambda_functions.cumulus_wrapper.query_collections import CollectionsQuery
from cumulus_lambda_functions.lib.aws.aws_lambda import AwsLambda
from cumulus_lambda_functions.lib.lambda_logger_generator import LambdaLoggerGenerator

LOGGER = LambdaLoggerGenerator.get_logger(__name__, LambdaLoggerGenerator.get_level_from_env())


class CollectionCreationThread(Thread):
def __init__(self, request_body):
super().__init__()
# self.thread_name = thread_name
# self.thread_ID = thread_ID
self.__request_body = request_body
self.__cumulus_collection_query = CollectionsQuery('', '')
self.__cumulus_lambda_prefix = os.getenv('CUMULUS_LAMBDA_PREFIX')
self.__ingest_sqs_url = os.getenv('CUMULUS_WORKFLOW_SQS_URL')
self.__workflow_name = os.getenv('CUMULUS_WORKFLOW_NAME', 'CatalogGranule')
self.__provider_id = '' # TODO. need this?
# helper function to execute the threads

def run(self):
try:
cumulus_collection_doc = CollectionTransformer().from_stac(self.__request_body)
creation_result = self.__cumulus_collection_query.create_collection(cumulus_collection_doc, self.__cumulus_lambda_prefix)
if 'status' not in creation_result:
LOGGER.error(f'status not in creation_result: {creation_result}')
return

rule_creation_result = self.__cumulus_collection_query.create_sqs_rules(
cumulus_collection_doc,
self.__cumulus_lambda_prefix,
self.__ingest_sqs_url,
self.__provider_id,
self.__workflow_name,
)
if 'status' not in rule_creation_result:
# 'TODO' delete collection
LOGGER.error(f'status not in rule_creation_result: {rule_creation_result}')
return
except Exception as e:
LOGGER.exception('error while creating new collection in Cumulus')
return
LOGGER.info(f'collection and rule created for: {self.__request_body}')
return


class CumulusCreateCollectionDapa:
def __init__(self, event):
required_env = ['CUMULUS_LAMBDA_PREFIX', 'CUMULUS_WORKFLOW_SQS_URL']
Expand All @@ -62,32 +23,14 @@ def __init__(self, event):
self.__ingest_sqs_url = os.getenv('CUMULUS_WORKFLOW_SQS_URL')
self.__workflow_name = os.getenv('CUMULUS_WORKFLOW_NAME', 'CatalogGranule')
self.__provider_id = '' # TODO. need this?
self.__collection_creation_lambda_name = os.environ.get('COLLECTION_CREATION_LAMBDA_NAME', '').strip()

def start(self):
if 'body' not in self.__event:
raise ValueError(f'missing body in {self.__event}')
self.__request_body = json.loads(self.__event['body'])
LOGGER.debug(f'request body: {self.__request_body}')
validation_result = pystac.Collection.from_dict(self.__request_body).validate()
if not isinstance(validation_result, list):
LOGGER.error(f'request body is not valid STAC collection: {validation_result}')
return {
'statusCode': 500,
'body': {'message': f'request body is not valid STAC Collection schema. check details',
'details': validation_result}
}
# thread1 = CollectionCreationThread(self.__request_body)
# LOGGER.info(f'starting background thread')
# thread1.start()
# LOGGER.info(f'not waiting for background thread')
# return {
# 'statusCode': 202,
# 'body': {'message': 'started in backgorund'}
# }
def execute_creation(self):
try:
cumulus_collection_doc = CollectionTransformer().from_stac(self.__request_body)
creation_result = self.__cumulus_collection_query.create_collection(cumulus_collection_doc, self.__cumulus_lambda_prefix)
if 'status' not in creation_result:
LOGGER.error(f'status not in creation_result: {creation_result}')
return {
'statusCode': 500,
'body': {
Expand All @@ -103,6 +46,7 @@ def start(self):
)
if 'status' not in rule_creation_result:
# 'TODO' delete collection
LOGGER.error(f'status not in rule_creation_result: {rule_creation_result}')
return {
'statusCode': 500,
'body': {
Expand All @@ -118,9 +62,38 @@ def start(self):
'details': str(e)
}
}
LOGGER.info(f'creation_result: {creation_result}')
return {
'statusCode': 200,
'body': {
'message': creation_result
}
}

def start(self):
if 'body' not in self.__event:
raise ValueError(f'missing body in {self.__event}')
self.__request_body = json.loads(self.__event['body'])
LOGGER.debug(f'request body: {self.__request_body}')
validation_result = pystac.Collection.from_dict(self.__request_body).validate()
if not isinstance(validation_result, list):
LOGGER.error(f'request body is not valid STAC collection: {validation_result}')
return {
'statusCode': 500,
'body': {'message': f'request body is not valid STAC Collection schema. check details',
'details': validation_result}
}
if self.__collection_creation_lambda_name != '':
response = AwsLambda().invoke_function(
function_name=self.__collection_creation_lambda_name,
payload=self.__event,
)
LOGGER.debug(f'async function started: {response}')
return {
'statusCode': 202,
'body': {
'message': 'processing'
}
}

return self.execute_creation()
20 changes: 20 additions & 0 deletions cumulus_lambda_functions/lib/aws/aws_lambda.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import json

from cumulus_lambda_functions.lib.aws.aws_cred import AwsCred


class AwsLambda(AwsCred):
def __init__(self):
super().__init__()
self.__lambda_client = self.get_client('lambda')

def invoke_function(self, function_name: str, payload: dict):
response = self.__lambda_client.invoke(
FunctionName=function_name,
InvocationType='Event', # 'Event' = async | 'RequestResponse' = sync | 'DryRun',
LogType='None', # 'None' = async | 'Tail = sync',
ClientContext='', # Up to 3583 bytes of base64-encoded data
Payload=json.dumps(payload).encode(),
# Qualifier='string'
)
return response
Empty file.
31 changes: 31 additions & 0 deletions tests/cumulus_lambda_functions/lib/aws/test_aws_lambda.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from datetime import datetime
from unittest import TestCase

from cumulus_lambda_functions.cumulus_stac.unity_collection_stac import UnityCollectionStac
from cumulus_lambda_functions.lib.aws.aws_lambda import AwsLambda


class TestAwsLambda(TestCase):
def test_01(self):
dapa_collection = UnityCollectionStac() \
.with_id(f'CUMULUS_DAPA_UNIT_TEST___{int(datetime.utcnow().timestamp())}') \
.with_graule_id_regex("^P[0-9]{3}[0-9]{4}[A-Z]{13}T[0-9]{12}0$") \
.with_granule_id_extraction_regex("(P[0-9]{3}[0-9]{4}[A-Z]{13}T[0-9]{12}0).+") \
.with_title("P1570515ATMSSCIENCEAXT11344000000001.PDS") \
.with_process('modis') \
.add_file_type("P1570515ATMSSCIENCEAXT11344000000000.PDS.cmr.xml",
"^P[0-9]{3}[0-9]{4}[A-Z]{13}T[0-9]{12}00.PDS.cmr.xml$", 'internal', 'metadata', 'item') \
.add_file_type("P1570515ATMSSCIENCEAXT11344000000001.PDS.xml",
"^P[0-9]{3}[0-9]{4}[A-Z]{13}T[0-9]{12}01\\.PDS\\.xml$", 'internal', 'metadata', 'item') \
.add_file_type("P1570515ATMSSCIENCEAXT11344000000000.PDS", "^P[0-9]{3}[0-9]{4}[A-Z]{13}T[0-9]{12}00\\.PDS$",
'internal', 'data', 'item')
stac_collection = dapa_collection.start()

response = AwsLambda().invoke_function(
function_name='am-uds-dev-cumulus-cumulus_collections_creation_dapa',
payload=stac_collection,
)
self.assertTrue('ResponseMetadata' in response, f'missing ResponseMetadata in response {response}')
self.assertTrue('HTTPStatusCode' in response['ResponseMetadata'], f'missing HTTPStatusCode in response#ResponseMetadata {response}')
print(response)
return
25 changes: 25 additions & 0 deletions tf-module/unity-cumulus/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,31 @@ resource "aws_lambda_function" "cumulus_collections_creation_dapa" {
}
}

vpc_config {
subnet_ids = var.cumulus_lambda_subnet_ids
security_group_ids = local.security_group_ids_set ? var.security_group_ids : [aws_security_group.unity_cumulus_lambda_sg[0].id]
}
tags = var.tags
}

resource "aws_lambda_function" "cumulus_collections_creation_dapa_facade" {
filename = local.lambda_file_name
function_name = "${var.prefix}-cumulus_collections_creation_dapa_facade"
role = var.lambda_processing_role_arn
handler = "cumulus_lambda_functions.cumulus_collections_dapa.lambda_function.lambda_handler_ingestion"
runtime = "python3.9"
timeout = 300

environment {
variables = {
LOG_LEVEL = var.log_level
CUMULUS_LAMBDA_PREFIX = var.prefix
CUMULUS_WORKFLOW_SQS_URL = var.workflow_sqs_url
CUMULUS_WORKFLOW_NAME = "CatalogGranule"
COLLECTION_CREATION_LAMBDA_NAME = aws_lambda_function.cumulus_collections_creation_dapa.arn
}
}

vpc_config {
subnet_ids = var.cumulus_lambda_subnet_ids
security_group_ids = local.security_group_ids_set ? var.security_group_ids : [aws_security_group.unity_cumulus_lambda_sg[0].id]
Expand Down

0 comments on commit dc6cfe4

Please sign in to comment.