
Commit 8af0ce1

BREAKING CHANGE: Auxiliary Files stage out (#415)

* fix: check by id first to see if the granule exists before updating it
* breaking: stage out for ancillary file
* feat: add checksum in stac
* fix: granule id needs to be the full URN, not just the name
* fix: adding checksum values
* fix: need stac_extensions type update
* fix: no need to add extra fields
* chore: add sample docker file

1 parent ff7f4d0 commit 8af0ce1

File tree

14 files changed: +412 -56 lines changed


ci.cd/Makefile

Lines changed: 3 additions & 0 deletions
@@ -20,6 +20,9 @@ build_lambda_public:
 upload_lambda:
 	aws --profile saml-pub s3 cp cumulus_lambda_functions_deployment.zip s3://am-uds-dev-cumulus-tf-state/unity_cumulus_lambda/
 
+move:
+	mv /Users/wphyo/Downloads/cumulus_lambda_functions-*.zip tf-module/unity-cumulus/build/cumulus_lambda_functions_deployment.zip
+
 upload_lambda_mcp_dev:
 	aws s3 cp tf-module/unity-cumulus/build/cumulus_lambda_functions_deployment.zip s3://uds-dev-cumulus-public/unity_cumulus_lambda/
 update_lambda_function_mcp_dev_6:

cumulus_lambda_functions/cumulus_stac/item_transformer.py

Lines changed: 7 additions & 0 deletions
@@ -344,12 +344,18 @@ def __get_asset_obj(self, input_dict):
         :param input_dict:
         :return:
         """
+        # https://github.com/stac-extensions/file
+        # https://github.com/stac-extensions/file/blob/main/examples/item.json
         description_keys = ['size', 'checksumType', 'checksum']
         descriptions = [f'{k}={input_dict[k]};' for k in description_keys if k in input_dict]
         asset = Asset(
             href=f"s3://{input_dict['bucket']}/{input_dict['key']}",
             title=input_dict['fileName'],
             description=''.join(descriptions),
+            extra_fields={
+                'file:size': input_dict['size'] if 'size' in input_dict else -1,
+                'file:checksum': input_dict['checksum'] if 'checksum' in input_dict else -1,
+            },
             roles=[input_dict['type']]
         )
         return asset
@@ -472,6 +478,7 @@ def to_stac(self, source: dict) -> dict:
         }
         stac_item = Item(
             id=source['granuleId'],
+            stac_extensions=["https://stac-extensions.github.io/file/v2.1.0/schema.json"],
             bbox=[-180.0, -90.0, 180.0, 90.0],
             properties={
                 **custom_metadata,
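To illustrate the effect of the new fields, here is a minimal sketch of the asset the transformer would now emit, assuming pystac is installed; the file record values below are invented and only the keys mirror what __get_asset_obj reads:

from pystac import Asset

# Invented sample record; keys match what item_transformer's __get_asset_obj expects.
input_dict = {
    'bucket': 'example-staging-bucket',
    'key': 'SAMPLE_COLLECTION___001/granule1/granule1.data.tif',
    'fileName': 'granule1.data.tif',
    'type': 'data',
    'size': 123456,
    'checksumType': 'md5',
    'checksum': 'd41d8cd98f00b204e9800998ecf8427e',
}
description_keys = ['size', 'checksumType', 'checksum']
descriptions = [f'{k}={input_dict[k]};' for k in description_keys if k in input_dict]
asset = Asset(
    href=f"s3://{input_dict['bucket']}/{input_dict['key']}",
    title=input_dict['fileName'],
    description=''.join(descriptions),
    extra_fields={
        'file:size': input_dict.get('size', -1),
        'file:checksum': input_dict.get('checksum', -1),
    },
    roles=[input_dict['type']],
)
# to_dict() merges extra_fields into the asset, so 'file:size' and 'file:checksum'
# appear alongside href/title/description/roles in the serialized STAC item.
print(asset.to_dict())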

cumulus_lambda_functions/granules_to_es/granules_index_mapping.py

Lines changed: 1 addition & 1 deletion
@@ -47,7 +47,7 @@ class GranulesIndexMapping:
                 "title": {"type": "text"}
             }
         },
-        "stac_extensions": {"type": "object"},
+        "stac_extensions": {"type": "keyword"},
         "properties": {
             "dynamic": "false",
             "properties": {

cumulus_lambda_functions/lib/utils/file_utils.py

Lines changed: 3 additions & 3 deletions
@@ -67,10 +67,10 @@ def gunzip_file_os(zipped_file_path, output_file_path=None):
         return output_file_path
 
     @staticmethod
-    def get_checksum(file_path):
+    def get_checksum(file_path, is_md5=False, chunk_size=25 * 2**20):
         with open(file_path, mode='rb') as f:
-            d = hashlib.sha512()
-            for buf in iter(partial(f.read, 512 * 2**10), b''):
+            d = hashlib.md5() if is_md5 else hashlib.sha512()
+            for buf in iter(partial(f.read, chunk_size), b''):
                 d.update(buf)
             return d.hexdigest()
 
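The helper now hashes in configurable chunks (25 MiB by default instead of 512 KiB) and can produce MD5 digests to match the checksumType=md5 values written into the STAC assets. A standalone sketch of the updated logic, with an invented file path in the usage line:

import hashlib
from functools import partial

def get_checksum(file_path, is_md5=False, chunk_size=25 * 2**20):
    # Stream the file in chunk_size pieces so large granules never need to fit in memory.
    with open(file_path, mode='rb') as f:
        d = hashlib.md5() if is_md5 else hashlib.sha512()
        for buf in iter(partial(f.read, chunk_size), b''):
            d.update(buf)
        return d.hexdigest()

# print(get_checksum('/tmp/granule1.data.tif', is_md5=True))  # hypothetical path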
cumulus_lambda_functions/stage_in_out/upload_arbitrary_files_as_granules.py

Lines changed: 170 additions & 0 deletions
@@ -0,0 +1,170 @@
+import json
+import logging
+import os.path
+from glob import glob
+from multiprocessing import Manager
+
+from cumulus_lambda_functions.cumulus_stac.granules_catalog import GranulesCatalog
+
+from cumulus_lambda_functions.lib.aws.aws_s3 import AwsS3
+from cumulus_lambda_functions.lib.processing_jobs.job_manager_memory import JobManagerMemory
+from cumulus_lambda_functions.lib.processing_jobs.multithread_processor import MultiThreadProcessorProps, MultiThreadProcessor
+from cumulus_lambda_functions.lib.processing_jobs.job_manager_abstract import JobManagerProps
+from cumulus_lambda_functions.lib.utils.file_utils import FileUtils
+from cumulus_lambda_functions.lib.processing_jobs.job_executor_abstract import JobExecutorAbstract
+from cumulus_lambda_functions.lib.time_utils import TimeUtils
+from cumulus_lambda_functions.stage_in_out.upload_granules_abstract import UploadGranulesAbstract
+from pystac import Item, Asset, ItemCollection, Catalog, Link
+
+LOGGER = logging.getLogger(__name__)
+
+
+class UploadItemExecutor(JobExecutorAbstract):
+    def __init__(self, result_list, error_list, collection_id, staging_bucket, retry_wait_time_sec, retry_times, delete_files: bool) -> None:
+        super().__init__()
+        self.__collection_id = collection_id
+        self.__staging_bucket = staging_bucket
+        self.__delete_files = delete_files
+
+        self.__gc = GranulesCatalog()
+        self.__result_list = result_list
+        self.__error_list = error_list
+        # self.__gc = GranulesCatalog()
+        self.__s3 = AwsS3()
+        self.__retry_wait_time_sec = retry_wait_time_sec
+        self.__retry_times = retry_times
+
+    def validate_job(self, job_obj):
+        return True
+
+    def generate_sample_stac(self, filepath: str):
+        filename = os.path.basename(filepath)
+        file_checksum = FileUtils.get_checksum(filepath, True)
+        # https://github.com/stac-extensions/file
+        # https://github.com/stac-extensions/file/blob/main/examples/item.json
+        sample_stac_item = Item(
+            id=f'{self.__collection_id}:{os.path.splitext(filename)[0]}',
+            stac_extensions=["https://stac-extensions.github.io/file/v2.1.0/schema.json"],
+            geometry={
+                "type": "Point",
+                "coordinates": [0.0, 0.0]
+            },
+            bbox=[0.0, 0.0, 0.0, 0.0],
+            datetime=TimeUtils().parse_from_unix(0, True).get_datetime_obj(),
+            properties={
+                "start_datetime": TimeUtils.get_current_time(),
+                "end_datetime": TimeUtils.get_current_time(),
+                "created": TimeUtils.get_current_time(),
+                "updated": TimeUtils.get_current_time(),
+            },
+            collection=self.__collection_id,
+            assets={
+                filename: Asset(
+                    href=filepath,
+                    roles=['data'],
+                    title=os.path.basename(filename),
+                    extra_fields={
+                        'file:size': FileUtils.get_size(filepath),
+                        'file:checksum': file_checksum,
+                    },
+                    description=f'size={FileUtils.get_size(filepath)};checksumType=md5;checksum={file_checksum}'),
+                f'{filename}.stac.json': Asset(href=f'{filepath}.stac.json', roles=['metadata'], description='desc=metadata stac;size=-1;checksumType=md5;checksum=unknown'),  # How to update this? It's a circular dependency
+            })
+
+        return sample_stac_item
+
+    def execute_job(self, job_obj, lock) -> bool:
+        sample_stac_item = self.generate_sample_stac(job_obj)
+        updating_assets = {}
+        try:
+            s3_url = self.__s3.upload(job_obj, self.__staging_bucket, f'{self.__collection_id}/{self.__collection_id}:{sample_stac_item.id}', self.__delete_files)
+            updating_assets[os.path.basename(s3_url)] = s3_url
+            uploading_current_granule_stac = f'{s3_url}.stac.json'
+            self.__s3.set_s3_url(uploading_current_granule_stac)
+            self.__s3.upload_bytes(json.dumps(sample_stac_item.to_dict(False, False), indent=4).encode())
+            updating_assets[os.path.basename(uploading_current_granule_stac)] = uploading_current_granule_stac
+            self.__gc.update_assets_href(sample_stac_item, updating_assets)
+            self.__result_list.put(sample_stac_item.to_dict(False, False))
+        except Exception as e:
+            sample_stac_item.properties['upload_error'] = str(e)
+            LOGGER.exception(f'error while processing: {job_obj}')
+            self.__error_list.put(sample_stac_item.to_dict(False, False))
+        return True
+
+
+class UploadArbitraryFilesAsGranules(UploadGranulesAbstract):
+    BASE_DIRECTORY = 'BASE_DIRECTORY'
+
+    def __init__(self):
+        super().__init__()
+        self.__s3 = AwsS3()
+
+    def upload(self, **kwargs) -> str:
+
+        """
+        1. Use Glob to find files
+        2. Create stac.json for each file.
+        3. Need collection ID which has tenant + venue.
+        4. Create successful features.json
+        :param kwargs:
+        :return:
+        """
+        self._set_props_from_env()
+        output_dir = os.environ.get(self.OUTPUT_DIRECTORY)
+        if not FileUtils.dir_exist(output_dir):
+            raise ValueError(f'OUTPUT_DIRECTORY: {output_dir} does not exist')
+        missing_keys = [k for k in [self.BASE_DIRECTORY] if k not in os.environ]
+        if len(missing_keys) > 0:
+            raise ValueError(f'missing environment keys: {missing_keys}')
+        base_dir = os.environ.get(self.BASE_DIRECTORY)
+        possible_files = [k for k in glob(os.path.join(base_dir, '**'), recursive=True) if os.path.isfile(k)]
+
+        local_items = Manager().Queue()
+        error_list = Manager().Queue()
+
+        if self._parallel_count == 1:
+            for each_child in possible_files:
+                temp_job = UploadItemExecutor(local_items, error_list, self._collection_id, self._staging_bucket, self._retry_wait_time_sec, self._retry_times, self._delete_files)
+                temp_job.execute_job(each_child, None)
+        else:
+            job_manager_props = JobManagerProps()
+            for each_child in possible_files:
+                job_manager_props.memory_job_dict[each_child] = each_child
+            # https://www.infoworld.com/article/3542595/6-python-libraries-for-parallel-processing.html
+            multithread_processor_props = MultiThreadProcessorProps(self._parallel_count)
+            multithread_processor_props.job_manager = JobManagerMemory(job_manager_props)
+            multithread_processor_props.job_executor = UploadItemExecutor(local_items, error_list, self._collection_id, self._staging_bucket, self._retry_wait_time_sec, self._retry_times, self._delete_files)
+            multithread_processor = MultiThreadProcessor(multithread_processor_props)
+            multithread_processor.start()
+
+        LOGGER.debug(f'finished uploading all granules')
+        dapa_body_granules = []
+        while not local_items.empty():
+            dapa_body_granules.append(local_items.get())
+
+        errors = []
+        while not error_list.empty():
+            errors.append(error_list.get())
+        LOGGER.debug(f'successful count: {len(dapa_body_granules)}. failed count: {len(errors)}')
+        successful_item_collections = ItemCollection(items=dapa_body_granules)
+        failed_item_collections = ItemCollection(items=errors)
+        successful_features_file = os.path.join(output_dir, 'successful_features.json')
+
+        failed_features_file = os.path.join(output_dir, 'failed_features.json')
+        LOGGER.debug(f'writing results: {successful_features_file} && {failed_features_file}')
+        FileUtils.write_json(successful_features_file, successful_item_collections.to_dict(False))
+        FileUtils.write_json(failed_features_file, failed_item_collections.to_dict(False))
+        s3_url = self.__s3.upload(successful_features_file, self._staging_bucket,
+                                  self._result_path_prefix,
+                                  s3_name=f'successful_features_{TimeUtils.get_current_time()}.json',
+                                  delete_files=self._delete_files)
+        LOGGER.debug(f'uploaded successful features to S3: {s3_url}')
+        LOGGER.debug(f'creating response catalog')
+        catalog = Catalog(
+            id='NA',
+            description='NA')
+        catalog.add_link(Link('item', successful_features_file, 'application/json'))
+        catalog.add_link(Link('item', failed_features_file, 'application/json'))
+        catalog_json = catalog.to_dict(False, False)
+        LOGGER.debug(f'catalog_json: {catalog_json}')
+        return json.dumps(catalog_json)
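A rough usage sketch of the new auxiliary-file stage-out, pieced together from the environment keys the class reads; the bucket, collection, and directory values below are invented:

import os
from cumulus_lambda_functions.stage_in_out.upload_arbitrary_files_as_granules import UploadArbitraryFilesAsGranules

# Required by the shared base class (_set_props_from_env)
os.environ['COLLECTION_ID'] = 'URN:NASA:UNITY:MY_TENANT:DEV:SAMPLE___001'  # invented collection URN
os.environ['STAGING_BUCKET'] = 'example-staging-bucket'                    # invented bucket
# Required by UploadArbitraryFilesAsGranules.upload()
os.environ['OUTPUT_DIRECTORY'] = '/tmp/stage_out_results'  # must exist; receives successful/failed features json
os.environ['BASE_DIRECTORY'] = '/tmp/auxiliary_files'      # every regular file under here becomes a granule

# Each file is uploaded to S3 with a generated <file>.stac.json sidecar carrying
# file:size / file:checksum, and the call returns a response catalog as a JSON string
# linking successful_features.json and failed_features.json.
catalog_json_str = UploadArbitraryFilesAsGranules().upload()
print(catalog_json_str)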
cumulus_lambda_functions/stage_in_out/upload_granules_abstract.py

Lines changed: 38 additions & 1 deletion
@@ -1,7 +1,44 @@
+import os
 from abc import ABC, abstractmethod
 
+from cumulus_lambda_functions.lib.constants import Constants
+
 
 class UploadGranulesAbstract(ABC):
+    RESULT_PATH_PREFIX = 'RESULT_PATH_PREFIX'  # s3 prefix
+    DEFAULT_RESULT_PATH_PREFIX = 'stage_out'  # default s3 prefix
+    OUTPUT_DIRECTORY = 'OUTPUT_DIRECTORY'  # To store successful & failed features json
+    COLLECTION_ID_KEY = 'COLLECTION_ID'  # Need this
+    STAGING_BUCKET_KEY = 'STAGING_BUCKET'  # S3 Bucket
+    VERIFY_SSL_KEY = 'VERIFY_SSL'
+    DELETE_FILES_KEY = 'DELETE_FILES'
+
+    def __init__(self) -> None:
+        super().__init__()
+        self._collection_id = ''
+        self._staging_bucket = ''
+        self._result_path_prefix = ''
+        self._parallel_count = int(os.environ.get(Constants.PARALLEL_COUNT, '-1'))
+        self._retry_wait_time_sec = int(os.environ.get('UPLOAD_RETRY_WAIT_TIME', '30'))
+        self._retry_times = int(os.environ.get('UPLOAD_RETRY_TIMES', '5'))
+        self._verify_ssl = True
+        self._delete_files = False
+
+    def _set_props_from_env(self):
+        missing_keys = [k for k in [self.COLLECTION_ID_KEY, self.STAGING_BUCKET_KEY] if k not in os.environ]
+        if len(missing_keys) > 0:
+            raise ValueError(f'missing environment keys: {missing_keys}')
+
+        self._collection_id = os.environ.get(self.COLLECTION_ID_KEY)
+        self._staging_bucket = os.environ.get(self.STAGING_BUCKET_KEY)
+        self._result_path_prefix = os.environ.get(self.RESULT_PATH_PREFIX, self.DEFAULT_RESULT_PATH_PREFIX)
+        self._result_path_prefix = self._result_path_prefix[:-1] if self._result_path_prefix.endswith('/') else self._result_path_prefix
+        self._result_path_prefix = self._result_path_prefix[1:] if self._result_path_prefix.startswith('/') else self._result_path_prefix
+
+        self._verify_ssl = os.environ.get(self.VERIFY_SSL_KEY, 'TRUE').strip().upper() == 'TRUE'
+        self._delete_files = os.environ.get(self.DELETE_FILES_KEY, 'FALSE').strip().upper() == 'TRUE'
+        return self
+
     @abstractmethod
-    def upload(self, **kwargs) -> list:
+    def upload(self, **kwargs) -> str:
         raise NotImplementedError()
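The base class now owns the common environment parsing (collection, staging bucket, result prefix, SSL and delete flags, retry settings), so concrete uploaders only add their own keys. A minimal, purely illustrative subclass showing the pattern; NoopUploader and the environment values are not part of the commit:

import os
from cumulus_lambda_functions.stage_in_out.upload_granules_abstract import UploadGranulesAbstract

class NoopUploader(UploadGranulesAbstract):
    def upload(self, **kwargs) -> str:
        self._set_props_from_env()  # validates COLLECTION_ID / STAGING_BUCKET, normalizes RESULT_PATH_PREFIX
        return f's3://{self._staging_bucket}/{self._result_path_prefix}/{self._collection_id}'

os.environ['COLLECTION_ID'] = 'SAMPLE_COLLECTION___001'  # invented value
os.environ['STAGING_BUCKET'] = 'example-staging-bucket'  # invented value
print(NoopUploader().upload())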

cumulus_lambda_functions/stage_in_out/upload_granules_by_complete_catalog_s3.py

Lines changed: 8 additions & 35 deletions
@@ -89,48 +89,21 @@ def execute_job(self, each_child, lock) -> bool:
 
 
 class UploadGranulesByCompleteCatalogS3(UploadGranulesAbstract):
-    RESULT_PATH_PREFIX = 'RESULT_PATH_PREFIX'
-    DEFAULT_RESULT_PATH_PREFIX = 'stage_out'
     CATALOG_FILE = 'CATALOG_FILE'
-    OUTPUT_DIRECTORY = 'OUTPUT_DIRECTORY'
-    COLLECTION_ID_KEY = 'COLLECTION_ID'
-    STAGING_BUCKET_KEY = 'STAGING_BUCKET'
-
-    VERIFY_SSL_KEY = 'VERIFY_SSL'
-    DELETE_FILES_KEY = 'DELETE_FILES'
 
     def __init__(self) -> None:
         super().__init__()
         self.__gc = GranulesCatalog()
-        self.__collection_id = ''
-        self.__staging_bucket = ''
-        self.__result_path_prefix = ''
-        self.__verify_ssl = True
-        self.__delete_files = False
         self.__s3 = AwsS3()
-        self._parallel_count = int(os.environ.get(Constants.PARALLEL_COUNT, '-1'))
-        self.__retry_wait_time_sec = int(os.environ.get('UPLOAD_RETRY_WAIT_TIME', '30'))
-        self.__retry_times = int(os.environ.get('UPLOAD_RETRY_TIMES', '5'))
-
-    def __set_props_from_env(self):
-        missing_keys = [k for k in [self.CATALOG_FILE, self.COLLECTION_ID_KEY, self.STAGING_BUCKET_KEY] if k not in os.environ]
-        if len(missing_keys) > 0:
-            raise ValueError(f'missing environment keys: {missing_keys}')
-
-        self.__collection_id = os.environ.get(self.COLLECTION_ID_KEY)
-        self.__staging_bucket = os.environ.get(self.STAGING_BUCKET_KEY)
-        self.__result_path_prefix = os.environ.get(self.RESULT_PATH_PREFIX, self.DEFAULT_RESULT_PATH_PREFIX)
-        self.__result_path_prefix = self.__result_path_prefix[:-1] if self.__result_path_prefix.endswith('/') else self.__result_path_prefix
-        self.__result_path_prefix = self.__result_path_prefix[1:] if self.__result_path_prefix.startswith('/') else self.__result_path_prefix
-        self.__verify_ssl = os.environ.get(self.VERIFY_SSL_KEY, 'TRUE').strip().upper() == 'TRUE'
-        self.__delete_files = os.environ.get(self.DELETE_FILES_KEY, 'FALSE').strip().upper() == 'TRUE'
-        return self
 
     def upload(self, **kwargs) -> str:
-        self.__set_props_from_env()
+        self._set_props_from_env()
         output_dir = os.environ.get(self.OUTPUT_DIRECTORY)
         if not FileUtils.dir_exist(output_dir):
             raise ValueError(f'OUTPUT_DIRECTORY: {output_dir} does not exist')
+        missing_keys = [k for k in [self.CATALOG_FILE] if k not in os.environ]
+        if len(missing_keys) > 0:
+            raise ValueError(f'missing environment keys: {missing_keys}')
         catalog_file_path = os.environ.get(self.CATALOG_FILE)
         child_links = self.__gc.get_child_link_hrefs(catalog_file_path)
         local_items = Manager().Queue()
@@ -142,7 +115,7 @@ def upload(self, **kwargs) -> str:
             # https://www.infoworld.com/article/3542595/6-python-libraries-for-parallel-processing.html
             multithread_processor_props = MultiThreadProcessorProps(self._parallel_count)
             multithread_processor_props.job_manager = JobManagerMemory(job_manager_props)
-            multithread_processor_props.job_executor = UploadItemExecutor(local_items, error_list, self.__collection_id, self.__staging_bucket, self.__retry_wait_time_sec, self.__retry_times, self.__delete_files)
+            multithread_processor_props.job_executor = UploadItemExecutor(local_items, error_list, self._collection_id, self._staging_bucket, self._retry_wait_time_sec, self._retry_times, self._delete_files)
             multithread_processor = MultiThreadProcessor(multithread_processor_props)
             multithread_processor.start()
 
@@ -165,10 +138,10 @@ def upload(self, **kwargs) -> str:
         LOGGER.debug(f'writing results: {successful_features_file} && {failed_features_file}')
         FileUtils.write_json(successful_features_file, successful_item_collections.to_dict(False))
         FileUtils.write_json(failed_features_file, failed_item_collections.to_dict(False))
-        s3_url = self.__s3.upload(successful_features_file, self.__staging_bucket,
-                                  self.__result_path_prefix,
+        s3_url = self.__s3.upload(successful_features_file, self._staging_bucket,
+                                  self._result_path_prefix,
                                   s3_name=f'successful_features_{TimeUtils.get_current_time()}.json',
-                                  delete_files=self.__delete_files)
+                                  delete_files=self._delete_files)
         LOGGER.debug(f'uploaded successful features to S3: {s3_url}')
         LOGGER.debug(f'creating response catalog')
         catalog_json = GranulesCatalog().update_catalog(catalog_file_path, [successful_features_file, failed_features_file])
Lines changed: 6 additions & 0 deletions
@@ -1,8 +1,14 @@
+
+
 class UploadGranulesFactory:
     UPLOAD_S3_BY_STAC_CATALOG = 'UPLOAD_S3_BY_STAC_CATALOG'
+    UPLOAD_AUXILIARY_FILE_AS_GRANULE = 'UPLOAD_AUXILIARY_FILE_AS_GRANULE'
 
     def get_class(self, upload_type):
         if upload_type == UploadGranulesFactory.UPLOAD_S3_BY_STAC_CATALOG:
             from cumulus_lambda_functions.stage_in_out.upload_granules_by_complete_catalog_s3 import UploadGranulesByCompleteCatalogS3
             return UploadGranulesByCompleteCatalogS3()
+        if upload_type == UploadGranulesFactory.UPLOAD_AUXILIARY_FILE_AS_GRANULE:
+            from cumulus_lambda_functions.stage_in_out.upload_arbitrary_files_as_granules import UploadArbitraryFilesAsGranules
+            return UploadArbitraryFilesAsGranules()
         raise ValueError(f'unknown search_type: {upload_type}')
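With the new factory key, a stage-out workflow can select the auxiliary-file uploader by name. A brief sketch; the factory's module path is an assumption, and the environment must already be configured as in the earlier example:

from cumulus_lambda_functions.stage_in_out.upload_granules_factory import UploadGranulesFactory  # module path assumed

uploader = UploadGranulesFactory().get_class('UPLOAD_AUXILIARY_FILE_AS_GRANULE')
catalog_json_str = uploader.upload()  # returns the response catalog as a JSON string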
