
Commit

Merge fc188a1 into f10f913
wphyojpl authored Sep 16, 2022
2 parents f10f913 + fc188a1 commit a64e0c0
Showing 25 changed files with 806 additions and 47 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/dockerbuild.yml
@@ -15,7 +15,7 @@ jobs:
       - uses: actions/checkout@v3
       - uses: actions/setup-python@v3
         with:
-          python-version: '3.7'
+          python-version: '3.9'
       - run: |
           # make file runnable, might not be necessary
           chmod +x "${GITHUB_WORKSPACE}/ci.cd/store_version.sh"
14 changes: 7 additions & 7 deletions .github/workflows/makefile.yml
@@ -2,9 +2,9 @@ name: Makefile CI
 
 on:
   push:
-    branches: [ main ]
-#  pull_request:
-#    branches: [ main ]
+    branches: [ main, develop ]
+  pull_request:
+    branches: [ develop ]
 
 env:
   ARTIFACT_BASE_NAME: cumulus_lambda_functions
@@ -17,7 +17,7 @@ jobs:
       - uses: actions/checkout@v3
       - uses: actions/setup-python@v3
         with:
-          python-version: '3.7'
+          python-version: '3.9'
       - run: |
          python3 "${GITHUB_WORKSPACE}/setup.py" install
       - run: |
@@ -52,7 +52,7 @@ jobs:
          prerelease: false
       - name: Create PreRelease
         id: create_prerelease
-        if: ${{ contains(github.ref, 'main') }}
+        # if: ${{ contains(github.ref, 'main') }}
        uses: actions/create-release@v1
        env:
          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} # This token is provided by Actions, you do not need to create your own token
@@ -66,7 +66,7 @@
          prerelease: true
       - name: Upload PreRelease Asset 1
         id: upload-prerelease-asset-1
-        if: ${{ contains(github.ref, 'main') }}
+        # if: ${{ contains(github.ref, 'main') }}
        uses: actions/upload-release-asset@v1
        env:
          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
@@ -77,7 +77,7 @@
          asset_content_type: application/zip
       - name: Upload PreRelease Asset 2
         id: upload-prerelease-asset-2
-        if: ${{ contains(github.ref, 'main') }}
+        # if: ${{ contains(github.ref, 'main') }}
        uses: actions/upload-release-asset@v1
        env:
          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
4 changes: 4 additions & 0 deletions ci.cd/Makefile
@@ -25,3 +25,7 @@ update_lambda_function_2:
 	aws --profile saml-pub lambda update-function-code --s3-key unity_cumulus_lambda/cumulus_lambda_functions_deployment.zip --s3-bucket am-uds-dev-cumulus-tf-state --function-name arn:aws:lambda:us-west-2:884500545225:function:am-uds-dev-cumulus-cumulus_granules_dapa --publish &>/dev/null
 update_lambda_function_3:
 	aws --profile saml-pub lambda update-function-code --s3-key unity_cumulus_lambda/cumulus_lambda_functions_deployment.zip --s3-bucket am-uds-dev-cumulus-tf-state --function-name arn:aws:lambda:us-west-2:884500545225:function:am-uds-dev-cumulus-cumulus_collections_ingest_cnm_dapa --publish &>/dev/null
+update_lambda_function_4:
+	aws --profile saml-pub lambda update-function-code --s3-key unity_cumulus_lambda/cumulus_lambda_functions_deployment.zip --s3-bucket am-uds-dev-cumulus-tf-state --function-name arn:aws:lambda:us-west-2:884500545225:function:am-uds-dev-cumulus-cumulus_collections_creation_dapa --publish &>/dev/null
+update_lambda_function_5:
+	aws --profile saml-pub lambda update-function-code --s3-key unity_cumulus_lambda/cumulus_lambda_functions_deployment.zip --s3-bucket am-uds-dev-cumulus-tf-state --function-name arn:aws:lambda:us-west-2:884500545225:function:am-uds-dev-cumulus-cumulus_collections_creation_dapa_facade --publish &>/dev/null
12 changes: 11 additions & 1 deletion ci.cd/create_aws_lambda_zip.sh
@@ -22,6 +22,16 @@ cp ${zip_file} build/
 cd $project_root_dir/tf-module/unity-cumulus
 zip -9 ${terraform_zip_file} * **/*
 
+# github.job
+github_branch=${GITHUB_REF##*/}
+software_version_trailing=""
+main_branch="main"
+if [ "$github_branch" = "$main_branch" ];
+then
+  software_version_trailing=""
+else
+  software_version_trailing="-${github_branch}-${GITHUB_RUN_ID}"
+fi
 software_version=`python3 ${project_root_dir}/setup.py --version`
-echo "software_version=${software_version}" >> ${GITHUB_ENV}
+echo "software_version=${software_version}${software_version_trailing}" >> ${GITHUB_ENV}
 cat ${GITHUB_ENV}
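
The suffixing rule above means builds from non-main branches publish versions like 1.2.3-develop-987654, while main keeps the plain setup.py version. A minimal Python sketch of the same logic (the function name is hypothetical; GITHUB_REF and GITHUB_RUN_ID are the standard GitHub Actions variables):

    import os

    def build_software_version(base_version: str) -> str:
        # GITHUB_REF looks like 'refs/heads/develop'; keep the last path
        # segment, mirroring ${GITHUB_REF##*/} in the shell script above.
        branch = os.environ.get('GITHUB_REF', '').rsplit('/', 1)[-1]
        if branch == 'main':
            return base_version  # releases from main keep the plain version
        run_id = os.environ.get('GITHUB_RUN_ID', '0')
        return f'{base_version}-{branch}-{run_id}'  # e.g. '1.2.3-develop-987654'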
@@ -61,12 +61,12 @@ def start(self):
         if 'server_error' in cumulus_result:
             return {
                 'statusCode': 500,
-                'body': {'message': cumulus_result['server_error']}
+                'body': json.dumps({'message': cumulus_result['server_error']})
             }
         if 'client_error' in cumulus_result:
             return {
                 'statusCode': 400,
-                'body': {'message': cumulus_result['client_error']}
+                'body': json.dumps({'message': cumulus_result['client_error']})
             }
         cumulus_size = self.__get_size()
         return {
@@ -84,5 +84,5 @@ def start(self):
             LOGGER.exception(f'unexpected error')
             return {
                 'statusCode': 500,
-                'body': {'message': f'unpredicted error: {str(e)}'}
+                'body': json.dumps({'message': f'unpredicted error: {str(e)}'})
             }
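
The json.dumps changes in this hunk (and the matching ones further down) fix the response shape for API Gateway's Lambda proxy integration, which requires body to be a string: returning a raw dict makes API Gateway report a malformed Lambda proxy response (HTTP 502). A minimal sketch of the corrected pattern (handler name hypothetical):

    import json

    def lambda_handler(event, context):
        result = {'message': 'ok'}
        # Under proxy integration, 'body' must be a JSON string, not a dict.
        return {'statusCode': 200, 'body': json.dumps(result)}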
@@ -0,0 +1,99 @@
+import json
+import os
+
+import pystac
+
+from cumulus_lambda_functions.cumulus_stac.collection_transformer import CollectionTransformer
+from cumulus_lambda_functions.cumulus_wrapper.query_collections import CollectionsQuery
+from cumulus_lambda_functions.lib.aws.aws_lambda import AwsLambda
+from cumulus_lambda_functions.lib.lambda_logger_generator import LambdaLoggerGenerator
+
+LOGGER = LambdaLoggerGenerator.get_logger(__name__, LambdaLoggerGenerator.get_level_from_env())
+
+
+class CumulusCreateCollectionDapa:
+    def __init__(self, event):
+        required_env = ['CUMULUS_LAMBDA_PREFIX', 'CUMULUS_WORKFLOW_SQS_URL']
+        if not all([k in os.environ for k in required_env]):
+            raise EnvironmentError(f'one or more missing env: {required_env}')
+        self.__event = event
+        self.__request_body = None
+        self.__cumulus_collection_query = CollectionsQuery('', '')
+        self.__cumulus_lambda_prefix = os.getenv('CUMULUS_LAMBDA_PREFIX')
+        self.__ingest_sqs_url = os.getenv('CUMULUS_WORKFLOW_SQS_URL')
+        self.__workflow_name = os.getenv('CUMULUS_WORKFLOW_NAME', 'CatalogGranule')
+        self.__provider_id = ''  # TODO. need this?
+        self.__collection_creation_lambda_name = os.environ.get('COLLECTION_CREATION_LAMBDA_NAME', '').strip()
+
+    def execute_creation(self):
+        try:
+            cumulus_collection_doc = CollectionTransformer().from_stac(self.__request_body)
+            creation_result = self.__cumulus_collection_query.create_collection(cumulus_collection_doc, self.__cumulus_lambda_prefix)
+            if 'status' not in creation_result:
+                LOGGER.error(f'status not in creation_result: {creation_result}')
+                return {
+                    'statusCode': 500,
+                    'body': json.dumps({
+                        'message': creation_result
+                    })
+                }
+            rule_creation_result = self.__cumulus_collection_query.create_sqs_rules(
+                cumulus_collection_doc,
+                self.__cumulus_lambda_prefix,
+                self.__ingest_sqs_url,
+                self.__provider_id,
+                self.__workflow_name,
+            )
+            if 'status' not in rule_creation_result:
+                # TODO delete collection
+                LOGGER.error(f'status not in rule_creation_result: {rule_creation_result}')
+                return {
+                    'statusCode': 500,
+                    'body': json.dumps({
+                        'message': rule_creation_result,
+                    })
+                }
+        except Exception as e:
+            LOGGER.exception('error while creating new collection in Cumulus')
+            return {
+                'statusCode': 500,
+                'body': json.dumps({
+                    'message': f'error while creating new collection in Cumulus. check details',
+                    'details': str(e)
+                })
+            }
+        LOGGER.info(f'creation_result: {creation_result}')
+        return {
+            'statusCode': 200,
+            'body': json.dumps({
+                'message': creation_result
+            })
+        }
+
+    def start(self):
+        if 'body' not in self.__event:
+            raise ValueError(f'missing body in {self.__event}')
+        self.__request_body = json.loads(self.__event['body'])
+        LOGGER.debug(f'request body: {self.__request_body}')
+        validation_result = pystac.Collection.from_dict(self.__request_body).validate()
+        if not isinstance(validation_result, list):
+            LOGGER.error(f'request body is not valid STAC collection: {validation_result}')
+            return {
+                'statusCode': 500,
+                'body': json.dumps({'message': f'request body is not valid STAC Collection schema. check details',
+                                    'details': validation_result})
+            }
+        if self.__collection_creation_lambda_name != '':
+            response = AwsLambda().invoke_function(
+                function_name=self.__collection_creation_lambda_name,
+                payload=self.__event,
+            )
+            LOGGER.debug(f'async function started: {response}')
+            return {
+                'statusCode': 202,
+                'body': json.dumps({
+                    'message': 'processing'
+                })
+            }
+        LOGGER.debug(f'creating collection.')
+        return self.execute_creation()
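
start() acts as a facade: when COLLECTION_CREATION_LAMBDA_NAME is set, it re-dispatches the original event to a worker Lambda and returns 202 immediately, so the API caller is not held for the slow Cumulus calls. The diff does not show AwsLambda's internals; a plausible sketch of such an asynchronous invocation with plain boto3 (an assumption, not the project's actual wrapper):

    import json
    import boto3

    def invoke_function_async(function_name: str, payload: dict) -> dict:
        client = boto3.client('lambda')
        # InvocationType='Event' queues the call and returns HTTP 202 right
        # away, which is what lets the facade respond with 'processing'.
        return client.invoke(
            FunctionName=function_name,
            InvocationType='Event',
            Payload=json.dumps(payload).encode(),
        )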
@@ -1,4 +1,5 @@
 from cumulus_lambda_functions.cumulus_collections_dapa.cumulus_collections_dapa import CumulusCollectionsDapa
+from cumulus_lambda_functions.cumulus_collections_dapa.cumulus_create_collection_dapa import CumulusCreateCollectionDapa
 from cumulus_lambda_functions.lib.lambda_logger_generator import LambdaLoggerGenerator
 
 
@@ -12,3 +13,8 @@ def lambda_handler(event, context):
     LambdaLoggerGenerator.remove_default_handlers()
     # TODO implement
     return CumulusCollectionsDapa(event).start()
+
+
+def lambda_handler_ingestion(event, context):
+    LambdaLoggerGenerator.remove_default_handlers()
+    return CumulusCreateCollectionDapa(event).start()
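
A minimal local smoke test for the new entry point could look like the following (the env values and handler module path are hypothetical; the required variables come from CumulusCreateCollectionDapa above, and a real call needs a full, valid STAC Collection body):

    import json
    import os

    os.environ['CUMULUS_LAMBDA_PREFIX'] = 'example-prefix'  # hypothetical value
    os.environ['CUMULUS_WORKFLOW_SQS_URL'] = 'https://sqs.us-west-2.amazonaws.com/123456789012/example'  # hypothetical value

    # assumed module path for the handler file shown above
    from cumulus_lambda_functions.cumulus_collections_dapa.lambda_function import lambda_handler_ingestion

    event = {'body': json.dumps({'type': 'Collection', 'id': 'EXAMPLE___001'})}  # truncated example body
    print(lambda_handler_ingestion(event, None))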
@@ -110,12 +110,12 @@ def start(self):
         if 'server_error' in cumulus_result:
             return {
                 'statusCode': 500,
-                'body': {'message': cumulus_result['server_error']}
+                'body': json.dumps({'message': cumulus_result['server_error']})
             }
         if 'client_error' in cumulus_result:
             return {
                 'statusCode': 400,
-                'body': {'message': cumulus_result['client_error']}
+                'body': json.dumps({'message': cumulus_result['client_error']})
             }
         cumulus_size = self.__get_size()
         return {
@@ -133,5 +133,5 @@ def start(self):
             LOGGER.exception(f'unexpected error')
             return {
                 'statusCode': 500,
-                'body': {'message': f'unpredicted error: {str(e)}'}
+                'body': json.dumps({'message': f'unpredicted error: {str(e)}'})
             }
@@ -193,5 +193,5 @@ def start(self):
             }
         return {
             'statusCode': 500,
-            'body': {'message': f'failed {len(error_list)}/{len(self.__request_body["features"])}', 'details': error_list}
+            'body': json.dumps({'message': f'failed {len(error_list)}/{len(self.__request_body["features"])}', 'details': error_list})
         }
108 changes: 91 additions & 17 deletions cumulus_lambda_functions/cumulus_stac/collection_transformer.py
@@ -1,9 +1,12 @@
 import json
+from datetime import datetime
-from urllib.parse import quote_plus
+from urllib.parse import quote_plus, urlparse, unquote_plus
 
 import pystac
+from pystac import Link
 
 from cumulus_lambda_functions.cumulus_stac.stac_transformer_abstract import StacTransformerAbstract
+from cumulus_lambda_functions.lib.time_utils import TimeUtils
 
 STAC_COLLECTION_SCHEMA = '''{
 "$schema": "http://json-schema.org/draft-07/schema#",
@@ -281,11 +284,21 @@
 
 
 class CollectionTransformer(StacTransformerAbstract):
-    def __init__(self):
+    def __init__(self, report_to_ems: bool = True, include_date_range=False):
         self.__stac_collection_schema = json.loads(STAC_COLLECTION_SCHEMA)
         self.__cumulus_collection_schema = {}
+        self.__report_to_ems = report_to_ems
+        self.__include_date_range = include_date_range
 
-    def __convert_to_stac_links(self, collection_file_obj: dict):
+    def generate_target_link_url(self, regex: str = None, bucket: str = None):
+        href_link = ['unknown_bucket', 'unknown_regex']
+        if regex is not None and regex != '':
+            href_link[1] = regex
+        if bucket is not None and bucket != '':
+            href_link[0] = bucket
+        return f"./collection.json?bucket={href_link[0]}&regex={quote_plus(href_link[1])}"
+
+    def __convert_to_stac_links(self, collection_file_obj: dict, rel_type: str = 'item'):
         """
         expected output
         {
@@ -310,18 +323,16 @@ def __convert_to_stac_links(self, collection_file_obj: dict):
         if collection_file_obj is None:
             return {}
         stac_link = {
-            'rel': 'item',
+            'rel': rel_type,
         }
         if 'type' in collection_file_obj:
             stac_link['type'] = collection_file_obj['type']
         if 'sampleFileName' in collection_file_obj:
             stac_link['title'] = collection_file_obj['sampleFileName']
-        href_link = ['unknown_bucket', 'unknown_regex']
-        if 'bucket' in collection_file_obj:
-            href_link[0] = collection_file_obj['bucket']
-        if 'regex' in collection_file_obj:
-            href_link[1] = collection_file_obj['regex']
-        stac_link['href'] = f"./collection.json?bucket={href_link[0]}&regex={quote_plus(href_link[1])}"
+        stac_link['href'] = self.generate_target_link_url(
+            collection_file_obj['regex'] if 'regex' in collection_file_obj else None,
+            collection_file_obj['bucket'] if 'bucket' in collection_file_obj else None,
+        )
         return stac_link
 
     # def to_pystac_link_obj(self, input_dict: dict):
@@ -418,14 +429,77 @@ def to_stac(self, source: dict) -> dict:
                 "process": [source['process'] if 'process' in source else ''],
                 "totalGranules": [source['total_size'] if 'total_size' in source else -1],
             },
-            "links": [{
-                "rel": "root",
-                "type": "application/json",
-                "title": f"{source['name']}___{source['version']}",
-                "href": "./collection.json"
-            }] + [self.__convert_to_stac_links(k) for k in source['files']],
+            "links": [self.__convert_to_stac_links({
+                "regex": source['url_path'] if 'url_path' in source else './collection.json',
+                "sampleFileName": source['sampleFileName'],
+                "type": "application/json",
+            }, 'root')] + [self.__convert_to_stac_links(k) for k in source['files']],
         }
         return stac_collection
 
+    def get_href(self, input_href: str):
+        parse_result = urlparse(input_href)
+        if parse_result.query == '':
+            return ''
+        query_dict = [k.split('=') for k in parse_result.query.split('&')]
+        query_dict = {k[0]: unquote_plus(k[1]) for k in query_dict}
+        return query_dict
+
+    def __convert_from_stac_links(self, link_obj: dict):
+        output_file_object = {
+            'reportToEms': self.__report_to_ems
+        }
+        if 'type' in link_obj:
+            output_file_object['type'] = link_obj['type']
+        if 'title' in link_obj:
+            output_file_object['sampleFileName'] = link_obj['title']
+        if 'href' in link_obj:
+            href_dict = self.get_href(link_obj['href'])
+            if 'bucket' in href_dict:
+                output_file_object['bucket'] = href_dict['bucket']
+            if 'regex' in href_dict:
+                output_file_object['regex'] = href_dict['regex']
+        return output_file_object
+
     def from_stac(self, source: dict) -> dict:
-        return {}
+        input_dapa_collection = pystac.Collection.from_dict(source)
+        if not input_dapa_collection.validate():
+            raise ValueError(f'invalid source dapa: {input_dapa_collection}')
+        output_collection_cumulus = {
+            # "createdAt": 1647992847582,
+            "reportToEms": self.__report_to_ems,
+            "duplicateHandling": "skip",
+            # "updatedAt": 1647992847582,
+            # "timestamp": 1647992849273
+        }
+        summaries = input_dapa_collection.summaries.lists
+        if 'granuleId' in summaries:
+            output_collection_cumulus['granuleId'] = summaries['granuleId'][0]
+        if 'granuleIdExtraction' in summaries:
+            output_collection_cumulus['granuleIdExtraction'] = summaries['granuleIdExtraction'][0]
+        if 'process' in summaries:
+            output_collection_cumulus['process'] = summaries['process'][0]
+        name_version = input_dapa_collection.id.split('___')
+        output_collection_cumulus['name'] = name_version[0]
+        output_collection_cumulus['version'] = name_version[1]
+        output_files = []
+        for each_link_obj in input_dapa_collection.links:
+            each_link_obj: Link = each_link_obj
+            each_file_obj = self.__convert_from_stac_links(each_link_obj.to_dict())
+            if each_link_obj.rel == 'root':
+                if 'regex' in each_file_obj:
+                    output_collection_cumulus['url_path'] = each_file_obj['regex']
+                if 'sampleFileName' in each_file_obj:
+                    output_collection_cumulus['sampleFileName'] = each_file_obj['sampleFileName']
+            else:
+                output_files.append(each_file_obj)
+        output_collection_cumulus['files'] = output_files
+        if len(input_dapa_collection.extent.temporal.intervals) > 0:
+            date_interval = input_dapa_collection.extent.temporal.intervals[0]
+            if len(date_interval) == 2 and self.__include_date_range is True:
+                if date_interval[0] is not None:
+                    output_collection_cumulus['dateFrom'] = date_interval[0].strftime(TimeUtils.MMDD_FORMAT)
+                if date_interval[1] is not None:
+                    output_collection_cumulus['dateTo'] = date_interval[1].strftime(TimeUtils.MMDD_FORMAT)
+        return output_collection_cumulus
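
generate_target_link_url packs the Cumulus bucket and file regex into the query string of a relative collection link, and get_href unpacks it; quote_plus/unquote_plus keep regex metacharacters URL-safe. A small round-trip check with illustrative values (helper names hypothetical):

    from urllib.parse import quote_plus, unquote_plus, urlparse

    def encode(bucket: str, regex: str) -> str:
        # mirrors generate_target_link_url for non-empty inputs
        return f'./collection.json?bucket={bucket}&regex={quote_plus(regex)}'

    def decode(href: str) -> dict:
        # mirrors get_href: split the query string and unescape the values
        query = urlparse(href).query
        return {k: unquote_plus(v) for k, v in (p.split('=') for p in query.split('&'))}

    href = encode('protected', '^P[0-9]{3}.+\\.nc$')
    assert decode(href) == {'bucket': 'protected', 'regex': '^P[0-9]{3}.+\\.nc$'}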