@@ -43,20 +43,37 @@ def get_cnm_response_json_file(self, potential_file, granule_id):
43
43
return None
44
44
if len (cnm_response_keys ) > 1 :
45
45
LOGGER .warning (f'more than 1 cnm response file: { cnm_response_keys } ' )
46
- cnm_response_keys = cnm_response_keys [0 ]
46
+ # assuming the names are the same, and it has processing date in the filename, it is easier to reverse it
47
+ cnm_response_keys = sorted (cnm_response_keys )[- 1 ] # sort and get the last one which is supposed to be the most recent one.
47
48
LOGGER .debug (f'cnm_response_keys: { cnm_response_keys } ' )
48
49
local_file = self .__s3 .set_s3_url (f's3://{ self .__s3 .target_bucket } /{ cnm_response_keys } ' ).download ('/tmp' )
49
50
cnm_response_json = FileUtils .read_json (local_file )
50
51
FileUtils .remove_if_exists (local_file )
51
52
return cnm_response_json
52
53
54
@staticmethod
def revert_to_s3_url(input_url: str) -> str:
    """Convert a path-style HTTP(S) object URL into an ``s3://`` URL.

    URLs that already start with ``s3://`` are returned unchanged.  For
    ``http://`` / ``https://`` URLs the first path segment is taken as the
    bucket and everything after it as the key, i.e.
    ``https://<host>/<bucket>/<key>`` becomes ``s3://<bucket>/<key>``.
    The host part is discarded.

    NOTE(review): this assumes path-style addressing; a virtual-hosted
    style URL (bucket encoded in the host name) would be converted with its
    first key segment as the bucket -- confirm against the producers of
    these URLs.

    :param input_url: URL of the object, in ``s3://``, ``http://`` or
        ``https://`` form.
    :return: the equivalent ``s3://<bucket>/<key>`` URL.
    :raises ValueError: if the scheme is not one of the three above, or if
        an HTTP(S) URL does not contain both a bucket and a key segment.
    """
    if input_url.startswith('s3://'):
        return input_url
    if input_url.startswith(('http://', 'https://')):
        # '<scheme>://<host>/<rest>' -> ['<scheme>:', '', '<host>', '<rest>']
        parts = input_url.split('/', 3)
        if len(parts) < 4:
            # Previously the exception was built but never raised, so the
            # code fell through to parts[3] and failed with IndexError.
            raise ValueError(f'invalid url: {input_url}')
        # '<bucket>/<key...>' -> ['<bucket>', '<key...>']
        path_parts = parts[3].split('/', 1)
        if len(path_parts) != 2:
            raise ValueError(f'invalid url: {input_url}')
        bucket, key = path_parts
        return f's3://{bucket}/{key}'
    raise ValueError(f'unknown schema: {input_url}')
68
+
53
69
def __extract_files (self , uds_cnm_json : dict , daac_config : dict ):
54
70
granule_files = uds_cnm_json ['product' ]['files' ]
55
71
if 'archiving_types' not in daac_config or len (daac_config ['archiving_types' ]) < 1 :
56
72
return granule_files # TODO remove missing md5?
57
73
archiving_types = {k ['data_type' ]: [] if 'file_extension' not in k else k ['file_extension' ] for k in daac_config ['archiving_types' ]}
58
74
result_files = []
59
75
for each_file in granule_files :
76
+ LOGGER .debug (f'each_file: { each_file } ' )
60
77
"""
61
78
{
62
79
"type": "data",
@@ -71,6 +88,7 @@ def __extract_files(self, uds_cnm_json: dict, daac_config: dict):
71
88
if each_file ['type' ] not in archiving_types :
72
89
continue
73
90
file_extensions = archiving_types [each_file ['type' ]]
91
+ each_file ['uri' ] = self .revert_to_s3_url (each_file ['uri' ])
74
92
if len (file_extensions ) < 1 :
75
93
result_files .append (each_file ) # TODO remove missing md5?
76
94
temp_filename = each_file ['name' ].upper ().strip ()
@@ -79,28 +97,36 @@ def __extract_files(self, uds_cnm_json: dict, daac_config: dict):
79
97
return result_files
80
98
81
99
def send_to_daac_internal (self , uds_cnm_json : dict ):
100
+ LOGGER .debug (f'uds_cnm_json: { uds_cnm_json } ' )
82
101
granule_identifier = UdsCollections .decode_identifier (uds_cnm_json ['identifier' ]) # This is normally meant to be for collection. Since our granule ID also has collection id prefix. we can use this.
83
102
self .__archive_index_logic .set_tenant_venue (granule_identifier .tenant , granule_identifier .venue )
84
103
daac_config = self .__archive_index_logic .percolate_document (uds_cnm_json ['identifier' ])
85
104
if daac_config is None or len (daac_config ) < 1 :
86
105
LOGGER .debug (f'uds_cnm_json is not configured for archival. uds_cnm_json: { uds_cnm_json } ' )
87
106
return
88
107
daac_config = daac_config [0 ] # TODO This is currently not supporting more than 1 daac.
108
+ result = JsonValidator (UdsArchiveConfigIndex .basic_schema ).validate (daac_config )
109
+ if result is not None :
110
+ raise ValueError (f'daac_config does not have valid schema. Pls re-add the daac config: { result } for { daac_config } ' )
89
111
try :
90
112
self .__sns .set_topic_arn (daac_config ['daac_sns_topic_arn' ])
91
113
daac_cnm_message = {
92
- "collection" : daac_config ['daac_collection_name' ],
114
+ "collection" : {
115
+ 'name' : daac_config ['daac_collection_name' ],
116
+ 'version' : daac_config ['daac_data_version' ],
117
+ },
93
118
"identifier" : uds_cnm_json ['identifier' ],
94
119
"submissionTime" : f'{ TimeUtils .get_current_time ()} Z' ,
95
120
"provider" : granule_identifier .tenant ,
96
121
"version" : "1.6.0" , # TODO this is hardcoded?
97
122
"product" : {
98
123
"name" : granule_identifier .id ,
99
- "dataVersion" : daac_config ['daac_data_version' ],
124
+ # "dataVersion": daac_config['daac_data_version'],
100
125
'files' : self .__extract_files (uds_cnm_json , daac_config ),
101
126
}
102
127
}
103
- self .__sns .publish_message (json .dumps (daac_cnm_message ))
128
+ LOGGER .debug (f'daac_cnm_message: { daac_cnm_message } ' )
129
+ self .__sns .set_external_role (daac_config ['daac_role_arn' ], daac_config ['daac_role_session_name' ]).publish_message (json .dumps (daac_cnm_message ), True )
104
130
self .__granules_index .update_entry (granule_identifier .tenant , granule_identifier .venue , {
105
131
'archive_status' : 'cnm_s_success' ,
106
132
'archive_error_message' : '' ,
0 commit comments