Skip to content

Commit

Permalink
fix: use thread to run the actual collection creation in background t…
Browse files Browse the repository at this point in the history
…hread
  • Loading branch information
wphyojpl committed Sep 13, 2022
1 parent f415e43 commit 4b4b3ef
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 41 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import json
import os
from threading import Thread

import pystac

Expand All @@ -10,6 +11,45 @@
LOGGER = LambdaLoggerGenerator.get_logger(__name__, LambdaLoggerGenerator.get_level_from_env())


class CollectionCreationThread(Thread):
def __init__(self, request_body):
super().__init__()
# self.thread_name = thread_name
# self.thread_ID = thread_ID
self.__request_body = request_body
self.__cumulus_collection_query = CollectionsQuery('', '')
self.__cumulus_lambda_prefix = os.getenv('CUMULUS_LAMBDA_PREFIX')
self.__ingest_sqs_url = os.getenv('CUMULUS_WORKFLOW_SQS_URL')
self.__workflow_name = os.getenv('CUMULUS_WORKFLOW_NAME', 'CatalogGranule')
self.__provider_id = '' # TODO. need this?
# helper function to execute the threads

def run(self):
try:
cumulus_collection_doc = CollectionTransformer().from_stac(self.__request_body)
creation_result = self.__cumulus_collection_query.create_collection(cumulus_collection_doc, self.__cumulus_lambda_prefix)
if 'status' not in creation_result:
LOGGER.error(f'status not in creation_result: {creation_result}')
return

rule_creation_result = self.__cumulus_collection_query.create_sqs_rules(
cumulus_collection_doc,
self.__cumulus_lambda_prefix,
self.__ingest_sqs_url,
self.__provider_id,
self.__workflow_name,
)
if 'status' not in rule_creation_result:
# 'TODO' delete collection
LOGGER.error(f'status not in rule_creation_result: {rule_creation_result}')
return
except Exception as e:
LOGGER.exception('error while creating new collection in Cumulus')
return
LOGGER.info(f'collection and rule created for: {self.__request_body}')
return


class CumulusCreateCollectionDapa:
def __init__(self, event):
required_env = ['CUMULUS_LAMBDA_PREFIX', 'CUMULUS_WORKFLOW_SQS_URL']
Expand All @@ -36,43 +76,49 @@ def start(self):
'body': {'message': f'request body is not valid STAC Collection schema. check details',
'details': validation_result}
}
try:
cumulus_collection_doc = CollectionTransformer().from_stac(self.__request_body)
creation_result = self.__cumulus_collection_query.create_collection(cumulus_collection_doc, self.__cumulus_lambda_prefix)
if 'status' not in creation_result:
return {
'statusCode': 500,
'body': {
'message': {creation_result}
}
}
rule_creation_result = self.__cumulus_collection_query.create_sqs_rules(
cumulus_collection_doc,
self.__cumulus_lambda_prefix,
self.__ingest_sqs_url,
self.__provider_id,
self.__workflow_name,
)
if 'status' not in rule_creation_result:
# 'TODO' delete collection
return {
'statusCode': 500,
'body': {
'message': {rule_creation_result},
}
}
except Exception as e:
LOGGER.exception('error while creating new collection in Cumulus')
return {
'statusCode': 500,
'body': {
'message': f'error while creating new collection in Cumulus. check details',
'details': str(e)
}
}
thread1 = CollectionCreationThread(self.__request_body)
thread1.start()
return {
'statusCode': 200,
'body': {
'message': creation_result
}
'statusCode': 202,
'body': {'message': 'started in backgorund'}
}
# try:
# cumulus_collection_doc = CollectionTransformer().from_stac(self.__request_body)
# creation_result = self.__cumulus_collection_query.create_collection(cumulus_collection_doc, self.__cumulus_lambda_prefix)
# if 'status' not in creation_result:
# return {
# 'statusCode': 500,
# 'body': {
# 'message': {creation_result}
# }
# }
# rule_creation_result = self.__cumulus_collection_query.create_sqs_rules(
# cumulus_collection_doc,
# self.__cumulus_lambda_prefix,
# self.__ingest_sqs_url,
# self.__provider_id,
# self.__workflow_name,
# )
# if 'status' not in rule_creation_result:
# # 'TODO' delete collection
# return {
# 'statusCode': 500,
# 'body': {
# 'message': {rule_creation_result},
# }
# }
# except Exception as e:
# LOGGER.exception('error while creating new collection in Cumulus')
# return {
# 'statusCode': 500,
# 'body': {
# 'message': f'error while creating new collection in Cumulus. check details',
# 'details': str(e)
# }
# }
# return {
# 'statusCode': 200,
# 'body': {
# 'message': creation_result
# }
# }
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,14 @@ def test_01(self):
.add_file_type("P1570515ATMSSCIENCEAXT11344000000000.PDS", "^P[0-9]{3}[0-9]{4}[A-Z]{13}T[0-9]{12}00\\.PDS$",
'internal', 'data', 'item')
os.environ['CUMULUS_LAMBDA_PREFIX'] = 'am-uds-dev-cumulus'
os.environ['CUMULUS_WORKFLOW_SQS_URL'] = 'https://sqs.us-west-2.amazonaws.com/884500545225/am-uds-dev-cumulus-cnm-submission-queue'
stac_collection = dapa_collection.start()
event = {
'body': json.dumps(stac_collection)
}
creation = CumulusCreateCollectionDapa(event).start()
self.assertTrue('statusCode' in creation, f'missing statusCode: {creation}')
self.assertEqual(200, creation['statusCode'], f'wrong statusCode: {creation}')
self.assertEqual(202, creation['statusCode'], f'wrong statusCode: {creation}')
return

def test_02(self):
Expand Down Expand Up @@ -68,5 +69,5 @@ def test_02(self):
headers=headers,
json=stac_collection,
)
self.assertEqual(query_result.status_code, 200, f'wrong status code. {query_result.text}')
self.assertEqual(query_result.status_code, 202, f'wrong status code. {query_result.text}')
return
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import logging
from datetime import datetime
from unittest import TestCase

from cumulus_lambda_functions.cumulus_wrapper.query_collections import CollectionsQuery
from cumulus_lambda_functions.lib.lambda_logger_generator import LambdaLoggerGenerator


class TestQueryCollection(TestCase):
Expand Down Expand Up @@ -72,6 +74,10 @@ def test_02(self):
return

def test_rules_03(self):
LambdaLoggerGenerator.remove_default_handlers()
# logging.basicConfig(level=20,
# format="%(asctime)s [%(levelname)s] [%(name)s::%(lineno)d] %(message)s")

lambda_prefix = 'am-uds-dev-cumulus'
collection_query = CollectionsQuery('NA', 'NA')
collection_version = int(datetime.utcnow().timestamp())
Expand Down Expand Up @@ -143,5 +149,5 @@ def test_rules_04(self):
collection_query = CollectionsQuery('NA', 'NA')
collection_query.with_limit(200)
rules = collection_query.query_rules(lambda_prefix)
self.assertTrue(False)
self.assertTrue(True)
return

0 comments on commit 4b4b3ef

Please sign in to comment.