Skip to content

Commit 84509f9

Browse files
authored
Merge d117486 into 88ef5fd
2 parents 88ef5fd + d117486 commit 84509f9

File tree

3 files changed

+38
-5
lines changed

3 files changed

+38
-5
lines changed

cumulus_lambda_functions/daac_archiver/daac_archiver_logic.py

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,20 +43,37 @@ def get_cnm_response_json_file(self, potential_file, granule_id):
4343
return None
4444
if len(cnm_response_keys) > 1:
4545
LOGGER.warning(f'more than 1 cnm response file: {cnm_response_keys}')
46-
cnm_response_keys = cnm_response_keys[0]
46+
# assuming the filenames are otherwise identical and each contains a processing date, sorting them puts the most recent one last
47+
cnm_response_keys = sorted(cnm_response_keys)[-1] # sort and get the last one which is supposed to be the most recent one.
4748
LOGGER.debug(f'cnm_response_keys: {cnm_response_keys}')
4849
local_file = self.__s3.set_s3_url(f's3://{self.__s3.target_bucket}/{cnm_response_keys}').download('/tmp')
4950
cnm_response_json = FileUtils.read_json(local_file)
5051
FileUtils.remove_if_exists(local_file)
5152
return cnm_response_json
5253

54+
@staticmethod
55+
def revert_to_s3_url(input_url):
56+
if input_url.startswith("s3://"):
57+
return input_url
58+
if input_url.startswith("http://") or input_url.startswith("https://"):
59+
parts = input_url.split('/', 3)
60+
if len(parts) < 4:
61+
raise ValueError(f'invalid url: {input_url}')
62+
path_parts = parts[3].split('/', 1)
63+
if len(path_parts) != 2:
64+
raise ValueError(f'invalid url: {input_url}')
65+
bucket, key = path_parts
66+
return f"s3://{bucket}/{key}"
67+
raise ValueError(f'unknown schema: {input_url}')
68+
5369
def __extract_files(self, uds_cnm_json: dict, daac_config: dict):
5470
granule_files = uds_cnm_json['product']['files']
5571
if 'archiving_types' not in daac_config or len(daac_config['archiving_types']) < 1:
5672
return granule_files # TODO remove missing md5?
5773
archiving_types = {k['data_type']: [] if 'file_extension' not in k else k['file_extension'] for k in daac_config['archiving_types']}
5874
result_files = []
5975
for each_file in granule_files:
76+
LOGGER.debug(f'each_file: {each_file}')
6077
"""
6178
{
6279
"type": "data",
@@ -71,6 +88,7 @@ def __extract_files(self, uds_cnm_json: dict, daac_config: dict):
7188
if each_file['type'] not in archiving_types:
7289
continue
7390
file_extensions = archiving_types[each_file['type']]
91+
each_file['uri'] = self.revert_to_s3_url(each_file['uri'])
7492
if len(file_extensions) < 1:
7593
result_files.append(each_file) # TODO remove missing md5?
7694
temp_filename = each_file['name'].upper().strip()
@@ -79,28 +97,36 @@ def __extract_files(self, uds_cnm_json: dict, daac_config: dict):
7997
return result_files
8098

8199
def send_to_daac_internal(self, uds_cnm_json: dict):
100+
LOGGER.debug(f'uds_cnm_json: {uds_cnm_json}')
82101
granule_identifier = UdsCollections.decode_identifier(uds_cnm_json['identifier']) # This is normally meant to be for collection. Since our granule ID also has collection id prefix. we can use this.
83102
self.__archive_index_logic.set_tenant_venue(granule_identifier.tenant, granule_identifier.venue)
84103
daac_config = self.__archive_index_logic.percolate_document(uds_cnm_json['identifier'])
85104
if daac_config is None or len(daac_config) < 1:
86105
LOGGER.debug(f'uds_cnm_json is not configured for archival. uds_cnm_json: {uds_cnm_json}')
87106
return
88107
daac_config = daac_config[0] # TODO This is currently not supporting more than 1 daac.
108+
result = JsonValidator(UdsArchiveConfigIndex.basic_schema).validate(daac_config)
109+
if result is not None:
110+
raise ValueError(f'daac_config does not have valid schema. Pls re-add the daac config: {result} for {daac_config}')
89111
try:
90112
self.__sns.set_topic_arn(daac_config['daac_sns_topic_arn'])
91113
daac_cnm_message = {
92-
"collection": daac_config['daac_collection_name'],
114+
"collection": {
115+
'name': daac_config['daac_collection_name'],
116+
'version': daac_config['daac_data_version'],
117+
},
93118
"identifier": uds_cnm_json['identifier'],
94119
"submissionTime": f'{TimeUtils.get_current_time()}Z',
95120
"provider": granule_identifier.tenant,
96121
"version": "1.6.0", # TODO this is hardcoded?
97122
"product": {
98123
"name": granule_identifier.id,
99-
"dataVersion": daac_config['daac_data_version'],
124+
# "dataVersion": daac_config['daac_data_version'],
100125
'files': self.__extract_files(uds_cnm_json, daac_config),
101126
}
102127
}
103-
self.__sns.publish_message(json.dumps(daac_cnm_message))
128+
LOGGER.debug(f'daac_cnm_message: {daac_cnm_message}')
129+
self.__sns.set_external_role(daac_config['daac_role_arn'], daac_config['daac_role_session_name']).publish_message(json.dumps(daac_cnm_message), True)
104130
self.__granules_index.update_entry(granule_identifier.tenant, granule_identifier.venue, {
105131
'archive_status': 'cnm_s_success',
106132
'archive_error_message': '',

cumulus_lambda_functions/lib/uds_db/archive_index.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,14 @@ class UdsArchiveConfigIndex:
1515
basic_schema = {
1616
'type': 'object',
1717
"additionalProperties": False,
18-
'required': ['daac_collection_id', 'daac_sns_topic_arn', 'daac_data_version', 'collection', 'ss_username', 'archiving_types'],
18+
'required': ['daac_collection_id', 'daac_sns_topic_arn', 'daac_data_version', 'daac_role_arn', 'daac_role_session_name',
19+
'collection', 'ss_username', 'archiving_types'],
1920
'properties': {
2021
'daac_collection_id': {'type': 'string'},
2122
'daac_sns_topic_arn': {'type': 'string'},
2223
'daac_data_version': {'type': 'string'},
24+
'daac_role_arn': {'type': 'string'},
25+
'daac_role_session_name': {'type': 'string'},
2326
'collection': {'type': 'string'},
2427
'ss_username': {'type': 'string'},
2528
'archiving_types': {'type': 'array', 'items': {'type': 'object'}},

cumulus_lambda_functions/uds_api/dapa/daac_archive_crud.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,17 @@ class DaacUpdateModel(BaseModel):
1919
daac_collection_id: str
2020
daac_data_version: Optional[str] = None
2121
daac_sns_topic_arn: Optional[str] = None
22+
daac_role_arn: Optional[str] = None
23+
daac_role_session_name: Optional[str] = None
2224
archiving_types: Optional[list[ArchivingTypesModel]] = None
2325

2426

2527
class DaacAddModel(BaseModel):
2628
daac_collection_id: str
2729
daac_data_version: str
2830
daac_sns_topic_arn: str
31+
daac_role_arn: str
32+
daac_role_session_name: str
2933
archiving_types: Optional[list[ArchivingTypesModel]] = []
3034

3135

0 commit comments

Comments
 (0)