From 84308000e7d520794dc3951f5ef2adce3ea366b7 Mon Sep 17 00:00:00 2001
From: wphyojpl <38299756+wphyojpl@users.noreply.github.com>
Date: Thu, 29 May 2025 12:47:12 -0700
Subject: [PATCH 1/9] feat: daac product.name = granule id (#589)

---
 .../daac_archiver/daac_archiver_logic.py      |  4 ++--
 .../lib/uds_db/uds_collections.py             |  9 +++++++++
 .../lib/uds_db/test_uds_collections.py        | 13 +++++++++++++
 3 files changed, 24 insertions(+), 2 deletions(-)

diff --git a/cumulus_lambda_functions/daac_archiver/daac_archiver_logic.py b/cumulus_lambda_functions/daac_archiver/daac_archiver_logic.py
index c0df7e2a..5e47771d 100644
--- a/cumulus_lambda_functions/daac_archiver/daac_archiver_logic.py
+++ b/cumulus_lambda_functions/daac_archiver/daac_archiver_logic.py
@@ -98,7 +98,7 @@ def __extract_files(self, uds_cnm_json: dict, daac_config: dict):
 
     def send_to_daac_internal(self, uds_cnm_json: dict):
         LOGGER.debug(f'uds_cnm_json: {uds_cnm_json}')
-        granule_identifier = UdsCollections.decode_identifier(uds_cnm_json['identifier'])  # This is normally meant to be for collection. Since our granule ID also has collection id prefix. we can use this.
+        granule_identifier = UdsCollections.decode_granule_identifier(uds_cnm_json['identifier'])  # This is normally meant for a collection ID, but since our granule ID is prefixed with the collection ID, we can reuse it here.
         self.__archive_index_logic.set_tenant_venue(granule_identifier.tenant, granule_identifier.venue)
         daac_config = self.__archive_index_logic.percolate_document(uds_cnm_json['identifier'])
         if daac_config is None or len(daac_config) < 1:
@@ -120,7 +120,7 @@ def send_to_daac_internal(self, uds_cnm_json: dict):
             "provider": granule_identifier.tenant,
             "version": "1.6.0",  # TODO this is hardcoded?
             "product": {
-                "name": granule_identifier.id,
+                "name": granule_identifier.granule,
                 # "dataVersion": daac_config['daac_data_version'],
                 'files': self.__extract_files(uds_cnm_json, daac_config),
             }

diff --git a/cumulus_lambda_functions/lib/uds_db/uds_collections.py b/cumulus_lambda_functions/lib/uds_db/uds_collections.py
index a525f7c9..fb3e0dc2 100644
--- a/cumulus_lambda_functions/lib/uds_db/uds_collections.py
+++ b/cumulus_lambda_functions/lib/uds_db/uds_collections.py
@@ -12,6 +12,7 @@
 
 CollectionIdentifier = namedtuple('CollectionIdentifier', ['urn', 'nasa', 'project', 'tenant', 'venue', 'id'])
+GranuleIdentifier = namedtuple('GranuleIdentifier', ['urn', 'nasa', 'project', 'tenant', 'venue', 'id', 'granule'])
 
 
 class UdsCollections:
@@ -35,6 +36,14 @@ def decode_identifier(incoming_identifier: str) -> CollectionIdentifier:
             raise ValueError(f'invalid collection: {collection_identifier_parts}')
         return CollectionIdentifier._make(collection_identifier_parts[0:6])
 
+    @staticmethod
+    def decode_granule_identifier(incoming_identifier: str) -> GranuleIdentifier:
+        collection_identifier_parts = incoming_identifier.split(':')
+        if len(collection_identifier_parts) < 7:
+            raise ValueError(f'invalid granule identifier: {collection_identifier_parts}')
+        return GranuleIdentifier._make(collection_identifier_parts[0:6] + [':'.join(collection_identifier_parts[6:])])
+
+
     def __bbox_to_polygon(self, bbox: list):
         if len(bbox) != 4:
             raise ValueError(f'not bounding box: {bbox}')
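For readers skimming the diff: the new decoder treats the first six `:`-separated tokens as the collection prefix and rejoins everything after them, so granule names that themselves contain colons survive the round trip. A standalone restatement of that rule; the sample identifier below is made up:

    from collections import namedtuple

    GranuleIdentifier = namedtuple('GranuleIdentifier', ['urn', 'nasa', 'project', 'tenant', 'venue', 'id', 'granule'])

    def decode_granule_identifier(incoming: str) -> GranuleIdentifier:
        # first six ':'-separated tokens form the collection prefix;
        # the remainder is rejoined so colons inside granule names are preserved
        parts = incoming.split(':')
        if len(parts) < 7:
            raise ValueError(f'invalid granule identifier: {parts}')
        return GranuleIdentifier._make(parts[0:6] + [':'.join(parts[6:])])

    decoded = decode_granule_identifier('URN:NASA:UNITY:unity:test:COLL___1:GRANULE:1:2')  # hypothetical ID
    assert decoded.id == 'COLL___1'
    assert decoded.granule == 'GRANULE:1:2'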
diff --git a/tests/cumulus_lambda_functions/lib/uds_db/test_uds_collections.py b/tests/cumulus_lambda_functions/lib/uds_db/test_uds_collections.py
index 9efe82a5..eabbee1d 100644
--- a/tests/cumulus_lambda_functions/lib/uds_db/test_uds_collections.py
+++ b/tests/cumulus_lambda_functions/lib/uds_db/test_uds_collections.py
@@ -26,5 +26,18 @@ def test_02(self):
         self.assertEqual(aa.venue, 'DEV', f'wrong venue')
         self.assertEqual(aa.tenant, 'UDS_LOCAL', f'wrong tenant')
         print(aa)
+
+        granule_id = 'URN:NASA:UNITY:unity:test:TRPSDL2ALLCRS1MGLOS___2:TROPESS_CrIS-JPSS1_L2_Standard_H2O_20250108_MUSES_R1p23_megacity_los_angeles_MGLOS_F2p5_J0'
+        aa = UdsCollections.decode_granule_identifier(granule_id)
+        self.assertEqual(aa.venue, 'test', f'wrong venue')
+        self.assertEqual(aa.tenant, 'unity', f'wrong tenant')
+        self.assertEqual(aa.granule, 'TROPESS_CrIS-JPSS1_L2_Standard_H2O_20250108_MUSES_R1p23_megacity_los_angeles_MGLOS_F2p5_J0')
+        print(aa)
+        granule_id = 'URN:NASA:UNITY:unity:test:TRPSDL2ALLCRS1MGLOS___2:TROPESS_CrIS-JPSS1_L2_Standard_H2O_20250108_MUSES_R1p23_megacity_los_angeles_MGLOS_F2p5_J0:1:2:3:4'
+        aa = UdsCollections.decode_granule_identifier(granule_id)
+        self.assertEqual(aa.venue, 'test', f'wrong venue')
+        self.assertEqual(aa.tenant, 'unity', f'wrong tenant')
+        self.assertEqual(aa.granule, 'TROPESS_CrIS-JPSS1_L2_Standard_H2O_20250108_MUSES_R1p23_megacity_los_angeles_MGLOS_F2p5_J0:1:2:3:4')
+
         return

From fce033b27dd24c1133927437e441e04cd952536d Mon Sep 17 00:00:00 2001
From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com>
Date: Thu, 29 May 2025 16:50:59 -0700
Subject: [PATCH 2/9] chore: update version + change log (#590)

Co-authored-by: ngachung
---
 CHANGELOG.md | 4 ++++
 setup.py     | 2 +-
 2 files changed, 5 insertions(+), 1 deletion(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index e71a9060..e08a0cf6 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -5,6 +5,10 @@ All notable changes to this project will be documented in this file.
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
+## [9.13.0] - 2025-05-29
+### Changed
+- [#589](https://github.com/unity-sds/unity-data-services/pull/589) feat: daac product.name = granule id
+
 ## [9.12.0] - 2025-05-24
 ### Changed
 - [#585](https://github.com/unity-sds/unity-data-services/pull/585) feat: add ram size in lambdas

diff --git a/setup.py b/setup.py
index 5e37f08d..16c5dc01 100644
--- a/setup.py
+++ b/setup.py
@@ -12,7 +12,7 @@
 
 setup(
     name="cumulus_lambda_functions",
-    version="9.12.0",
+    version="9.13.0",
     packages=find_packages(),
     install_requires=install_requires,
     package_data={

From 715d3276c71ce4e4491407c5840b0cabec923655 Mon Sep 17 00:00:00 2001
From: wphyojpl <38299756+wphyojpl@users.noreply.github.com>
Date: Mon, 2 Jun 2025 13:25:52 -0700
Subject: [PATCH 3/9] feat: add daac_provider (#593)

* feat: add daac_provider
* fix: make daac_provider optional
* fix: fallback to tenant if provider is missing
---
 cumulus_lambda_functions/daac_archiver/daac_archiver_logic.py | 2 +-
 .../granules_to_es/granules_index_mapping.py                  | 3 +++
 cumulus_lambda_functions/lib/uds_db/archive_index.py          | 2 ++
 cumulus_lambda_functions/uds_api/dapa/daac_archive_crud.py    | 4 +++-
 4 files changed, 9 insertions(+), 2 deletions(-)

diff --git a/cumulus_lambda_functions/daac_archiver/daac_archiver_logic.py b/cumulus_lambda_functions/daac_archiver/daac_archiver_logic.py
index 5e47771d..fbb910d7 100644
--- a/cumulus_lambda_functions/daac_archiver/daac_archiver_logic.py
+++ b/cumulus_lambda_functions/daac_archiver/daac_archiver_logic.py
@@ -117,7 +117,7 @@ def send_to_daac_internal(self, uds_cnm_json: dict):
             },
             "identifier": uds_cnm_json['identifier'],
             "submissionTime": f'{TimeUtils.get_current_time()}Z',
-            "provider": granule_identifier.tenant,
+            "provider": daac_config['daac_provider'] if 'daac_provider' in daac_config else granule_identifier.tenant,
             "version": "1.6.0",  # TODO this is hardcoded?
             "product": {
                 "name": granule_identifier.granule,
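The inline conditional above is the usual lookup-with-default: a configured daac_provider wins, and the tenant remains the provider otherwise, so existing archive configs keep working unchanged. An equivalent sketch with placeholder values:

    # minimal sketch: 'daac_provider' wins when configured, tenant otherwise
    daac_config = {'daac_sns_topic_arn': 'arn:aws:sns:us-west-2:000000000000:daac'}  # hypothetical config without a provider
    tenant = 'unity'
    provider = daac_config.get('daac_provider', tenant)
    assert provider == 'unity'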
"product": { "name": granule_identifier.granule, diff --git a/cumulus_lambda_functions/granules_to_es/granules_index_mapping.py b/cumulus_lambda_functions/granules_to_es/granules_index_mapping.py index 8b5ece1c..2fdc5021 100644 --- a/cumulus_lambda_functions/granules_to_es/granules_index_mapping.py +++ b/cumulus_lambda_functions/granules_to_es/granules_index_mapping.py @@ -9,6 +9,9 @@ class GranulesIndexMapping: "daac_data_version": { "type": "keyword" }, + "daac_provider": { + "type": "keyword" + }, "daac_role_arn": { "type": "keyword" }, diff --git a/cumulus_lambda_functions/lib/uds_db/archive_index.py b/cumulus_lambda_functions/lib/uds_db/archive_index.py index a25a9d31..fcf74f9d 100644 --- a/cumulus_lambda_functions/lib/uds_db/archive_index.py +++ b/cumulus_lambda_functions/lib/uds_db/archive_index.py @@ -19,6 +19,7 @@ class UdsArchiveConfigIndex: 'collection', 'ss_username', 'archiving_types'], 'properties': { 'daac_collection_id': {'type': 'string'}, + 'daac_provider': {'type': 'string'}, 'daac_sns_topic_arn': {'type': 'string'}, 'daac_data_version': {'type': 'string'}, 'daac_role_arn': {'type': 'string'}, @@ -36,6 +37,7 @@ class UdsArchiveConfigIndex: 'collection', 'ss_username', 'archiving_types'], 'properties': { 'daac_collection_name': {'type': 'string'}, + 'daac_provider': {'type': 'string'}, 'daac_sns_topic_arn': {'type': 'string'}, 'daac_data_version': {'type': 'string'}, 'daac_role_arn': {'type': 'string'}, diff --git a/cumulus_lambda_functions/uds_api/dapa/daac_archive_crud.py b/cumulus_lambda_functions/uds_api/dapa/daac_archive_crud.py index 347e2e98..b5f54412 100644 --- a/cumulus_lambda_functions/uds_api/dapa/daac_archive_crud.py +++ b/cumulus_lambda_functions/uds_api/dapa/daac_archive_crud.py @@ -17,6 +17,7 @@ class ArchivingTypesModel(BaseModel): class DaacUpdateModel(BaseModel): daac_collection_id: str + daac_provider: Optional[str] = None daac_data_version: Optional[str] = None daac_sns_topic_arn: Optional[str] = None daac_role_arn: Optional[str] = None @@ -26,6 +27,7 @@ class DaacUpdateModel(BaseModel): class DaacAddModel(BaseModel): daac_collection_id: str + daac_provider: Optional[str] = None daac_data_version: str daac_sns_topic_arn: str daac_role_arn: str @@ -104,7 +106,7 @@ def add_new_config(self): } ingesting_dict = { - **self.__request_body, + **{k: v for k, v in self.__request_body.items() if v is not None}, 'ss_username': self.__authorization_info['username'], 'collection': self.__collection_id, } From 737a512afeb49ba6ddf8c17379a6c8d47c8dcc47 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Mon, 2 Jun 2025 13:26:18 -0700 Subject: [PATCH 4/9] chore: update version + change log (#594) Co-authored-by: wphyojpl --- CHANGELOG.md | 4 ++++ setup.py | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e08a0cf6..d4632675 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,10 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). 
From 737a512afeb49ba6ddf8c17379a6c8d47c8dcc47 Mon Sep 17 00:00:00 2001
From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com>
Date: Mon, 2 Jun 2025 13:26:18 -0700
Subject: [PATCH 4/9] chore: update version + change log (#594)

Co-authored-by: wphyojpl
---
 CHANGELOG.md | 4 ++++
 setup.py     | 2 +-
 2 files changed, 5 insertions(+), 1 deletion(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index e08a0cf6..d4632675 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -5,6 +5,10 @@ All notable changes to this project will be documented in this file.
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
+## [9.14.0] - 2025-06-02
+### Changed
+- [#593](https://github.com/unity-sds/unity-data-services/pull/593) feat: add daac_provider
+
 ## [9.13.0] - 2025-05-29
 ### Changed
 - [#589](https://github.com/unity-sds/unity-data-services/pull/589) feat: daac product.name = granule id

diff --git a/setup.py b/setup.py
index 16c5dc01..f408d464 100644
--- a/setup.py
+++ b/setup.py
@@ -12,7 +12,7 @@
 
 setup(
     name="cumulus_lambda_functions",
-    version="9.13.0",
+    version="9.14.0",
     packages=find_packages(),
     install_requires=install_requires,
     package_data={

From 4600d27519924e79d8208513dff88fb3f3f885e5 Mon Sep 17 00:00:00 2001
From: wphyojpl <38299756+wphyojpl@users.noreply.github.com>
Date: Tue, 3 Jun 2025 09:49:49 -0700
Subject: [PATCH 5/9] feat: Empty collection delete (#591)

* feat: collection deletion
* fix: update collection to pass existing code validation
* fix: expecting json dict
---
 .../cumulus_wrapper/query_collections.py      | 33 ++++++++
 .../lib/uds_db/granules_db_index.py           | 11 +++
 .../uds_api/collections_api.py                | 70 +++++++++++++++-
 .../uds_api/dapa/collections_dapa_creation.py | 79 +++++++++++++++++++
 4 files changed, 192 insertions(+), 1 deletion(-)

diff --git a/cumulus_lambda_functions/cumulus_wrapper/query_collections.py b/cumulus_lambda_functions/cumulus_wrapper/query_collections.py
index 449d487c..43823690 100644
--- a/cumulus_lambda_functions/cumulus_wrapper/query_collections.py
+++ b/cumulus_lambda_functions/cumulus_wrapper/query_collections.py
@@ -158,6 +158,39 @@ def query_rules(self, private_api_prefix: str):
             return {'server_error': f'error while invoking:{str(e)}'}
         return {'results': query_result}
 
+    def delete_sqs_rules(self, new_collection: dict, private_api_prefix: str):
+        # $ curl --request DELETE https://example.com/rules/repeat_test --header 'Authorization: Bearer ReplaceWithTheToken'
+        underscore_collection_name = re.sub(r'[^a-zA-Z0-9_]', '___', new_collection["name"])  # replace any character that's not alphanumeric or underscore with 3 underscores
+        rule_name = f'{underscore_collection_name}___{new_collection["version"]}___rules_sqs'
+        payload = {
+            'httpMethod': 'DELETE',
+            'resource': '/{proxy+}',
+            'path': f'/{self.__rules_key}/{rule_name}',
+            'headers': {
+                'Content-Type': 'application/json',
+            },
+        }
+        LOGGER.debug(f'payload: {payload}')
+        try:
+            query_result = self._invoke_api(payload, private_api_prefix)
+            """
+            {'statusCode': 500, 'body': '', 'headers': {}}
+            """
+            if query_result['statusCode'] >= 500:
+                LOGGER.error(f'server error status code: {query_result["statusCode"]}. details: {query_result}')
+                return {'server_error': query_result}
+            if query_result['statusCode'] >= 400:
+                LOGGER.error(f'client error status code: {query_result["statusCode"]}. details: {query_result}')
+                return {'client_error': query_result}
+            query_result = json.loads(query_result['body'])
+            LOGGER.debug(f'json query_result: {query_result}')
+            if 'message' not in query_result:
+                return {'server_error': f'invalid response: {query_result}'}
+        except Exception as e:
+            LOGGER.exception('error while invoking')
+            return {'server_error': f'error while invoking:{str(e)}'}
+        return {'status': query_result['message']}
+
     def create_sqs_rules(self, new_collection: dict, private_api_prefix: str, sqs_url: str, provider_name: str = '', workflow_name: str = 'CatalogGranule', visibility_timeout: int = 1800):
         """
         curl --request POST "$CUMULUS_BASEURL/rules" --header "Authorization: Bearer $cumulus_token" --header 'Content-Type: application/json' --data '{
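The rule name is derived from the collection rather than stored, so deletion recomputes it exactly the way creation did. A standalone restatement of the naming rule, with made-up inputs:

    import re

    def sqs_rule_name(collection_name: str, version: str) -> str:
        # any character that is not alphanumeric or underscore becomes three underscores
        safe = re.sub(r'[^a-zA-Z0-9_]', '___', collection_name)
        return f'{safe}___{version}___rules_sqs'

    print(sqs_rule_name('my-collection.v2', '1'))  # my___collection___v2___1___rules_sqs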
diff --git a/cumulus_lambda_functions/lib/uds_db/granules_db_index.py b/cumulus_lambda_functions/lib/uds_db/granules_db_index.py
index 2fae3f9d..103c8d3b 100644
--- a/cumulus_lambda_functions/lib/uds_db/granules_db_index.py
+++ b/cumulus_lambda_functions/lib/uds_db/granules_db_index.py
@@ -305,6 +305,17 @@ def add_entry(self, tenant: str, tenant_venue: str, json_body: dict, doc_id: str
         # TODO validate custom metadata vs the latest index to filter extra items
         return
 
+    def get_size(self, tenant: str, tenant_venue: str, collection_id: str):
+        read_alias_name = f'{DBConstants.granules_read_alias_prefix}_{tenant}_{tenant_venue}'.lower().strip()
+        search_dsl = {
+            'query': {'bool': {'must': [{
+                'term': {'collection': collection_id}
+            }]}},
+            'size': 0
+        }
+        search_result = self.__es.query(search_dsl, querying_index=read_alias_name)
+        return self.__es.get_result_size(search_result)
+
     def dsl_search(self, tenant: str, tenant_venue: str, search_dsl: dict):
         read_alias_name = f'{DBConstants.granules_read_alias_prefix}_{tenant}_{tenant_venue}'.lower().strip()
         if 'sort' not in search_dsl:  # We cannot paginate w/o sort. So, max is 10k items:
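get_size issues a zero-size search, so only hit metadata comes back and no documents are fetched. A sketch of the count extraction; the response dict below mocks an OpenSearch/Elasticsearch 7+ reply, where hits.total is an object rather than the bare integer older clusters return:

    # the DSL mirrors the method above; the response is a hand-written mock
    search_dsl = {'query': {'bool': {'must': [{'term': {'collection': 'URN:NASA:UNITY:unity:test:COLL___1'}}]}},
                  'size': 0}
    sample_response = {'hits': {'total': {'value': 42}, 'hits': []}}
    total = sample_response['hits']['total']
    granules_count = total['value'] if isinstance(total, dict) else total  # ES 6 returns a plain int
    assert granules_count == 42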
diff --git a/cumulus_lambda_functions/uds_api/collections_api.py b/cumulus_lambda_functions/uds_api/collections_api.py
index 3551a7b5..6a7c2d2e 100644
--- a/cumulus_lambda_functions/uds_api/collections_api.py
+++ b/cumulus_lambda_functions/uds_api/collections_api.py
@@ -1,10 +1,12 @@
 import json
 import os
+from datetime import datetime
 from typing import Union
 
-from pystac import Catalog, Link
+from pystac import Catalog, Link, Collection, Extent, SpatialExtent, TemporalExtent, Summaries, Provider
 
 from cumulus_lambda_functions.lib.uds_db.db_constants import DBConstants
+from cumulus_lambda_functions.lib.uds_db.granules_db_index import GranulesDbIndex
 from cumulus_lambda_functions.lib.uds_db.uds_collections import UdsCollections
 
@@ -276,3 +278,69 @@ async def query_collections(request: Request, collection_id: Union[str, None] =
     if collections_result['statusCode'] == 200:
         return collections_result['body']
     raise HTTPException(status_code=collections_result['statusCode'], detail=collections_result['body'])
+
+@router.delete("/{collection_id}")
+@router.delete("/{collection_id}/")
+async def delete_single_collection(request: Request, collection_id: str):
+    LOGGER.debug(f'starting delete_single_collection: {collection_id}')
+    LOGGER.debug(f'starting delete_single_collection request: {request}')
+
+    authorizer: UDSAuthorizorAbstract = UDSAuthorizerFactory() \
+        .get_instance(UDSAuthorizerFactory.cognito,
+                      es_url=os.getenv('ES_URL'),
+                      es_port=int(os.getenv('ES_PORT', '443'))
+                      )
+    auth_info = FastApiUtils.get_authorization_info(request)
+    uds_collections = UdsCollections(es_url=os.getenv('ES_URL'),
+                                     es_port=int(os.getenv('ES_PORT', '443')), es_type=os.getenv('ES_TYPE', 'AWS'))
+    if collection_id is None or collection_id == '':
+        raise HTTPException(status_code=500, detail=f'missing or invalid collection_id: {collection_id}')
+    collection_identifier = uds_collections.decode_identifier(collection_id)
+    if not authorizer.is_authorized_for_collection(DBConstants.delete, collection_id,
+                                                   auth_info['ldap_groups'],
+                                                   collection_identifier.tenant,
+                                                   collection_identifier.venue):
+        LOGGER.debug(f'user: {auth_info["username"]} is not authorized for {collection_id}')
+        raise HTTPException(status_code=403, detail=json.dumps({
+            'message': 'not authorized to execute this action'
+        }))
+
+    granules_count = GranulesDbIndex().get_size(collection_identifier.tenant, collection_identifier.venue,
+                                                collection_id)
+    LOGGER.debug(f'granules_count: {granules_count} for {collection_id}')
+    if granules_count > 0:
+        LOGGER.debug(f'NOT deleting {collection_id} as it is not empty')
+        raise HTTPException(status_code=409, detail=f'NOT deleting {collection_id} as it is not empty')
+
+    try:
+        new_collection = Collection(
+            id=collection_id,
+            title=collection_id,
+            description='TODO',
+            extent = Extent(
+                SpatialExtent([[0.0, 0.0, 0.0, 0.0]]),
+                TemporalExtent([[datetime.utcnow(), datetime.utcnow()]])
+            ),
+            license = "proprietary",
+            providers = [],
+            # title=input_collection['LongName'],
+            # keywords=[input_collection['SpatialKeywords']['Keyword']],
+            summaries = Summaries({
+                "totalGranules": [-1],
+            }),
+        )
+        new_collection.links = [
+            Link(rel='root',
+                 target=f'./collection.json',
+                 media_type='application/json', title=f"{new_collection.id}"),
+            Link(rel='item',
+                 target='./collection.json',
+                 media_type='application/json', title=f"{new_collection.id} Granules")
+        ]
+        creation_result = CollectionDapaCreation(new_collection.to_dict(False, False)).delete()
+    except Exception as e:
+        LOGGER.exception('failed during delete_single_collection')
+        raise HTTPException(status_code=500, detail=str(e))
+    if creation_result['statusCode'] < 300:
+        return creation_result['body'], creation_result['statusCode']
+    raise HTTPException(status_code=creation_result['statusCode'], detail=creation_result['body'])

diff --git a/cumulus_lambda_functions/uds_api/dapa/collections_dapa_creation.py b/cumulus_lambda_functions/uds_api/dapa/collections_dapa_creation.py
index 27fe645d..42a8611a 100644
--- a/cumulus_lambda_functions/uds_api/dapa/collections_dapa_creation.py
+++ b/cumulus_lambda_functions/uds_api/dapa/collections_dapa_creation.py
@@ -81,6 +81,18 @@ def __init__(self, request_body):
         self.__uds_collection = UdsCollections(es_url=os.getenv('ES_URL'), es_port=int(os.getenv('ES_PORT', '443')), es_type=os.getenv('ES_TYPE', 'AWS'), use_ssl=os.getenv('ES_USE_SSL', 'TRUE').strip() is True)
         self.__cumulus_collection_query = CollectionsQuery('', '')
 
+    def __delete_collection_cumulus(self, cumulus_collection_doc):
+        delete_result = self.__cumulus_collection_query.delete_collection(self.__cumulus_lambda_prefix, cumulus_collection_doc['name'], cumulus_collection_doc['version'])
+        if 'status' not in delete_result:
+            LOGGER.error(f'status not in delete_result: {delete_result}')
+            return {
+                'statusCode': 500,
+                'body': {
+                    'message': delete_result
+                }
+            }, None
+        return None, delete_result
+
     def __create_collection_cumulus(self, cumulus_collection_doc):
         creation_result = self.__cumulus_collection_query.create_collection(cumulus_collection_doc, self.__cumulus_lambda_prefix)
         if 'status' not in creation_result:
@@ -116,6 +128,37 @@ def __create_rules_cumulus(self, cumulus_collection_doc):
             }
         return None
 
+    def __delete_rules_cumulus(self, cumulus_collection_doc):
+        rule_deletion_result = self.__cumulus_collection_query.delete_sqs_rules(
+            cumulus_collection_doc,
+            self.__cumulus_lambda_prefix
+        )
+        if 'status' not in rule_deletion_result:
+            LOGGER.error(f'status not in rule_deletion_result: {rule_deletion_result}')
+            return {
+                'statusCode': 500,
+                'body': {
+                    'message': rule_deletion_result,
+                    'details': f'rule deletion result: {rule_deletion_result}'
+                }
+            }
+        return None
+
+    def __delete_collection_uds(self):
+        try:
+            delete_collection_result = self.__uds_collection.delete_collection(
+                collection_id=self.__collection_transformer.get_collection_id()
+            )
+        except Exception as e:
+            LOGGER.exception(f'failed to delete collection from Elasticsearch')
+            return {
+                'statusCode': 500,
+                'body': {
+                    'message': f'unable to delete collection from Elasticsearch: {str(e)}',
+                }
+            }
+        return None
+
     def __create_collection_uds(self, cumulus_collection_doc):
         try:
@@ -143,6 +186,42 @@ def __create_collection_uds(self, cumulus_collection_doc):
             }
         return None
 
+    def delete(self):
+        deletion_result = {}
+        try:
+            cumulus_collection_doc = self.__collection_transformer.from_stac(self.__request_body)
+            self.__provider_id = self.__provider_id if self.__collection_transformer.output_provider is None else self.__collection_transformer.output_provider
+            LOGGER.debug(f'__provider_id: {self.__provider_id}')
+            creation_result = 'NA'
+
+            if self.__include_cumulus:
+                rules_deletion_result = self.__delete_rules_cumulus(cumulus_collection_doc)
+                deletion_result['cumulus_rule_deletion'] = rules_deletion_result if rules_deletion_result is not None else 'succeeded'
+                delete_err, delete_result = self.__delete_collection_cumulus(cumulus_collection_doc)
+                deletion_result['cumulus_collection_deletion'] = delete_err if delete_err is not None else delete_result
+            else:
+                deletion_result['cumulus_rule_deletion'] = 'NA'
+                deletion_result['cumulus_collection_deletion'] = 'NA'
+
+            uds_deletion_result = self.__delete_collection_uds()
+            deletion_result['uds_collection_deletion'] = uds_deletion_result if uds_deletion_result is not None else 'succeeded'
+        except Exception as e:
+            LOGGER.exception('error while deleting collection in Cumulus')
+            return {
+                'statusCode': 500,
+                'body': {
+                    'message': f'error while deleting collection in Cumulus. check details',
+                    'details': str(e)
+                }
+            }
+        LOGGER.info(f'deletion_result: {deletion_result}')
+        return {
+            'statusCode': 200,
+            'body': {
+                'message': deletion_result
+            }
+        }
+
     def create(self):
         try:
             cumulus_collection_doc = self.__collection_transformer.from_stac(self.__request_body)
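Taken together, the new endpoint only proceeds on an empty collection: 403 for unauthorized callers, 409 while granules remain, then the Cumulus rule and collection teardown plus the UDS index cleanup. A hypothetical client call; the base URL, path prefix, and token below are placeholders, not values taken from this series:

    import requests

    resp = requests.delete(
        'https://uds.example.com/am-uds-dapa/collections/URN:NASA:UNITY:unity:test:COLL___1',  # placeholder URL
        headers={'Authorization': 'Bearer <token>'},
    )
    if resp.status_code == 409:
        print('collection still has granules; remove or archive them first')
    elif resp.ok:
        print(resp.json())  # per-step deletion results keyed by subsystem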
From 3afc9ce20c7c038d7b70dfd4dbfd7a1b2c59577f Mon Sep 17 00:00:00 2001
From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com>
Date: Tue, 3 Jun 2025 09:50:32 -0700
Subject: [PATCH 6/9] chore: update version + change log (#595)

Co-authored-by: wphyojpl
---
 CHANGELOG.md | 4 ++++
 setup.py     | 2 +-
 2 files changed, 5 insertions(+), 1 deletion(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index d4632675..d502b5db 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -5,6 +5,10 @@ All notable changes to this project will be documented in this file.
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
+## [9.15.0] - 2025-06-03
+### Changed
+- [#591](https://github.com/unity-sds/unity-data-services/pull/591) feat: empty collection delete
+
 ## [9.14.0] - 2025-06-02
 ### Changed
 - [#593](https://github.com/unity-sds/unity-data-services/pull/593) feat: add daac_provider

diff --git a/setup.py b/setup.py
index f408d464..9c20ee35 100644
--- a/setup.py
+++ b/setup.py
@@ -12,7 +12,7 @@
 
 setup(
     name="cumulus_lambda_functions",
-    version="9.14.0",
+    version="9.15.0",
     packages=find_packages(),
     install_requires=install_requires,
     package_data={

From 93651123c8942925b70d305e42db3405a0402610 Mon Sep 17 00:00:00 2001
From: Anil Natha
Date: Thu, 5 Jun 2025 16:24:34 -0700
Subject: [PATCH 7/9] Updated health endpoint metadata for Data Catalog (#596)

---
 tf-module/unity-cumulus/main.tf | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)

diff --git a/tf-module/unity-cumulus/main.tf b/tf-module/unity-cumulus/main.tf
index a673ece0..717f1433 100644
--- a/tf-module/unity-cumulus/main.tf
+++ b/tf-module/unity-cumulus/main.tf
@@ -185,9 +185,12 @@ resource "aws_ssm_parameter" "health_check_value" {
   type  = "String"
   tier  = "Advanced"
   value = jsonencode({
-    healthCheckUrl = "${var.uds_base_url}/${var.dapa_api_prefix}/collections",
-    landingPageUrl = "${var.unity_ui_base_url}/data/stac_browser/",
-    componentName = "Data Catalog",
+    healthCheckUrl    = "${var.uds_base_url}/${var.dapa_api_prefix}/collections",
+    landingPageUrl    = "${var.unity_ui_base_url}/data/stac_browser/",
+    componentCategory = "catalogs"
+    componentName     = "Data Catalog",
+    componentType     = "ui"
+    description       = "The STAC Browser to help you browse and search the data catalog for your outputs and other data ingested into the MDPS data system."
   })
   tags = var.tags
   overwrite = true
@@ -201,4 +204,4 @@ resource "aws_ssm_parameter" "marketplace_prefix" {
   tier = "Advanced"
   tags = var.tags
   overwrite = true
-}
\ No newline at end of file
+}

From 7ae5ffb92ae388842ff9a0c94914d934d0d8602d Mon Sep 17 00:00:00 2001
From: wphyojpl <38299756+wphyojpl@users.noreply.github.com>
Date: Wed, 25 Jun 2025 14:07:12 -0700
Subject: [PATCH 8/9] fix: Collection deletion - Adding Cumulus Execution Deletions (#597)

* feat: collection deletion
* fix: update collection to pass existing code validation
* fix: expecting json dict
* fix: deleting executions and wait for 10 seconds
* fix: normal name to delete executions
* fix: need name___version
* fix: reduce batch size
* fix: brute force delete retry
* fix: delete rule first then collection
* fix: get executions for collection
* fix: delete only executions if existed
---
 .../cumulus_wrapper/query_collections.py      |  70 +++++++++++
 .../uds_api/dapa/collections_dapa_creation.py | 117 ++++++++----------
 2 files changed, 124 insertions(+), 63 deletions(-)

diff --git a/cumulus_lambda_functions/cumulus_wrapper/query_collections.py b/cumulus_lambda_functions/cumulus_wrapper/query_collections.py
index 43823690..0c43b3c6 100644
--- a/cumulus_lambda_functions/cumulus_wrapper/query_collections.py
+++ b/cumulus_lambda_functions/cumulus_wrapper/query_collections.py
@@ -34,6 +34,7 @@ def with_collections(self, collection_ids: list):
         collection_names = [k.split('___')[0] for k in collection_ids]
         self._conditions.append(f'{self.__collection_name}__in={",".join(collection_names)}')
         return self
+
     def get_size(self, private_api_prefix: str):
         query_params = {'field': 'status', 'type': 'collections'}
         main_conditions = {k[0]: k[1] for k in [k1.split('=') for k1 in self._conditions]}
@@ -191,6 +192,75 @@ def delete_sqs_rules(self, new_collection: dict, private_api_prefix: str):
             return {'server_error': f'error while invoking:{str(e)}'}
         return {'status': query_result['message']}
 
+    def delete_executions(self, new_collection: dict, private_api_prefix: str):
+        # $ curl --request POST https://example.com/executions/bulk-delete-by-collection --header 'Authorization: Bearer ReplaceWithTheToken'
+        request_body = {
+            "collectionId": f'{new_collection["name"]}___{new_collection["version"]}',
+            "esBatchSize": 10000,
+            "dbBatchSize": 50000
+        }
+        payload = {
+            'httpMethod': 'POST',
+            'resource': '/{proxy+}',
+            'path': f'/executions/bulk-delete-by-collection',
+            'headers': {
+                'Content-Type': 'application/json',
+            },
+            'body': json.dumps(request_body)
+        }
+        LOGGER.debug(f'payload: {payload}')
+        try:
+            query_result = self._invoke_api(payload, private_api_prefix)
+            """
+            {'statusCode': 500, 'body': '', 'headers': {}}
+            """
+            if query_result['statusCode'] >= 500:
+                LOGGER.error(f'server error status code: {query_result["statusCode"]}. details: {query_result}')
+                return {'server_error': query_result}
+            if query_result['statusCode'] >= 400:
+                LOGGER.error(f'client error status code: {query_result["statusCode"]}. details: {query_result}')
+                return {'client_error': query_result}
+            query_result = json.loads(query_result['body'])
+            LOGGER.debug(f'json query_result: {query_result}')
+            if 'id' not in query_result:
+                return {'server_error': f'invalid response: {query_result}'}
+        except Exception as e:
+            LOGGER.exception('error while invoking')
+            return {'server_error': f'error while invoking:{str(e)}'}
+        return {'status': query_result}
+
+    def list_executions(self, new_collection: dict, private_api_prefix: str):
+        # $ curl --request GET 'https://example.com/executions?limit=100&collectionId=name___version' --header 'Authorization: Bearer ReplaceWithTheToken'
+        payload = {
+            'httpMethod': 'GET',
+            'resource': '/{proxy+}',
+            'path': f'/executions',
+            'queryStringParameters': {'limit': '100', 'collectionId': f'{new_collection["name"]}___{new_collection["version"]}'},
+            'headers': {
+                'Content-Type': 'application/json',
+            }
+        }
+        LOGGER.debug(f'payload: {payload}')
+        try:
+            query_result = self._invoke_api(payload, private_api_prefix)
+            """
+            {'statusCode': 500, 'body': '', 'headers': {}}
+            """
+            if query_result['statusCode'] >= 500:
+                LOGGER.error(f'server error status code: {query_result["statusCode"]}. details: {query_result}')
+                return {'server_error': query_result}
+            if query_result['statusCode'] >= 400:
+                LOGGER.error(f'client error status code: {query_result["statusCode"]}. details: {query_result}')
+                return {'client_error': query_result}
+            query_result = json.loads(query_result['body'])
+            LOGGER.debug(f'json query_result: {query_result}')
+            if 'results' not in query_result:
+                return {'server_error': f'invalid response: {query_result}'}
+        except Exception as e:
+            LOGGER.exception('error while invoking')
+            return {'server_error': f'error while invoking:{str(e)}'}
+        return {'results': query_result['results']}
+
     def create_sqs_rules(self, new_collection: dict, private_api_prefix: str, sqs_url: str, provider_name: str = '', workflow_name: str = 'CatalogGranule', visibility_timeout: int = 1800):
         """
         curl --request POST "$CUMULUS_BASEURL/rules" --header "Authorization: Bearer $cumulus_token" --header 'Content-Type: application/json' --data '{
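Both helpers drive the private Cumulus API through `_invoke_api`, which is not shown in this series; from the payload shape it evidently forwards an API Gateway-style proxy event to the private API Lambda. A sketch of what such an invocation could look like with boto3; the function name is a placeholder, not the real one:

    import json

    import boto3

    payload = {
        'httpMethod': 'GET',
        'resource': '/{proxy+}',
        'path': '/executions',
        'queryStringParameters': {'limit': '100', 'collectionId': 'COLL___1'},  # made-up collection
        'headers': {'Content-Type': 'application/json'},
    }
    aws_lambda = boto3.client('lambda')
    raw = aws_lambda.invoke(FunctionName='uds-prefix-PrivateApiLambda',  # placeholder name
                            Payload=json.dumps(payload).encode())
    result = json.loads(raw['Payload'].read())  # {'statusCode': ..., 'body': '...', 'headers': {...}}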
diff --git a/cumulus_lambda_functions/uds_api/dapa/collections_dapa_creation.py b/cumulus_lambda_functions/uds_api/dapa/collections_dapa_creation.py
index 42a8611a..6b111a9d 100644
--- a/cumulus_lambda_functions/uds_api/dapa/collections_dapa_creation.py
+++ b/cumulus_lambda_functions/uds_api/dapa/collections_dapa_creation.py
@@ -1,5 +1,6 @@
 import json
 import os
+from time import sleep
 from typing import Optional
 
 import pystac
@@ -81,68 +82,17 @@ def __init__(self, request_body):
         self.__uds_collection = UdsCollections(es_url=os.getenv('ES_URL'), es_port=int(os.getenv('ES_PORT', '443')), es_type=os.getenv('ES_TYPE', 'AWS'), use_ssl=os.getenv('ES_USE_SSL', 'TRUE').strip() is True)
         self.__cumulus_collection_query = CollectionsQuery('', '')
 
-    def __delete_collection_cumulus(self, cumulus_collection_doc):
-        delete_result = self.__cumulus_collection_query.delete_collection(self.__cumulus_lambda_prefix, cumulus_collection_doc['name'], cumulus_collection_doc['version'])
-        if 'status' not in delete_result:
-            LOGGER.error(f'status not in delete_result: {delete_result}')
+    def analyze_cumulus_result(self, cumulus_request_result):
+        if 'status' not in cumulus_request_result:
+            LOGGER.error(f'status not in cumulus_request_result: {cumulus_request_result}')
             return {
                 'statusCode': 500,
                 'body': {
-                    'message': delete_result
+                    'message': cumulus_request_result
                 }
             }, None
-        return None, delete_result
+        return None, cumulus_request_result
 
-    def __create_collection_cumulus(self, cumulus_collection_doc):
-        creation_result = self.__cumulus_collection_query.create_collection(cumulus_collection_doc, self.__cumulus_lambda_prefix)
-        if 'status' not in creation_result:
-            LOGGER.error(f'status not in creation_result: {creation_result}')
-            return {
-                'statusCode': 500,
-                'body': {
-                    'message': creation_result
-                }
-            }, None
-        return None, creation_result
-
-    def __create_rules_cumulus(self, cumulus_collection_doc):
-        rule_creation_result = self.__cumulus_collection_query.create_sqs_rules(
-            cumulus_collection_doc,
-            self.__cumulus_lambda_prefix,
-            self.__ingest_sqs_url,
-            self.__provider_id,
-            self.__workflow_name,
-        )
-        if 'status' not in rule_creation_result:
-            LOGGER.error(f'status not in rule_creation_result. deleting collection: {rule_creation_result}')
-            delete_collection_result = self.__cumulus_collection_query.delete_collection(self.__cumulus_lambda_prefix,
-                                                                                         cumulus_collection_doc['name'],
-                                                                                         cumulus_collection_doc['version'])
-            self.__uds_collection.delete_collection(self.__collection_transformer.get_collection_id())
-            return {
-                'statusCode': 500,
-                'body': {
-                    'message': rule_creation_result,
-                    'details': f'collection deletion result: {delete_collection_result}'
-                }
-            }
-        return None
-
-    def __delete_rules_cumulus(self, cumulus_collection_doc):
-        rule_deletion_result = self.__cumulus_collection_query.delete_sqs_rules(
-            cumulus_collection_doc,
-            self.__cumulus_lambda_prefix
-        )
-        if 'status' not in rule_deletion_result:
-            LOGGER.error(f'status not in rule_deletion_result: {rule_deletion_result}')
-            return {
-                'statusCode': 500,
-                'body': {
-                    'message': rule_deletion_result,
-                    'details': f'rule deletion result: {rule_deletion_result}'
-                }
-            }
-        return None
-
     def __delete_collection_uds(self):
         try:
@@ -139,17 +139,36 @@ def delete(self):
         deletion_result = {}
         try:
+
             cumulus_collection_doc = self.__collection_transformer.from_stac(self.__request_body)
             self.__provider_id = self.__provider_id if self.__collection_transformer.output_provider is None else self.__collection_transformer.output_provider
             LOGGER.debug(f'__provider_id: {self.__provider_id}')
             creation_result = 'NA'
 
             if self.__include_cumulus:
+                result = self.__cumulus_collection_query.list_executions(cumulus_collection_doc, self.__cumulus_lambda_prefix)
+                LOGGER.debug(f'execution list result: {result}')
+                if len(result['results']) > 0:
+                    self.__delete_collection_execution(cumulus_collection_doc, deletion_result)
+                    return {
+                        'statusCode': 409,
+                        'body': {
+                            'message': f'There are cumulus executions for this collection. Deleting them. Please try again in a few minutes.',
+                        }
+                    }
+                # self.__delete_collection_execution(cumulus_collection_doc, deletion_result)
+                self.__delete_collection_rule(cumulus_collection_doc, deletion_result)
-                rules_deletion_result = self.__delete_rules_cumulus(cumulus_collection_doc)
-                deletion_result['cumulus_rule_deletion'] = rules_deletion_result if rules_deletion_result is not None else 'succeeded'
-                delete_err, delete_result = self.__delete_collection_cumulus(cumulus_collection_doc)
+                delete_result = self.__cumulus_collection_query.delete_collection(self.__cumulus_lambda_prefix, cumulus_collection_doc['name'], cumulus_collection_doc['version'])
+                delete_err, delete_result = self.analyze_cumulus_result(delete_result)
+                if delete_err is not None:
+                    LOGGER.error(f'deleting collection ends in error. Trying again. {delete_err}')
+                    # self.__delete_collection_execution(cumulus_collection_doc, deletion_result)
+                    self.__delete_collection_rule(cumulus_collection_doc, deletion_result)
+                    delete_result = self.__cumulus_collection_query.delete_collection(self.__cumulus_lambda_prefix, cumulus_collection_doc['name'], cumulus_collection_doc['version'])
+                    delete_err, delete_result = self.analyze_cumulus_result(delete_result)
                 deletion_result['cumulus_collection_deletion'] = delete_err if delete_err is not None else delete_result
             else:
+                deletion_result['cumulus_executions_deletion'] = 'NA'
                 deletion_result['cumulus_rule_deletion'] = 'NA'
                 deletion_result['cumulus_collection_deletion'] = 'NA'
@@ -222,6 +191,20 @@ def delete(self):
             }
         }
 
+    def __delete_collection_rule(self, cumulus_collection_doc, deletion_result):
+        if 'cumulus_rule_deletion' in deletion_result and 'statusCode' not in deletion_result['cumulus_rule_deletion']:
+            return
+        rule_deletion_result = self.__cumulus_collection_query.delete_sqs_rules(cumulus_collection_doc, self.__cumulus_lambda_prefix)
+        rule_delete_err, rule_delete_result = self.analyze_cumulus_result(rule_deletion_result)
+        deletion_result['cumulus_rule_deletion'] = rule_delete_err if rule_delete_err is not None else rule_delete_result
+        return
+
+    def __delete_collection_execution(self, cumulus_collection_doc, deletion_result):
+        executions_delete_result = self.__cumulus_collection_query.delete_executions(cumulus_collection_doc, self.__cumulus_lambda_prefix)
+        exec_delete_err, exec_delete_result = self.analyze_cumulus_result(executions_delete_result)
+        deletion_result['cumulus_executions_deletion'] = exec_delete_err if exec_delete_err is not None else exec_delete_result
+        sleep(10)
+        return
+
     def create(self):
         try:
             cumulus_collection_doc = self.__collection_transformer.from_stac(self.__request_body)
@@ -229,16 +212,24 @@ def create(self):
             self.__provider_id = self.__provider_id if self.__collection_transformer.output_provider is None else self.__collection_transformer.output_provider
             LOGGER.debug(f'__provider_id: {self.__provider_id}')
             creation_result = 'NA'
             if self.__include_cumulus:
-                creation_err, creation_result = self.__create_collection_cumulus(cumulus_collection_doc)
+                creation_cumulus_result = self.__cumulus_collection_query.create_collection(cumulus_collection_doc, self.__cumulus_lambda_prefix)
+                creation_err, creation_result = self.analyze_cumulus_result(creation_cumulus_result)
                 if creation_err is not None:
                     return creation_err
             uds_creation_result = self.__create_collection_uds(cumulus_collection_doc)
             if uds_creation_result is not None:
                 return uds_creation_result
             if self.__include_cumulus:
-                create_rule_result = self.__create_rules_cumulus(cumulus_collection_doc)
-                if create_rule_result is not None:
-                    return create_rule_result
+                rule_creation_result = self.__cumulus_collection_query.create_sqs_rules(
+                    cumulus_collection_doc,
+                    self.__cumulus_lambda_prefix,
+                    self.__ingest_sqs_url,
+                    self.__provider_id,
+                    self.__workflow_name,
+                )
+                create_rule_err, create_rule_result = self.analyze_cumulus_result(rule_creation_result)
+                if create_rule_err is not None:
+                    return create_rule_err
         # validation_result = pystac.Collection.from_dict(self.__request_body).validate()
         # cumulus_collection_query = CollectionsQuery('', '')
         #
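analyze_cumulus_result collapses the repeated per-call status checks into one Go-style (error, result) pair, which is why the retry in delete() can just re-run the same two lines. A stripped-down restatement of the pattern, runnable on its own:

    def analyze(resp: dict):
        # normalize a Cumulus wrapper response into an (error_response, payload) pair:
        # wrapper methods return {'status': ...} on success and error keys otherwise
        if 'status' not in resp:
            return {'statusCode': 500, 'body': {'message': resp}}, None
        return None, resp

    err, payload = analyze({'server_error': 'boom'})
    assert err is not None and payload is None
    err, payload = analyze({'status': 'Record deleted'})
    assert err is None and payload == {'status': 'Record deleted'}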
From 850ca4691355edaa59922b95b30c7976bcdb2362 Mon Sep 17 00:00:00 2001
From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com>
Date: Wed, 25 Jun 2025 14:08:21 -0700
Subject: [PATCH 9/9] chore: update version + change log (#599)

Co-authored-by: ngachung
---
 CHANGELOG.md | 4 ++++
 setup.py     | 2 +-
 2 files changed, 5 insertions(+), 1 deletion(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index d502b5db..feb2d8c7 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -5,6 +5,10 @@ All notable changes to this project will be documented in this file.
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
+## [9.15.1] - 2025-06-25
+### Fixed
+- [#597](https://github.com/unity-sds/unity-data-services/pull/597) fix: collection deletion - adding cumulus execution deletions
+
 ## [9.15.0] - 2025-06-03
 ### Changed
 - [#591](https://github.com/unity-sds/unity-data-services/pull/591) feat: empty collection delete

diff --git a/setup.py b/setup.py
index 9c20ee35..e0d72ee8 100644
--- a/setup.py
+++ b/setup.py
@@ -12,7 +12,7 @@
 
 setup(
     name="cumulus_lambda_functions",
-    version="9.15.0",
+    version="9.15.1",
     packages=find_packages(),
     install_requires=install_requires,
     package_data={