Commit d62ab6c

Merge 92949db into 8c4537f
wphyojpl authored Jan 16, 2025
2 parents 8c4537f + 92949db

Showing 2 changed files with 325 additions and 14 deletions.
80 changes: 67 additions & 13 deletions cumulus_lambda_functions/lib/uds_db/granules_db_index.py
@@ -152,16 +152,20 @@ def create_new_index(self, tenant, tenant_venue, es_mapping: dict):
        self.__es.swap_index_for_alias(write_perc_alias_name, current_perc_index_name, new_perc_index_name)
        try:
            self.__es.migrate_index_data(current_perc_index_name, new_perc_index_name)
-       except Exception as e:
+       except:
            LOGGER.exception(f'failed to migrate index data: {(current_perc_index_name, new_perc_index_name)}')
        return

-   def get_latest_index(self, tenant, tenant_venue):
+   def get_latest_index_name(self, tenant, tenant_venue):
        write_alias_name = f'{DBConstants.granules_write_alias_prefix}_{tenant}_{tenant_venue}'.lower().strip()
        write_alias_name = self.__es.get_alias(write_alias_name)
        if len(write_alias_name) != 1:
            raise ValueError(f'missing index for {tenant}_{tenant_venue}. {write_alias_name}')
        latest_index_name = [k for k in write_alias_name.keys()][0]
+       return latest_index_name
+
+   def get_latest_index(self, tenant, tenant_venue):
+       latest_index_name = self.get_latest_index_name(tenant, tenant_venue)
        index_mapping = self.__es.get_index_mapping(latest_index_name)
        if index_mapping is None:
            raise ValueError(f'missing index: {latest_index_name}')
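The refactor above splits the name lookup out of get_latest_index so other call sites can reuse it. For reference, a minimal sketch of the data that lookup handles, assuming get_alias returns the usual OpenSearch alias mapping keyed by the concrete backing index (all names below are hypothetical):

    # Hypothetical result of self.__es.get_alias('<granules_write_alias_prefix>_mytenant_myvenue_write'):
    alias_result = {
        'uds_granules_mytenant_myvenue__v02': {
            'aliases': {'uds_granules_mytenant_myvenue_write': {}}
        }
    }
    # A write alias is expected to point at exactly one backing index;
    # that index's name is the "latest" index name.
    if len(alias_result) != 1:
        raise ValueError(f'missing index for mytenant_myvenue. {alias_result}')
    latest_index_name = list(alias_result.keys())[0]
    print(latest_index_name)  # uds_granules_mytenant_myvenue__v02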
@@ -201,37 +205,87 @@ def get_entry(self, tenant: str, tenant_venue: str, doc_id: str, ):
raise ValueError(f"no such granule: {doc_id}")
return result

def delete_entry(self, tenant: str, tenant_venue: str, doc_id: str, ):
def __query_by_id_local(self, tenant: str, tenant_venue: str, doc_id: str, ):
read_alias_name = f'{DBConstants.granules_read_alias_prefix}_{tenant}_{tenant_venue}'.lower().strip()
result = self.__es.query({
dsl = {
'size': 9999,
'query': {'term': {'_id': doc_id}}
}, read_alias_name)
if result is None:
raise ValueError(f"no such granule: {doc_id}")
for each_granule in result['hits']['hits']:
'sort': [
{'properties.datetime': {'order': 'desc'}},
{'id': {'order': 'asc'}}
],
'query': {
'term': {'_id': doc_id}
}
}
result = self.__es.query(dsl, read_alias_name)
if result is None or len(result['hits']['hits']) < 1:
return []
return result['hits']['hits']

def __delete_old_entries(self, dsl_result):
for each_granule in dsl_result:
LOGGER.debug(f"deleting {each_granule['_id']} from {each_granule['_index']}")
delete_result = self.__es.delete_by_query({
'query': {'term': {'id': each_granule['_id']}}
}, each_granule['_index'])
LOGGER.debug(f'delete_result: {delete_result}')
if delete_result is None:
raise ValueError(f"error deleting {each_granule}")
return

def delete_entry(self, tenant: str, tenant_venue: str, doc_id: str, ):
result = self.__query_by_id_local(tenant, tenant_venue, doc_id)
if len(result) < 1:
raise ValueError(f"no such granule: {doc_id}")
self.__delete_old_entries(result)
return result

def update_entry(self, tenant: str, tenant_venue: str, json_body: dict, doc_id: str, ):
# find existing doc_id
# if not found, throw error. Cannot update
# if found, check index.
# if latest index, proceed with update
# if older index, proceed with get + delete
# tweak meta locally, and add it.
write_alias_name = f'{DBConstants.granules_write_alias_prefix}_{tenant}_{tenant_venue}'.lower().strip()
json_body['event_time'] = TimeUtils.get_current_unix_milli()
self.__es.update_one(json_body, doc_id, index=write_alias_name) # TODO assuming granule_id is prefixed with collection id
LOGGER.debug(f'custom_metadata indexed')
existing_entries = self.__query_by_id_local(tenant, tenant_venue, doc_id)
if len(existing_entries) < 1:
raise ValueError(f'unable to update {doc_id} as it is not found. ')
latest_index_name = self.get_latest_index_name(tenant, tenant_venue)
existing_entry = existing_entries[0]
if existing_entry['_index'] == latest_index_name:
LOGGER.debug(f'{doc_id} in latest index: {latest_index_name}. continuing with update')
self.__es.update_one(json_body, doc_id, index=write_alias_name) # TODO assuming granule_id is prefixed with collection id
self.__delete_old_entries(existing_entries[1:])
return
LOGGER.debug(f'{doc_id} in older index: {latest_index_name} v. {existing_entry["_index"]}')
new_doc = {**existing_entry['_source'], **json_body}
self.__es.index_one(new_doc, doc_id, index=write_alias_name) # TODO assuming granule_id is prefixed with collection id
self.__delete_old_entries(existing_entries)
return

def add_entry(self, tenant: str, tenant_venue: str, json_body: dict, doc_id: str, ):
# find existing doc_id
# if not found, add it
# if found, and it is in latest index, add it.
# if found, and it is in older index, add current one, and delete the older one.

write_alias_name = f'{DBConstants.granules_write_alias_prefix}_{tenant}_{tenant_venue}'.lower().strip()
json_body['event_time'] = TimeUtils.get_current_unix_milli()
# TODO validate custom metadata vs the latest index to filter extra items
existing_entries = self.__query_by_id_local(tenant, tenant_venue, doc_id)
if len(existing_entries) < 1:
self.__es.index_one(json_body, doc_id, index=write_alias_name) # TODO assuming granule_id is prefixed with collection id
return
latest_index_name = self.get_latest_index_name(tenant, tenant_venue)
existing_entry = existing_entries[0]
if existing_entry['_index'] == latest_index_name:
self.__es.index_one(json_body, doc_id, index=write_alias_name) # TODO assuming granule_id is prefixed with collection id
self.__delete_old_entries(existing_entries[1:])
return
self.__es.index_one(json_body, doc_id, index=write_alias_name) # TODO assuming granule_id is prefixed with collection id
LOGGER.debug(f'custom_metadata indexed')
self.__delete_old_entries(existing_entries)
# TODO validate custom metadata vs the latest index to filter extra items
return

def dsl_search(self, tenant: str, tenant_venue: str, search_dsl: dict):
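Taken together, the new flow in this file resolves every copy of a document ID across index versions (newest first, sorted on properties.datetime), writes through the write alias so data always lands in the latest index, and prunes stale copies from older indices. A minimal usage sketch with hypothetical endpoint, tenant, and granule values (see the test below for a complete walkthrough; GranulesDbIndex is assumed to pick up ES_URL / ES_PORT from the environment, as that test does):

    import os

    from cumulus_lambda_functions.lib.uds_db.granules_db_index import GranulesDbIndex

    # Hypothetical endpoint; the integration test below points at a sandbox cluster.
    os.environ['ES_URL'] = 'https://my-opensearch.example.com'
    os.environ['ES_PORT'] = '443'

    granules_db_index = GranulesDbIndex()
    granule_id = 'URN:NASA:UNITY:MY_TENANT:MY_VENUE:MY_COLLECTION___001:granule01'
    stac_feature = {'type': 'Feature', 'id': granule_id, 'properties': {}}  # placeholder document

    # add_entry indexes into the latest index via the write alias and deletes
    # any older copies of the same ID.
    granules_db_index.add_entry('MY_TENANT', 'MY_VENUE', stac_feature, granule_id)

    # update_entry raises ValueError for an unknown ID; if the existing copy
    # lives in an older index, the update is merged locally, re-indexed into
    # the latest index, and the stale copy is deleted.
    granules_db_index.update_entry('MY_TENANT', 'MY_VENUE', {'archive_status': 'cnm_s_successful'}, granule_id)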
259 changes: 258 additions & 1 deletion tests/cumulus_lambda_functions/lib/uds_db/test_granules_db_index.py
@@ -1,7 +1,13 @@
  import json
  import os
+ from copy import deepcopy
+ from time import sleep
  from unittest import TestCase

+ from cumulus_lambda_functions.lib.uds_db.db_constants import DBConstants
+ from mdps_ds_lib.lib.aws.es_abstract import ESAbstract
+ from mdps_ds_lib.lib.aws.es_factory import ESFactory
+
  from cumulus_lambda_functions.lib.uds_db.granules_db_index import GranulesDbIndex


Expand Down Expand Up @@ -50,4 +56,255 @@ def test_02(self):
        query_result = granules_index.dsl_search(self.tenant, self.tenant_venue, search_dsl)
        print(json.dumps(query_result, indent=4))
        # self.assertEqual(custom_metadata, expected)
-       return
+       return
+
+   def test_complete(self):
+       """
+       Steps:
+       1. Create index 1
+       2. Add doc 1
+       3. Add doc 2
+       4. Create index 2
+       5. Add doc 3
+       6. Update doc 1
+       7. Make sure doc 1 is gone from index 1
+       8. Update doc 3
+       9. Make sure doc 3 is updated in index 2
+       10. Create index 3
+       11. Update doc 4
+       12. It should throw an error.
+       13. Update doc 2. Make sure doc 2 is removed from index 1.
+       14. Update doc 3. Make sure doc 3 is removed from index 2.
+       :return:
+       """
+       os.environ['ES_URL'] = 'https://vpc-uds-sbx-cumulus-es-qk73x5h47jwmela5nbwjte4yzq.us-west-2.es.amazonaws.com'
+       os.environ['ES_PORT'] = '9200'
+       granules_db_index = GranulesDbIndex()
+       es: ESAbstract = ESFactory().get_instance('AWS',
+                                                 index=DBConstants.collections_index,
+                                                 base_url=os.getenv('ES_URL'),
+                                                 port=int(os.getenv('ES_PORT', '443')))
+
+       self.tenant = 'UDS_LOCAL_UNIT_TEST'  # 'uds_local_test' # 'uds_sandbox'
+       self.tenant_venue = 'UNIT'  # 'DEV1' # 'dev'
+       self.collection_name = 'UDS_UNIT_TEST_1'
+       self.collection_version = '001'
+       collection_id = f'URN:NASA:UNITY:{self.tenant}:{self.tenant_venue}:{self.collection_name}___{self.collection_version}'
+       self.custom_metadata_body1 = {
+           'tag': {'type': 'keyword'},
+           'c_data1': {'type': 'long'},
+       }
+       self.custom_metadata_body2 = {
+           'tag': {'type': 'keyword'},
+           'c_data2': {'type': 'long'},
+       }
+       self.custom_metadata_body3 = {
+           'tag': {'type': 'keyword'},
+           'c_data3': {'type': 'long'},
+       }
+
+       granule_id1 = f'{collection_id}:test_file01'
+       granule_id2 = f'{collection_id}:test_file02'
+       granule_id3 = f'{collection_id}:test_file03'
+       granule_id4 = f'{collection_id}:test_file04'
+
+       mock_feature1 = {
+           "type": "Feature",
+           "stac_version": "1.0.0",
+           "id": "URN:NASA:UNITY:UDS_LOCAL_TEST:DEV:KKK-09___001:test_file09",
+           "properties": {
+               "datetime": "2024-11-26T23:37:15.288000Z",
+               "start_datetime": "2016-01-31T18:00:00.009000Z",
+               "end_datetime": "2016-01-31T19:59:59.991000Z",
+               "created": "1970-01-01T00:00:00Z",
+               "updated": "2024-11-26T23:38:01.692000Z",
+               "status": "completed",
+               "provider": "unity",
+           },
+           "geometry": {
+               "type": "Point",
+               "coordinates": [
+                   0.0,
+                   0.0
+               ]
+           },
+           "links": [
+               {
+                   "rel": "collection",
+                   "href": "."
+               }
+           ],
+           "assets": {
+               "test_file09.nc": {
+                   "href": "s3://uds-sbx-cumulus-staging/URN:NASA:UNITY:UDS_LOCAL_TEST:DEV:KKK-09___001/URN:NASA:UNITY:UDS_LOCAL_TEST:DEV:KKK-09___001:test_file09/test_file09.nc",
+                   "title": "test_file09.nc",
+                   "description": "size=0;checksumType=md5;checksum=00000000000000000000000000000000;",
+                   "file:size": 0,
+                   "file:checksum": "00000000000000000000000000000000",
+                   "roles": [
+                       "data"
+                   ]
+               },
+               "test_file09.nc.cas": {
+                   "href": "s3://uds-sbx-cumulus-staging/URN:NASA:UNITY:UDS_LOCAL_TEST:DEV:KKK-09___001/URN:NASA:UNITY:UDS_LOCAL_TEST:DEV:KKK-09___001:test_file09/test_file09.nc.cas",
+                   "title": "test_file09.nc.cas",
+                   "description": "size=0;checksumType=md5;checksum=00000000000000000000000000000000;",
+                   "file:size": 0,
+                   "file:checksum": "00000000000000000000000000000000",
+                   "roles": [
+                       "metadata"
+                   ]
+               },
+               "test_file09.nc.stac.json": {
+                   "href": "s3://uds-sbx-cumulus-staging/URN:NASA:UNITY:UDS_LOCAL_TEST:DEV:KKK-09___001/URN:NASA:UNITY:UDS_LOCAL_TEST:DEV:KKK-09___001:test_file09/test_file09.nc.stac.json",
+                   "title": "test_file09.nc.stac.json",
+                   "description": "size=0;checksumType=md5;checksum=00000000000000000000000000000000;",
+                   "file:size": 0,
+                   "file:checksum": "00000000000000000000000000000000",
+                   "roles": [
+                       "metadata"
+                   ]
+               },
+               "test_file09.cmr.xml": {
+                   "href": "s3://uds-sbx-cumulus-staging/URN:NASA:UNITY:UDS_LOCAL_TEST:DEV:KKK-09___001/URN:NASA:UNITY:UDS_LOCAL_TEST:DEV:KKK-09___001:test_file09/test_file09.cmr.xml",
+                   "title": "test_file09.cmr.xml",
+                   "description": "size=1716;checksumType=md5;checksum=f842ba4e23e76ae81014a01c820b01f7;",
+                   "file:size": 1716,
+                   "file:checksum": "f842ba4e23e76ae81014a01c820b01f7",
+                   "roles": [
+                       "metadata"
+                   ]
+               }
+           },
+           "bbox": {
+               "type": "envelope",
+               "coordinates": [
+                   [
+                       -180.0,
+                       90.0
+                   ],
+                   [
+                       180.0,
+                       -90.0
+                   ]
+               ]
+           },
+           "stac_extensions": [
+               "https://stac-extensions.github.io/file/v2.1.0/schema.json"
+           ],
+           "collection": "URN:NASA:UNITY:UDS_LOCAL_TEST:DEV:KKK-09___001",
+           "event_time": 1732664287722
+       }
+       mock_feature2 = deepcopy(mock_feature1)
+       mock_feature3 = deepcopy(mock_feature1)
+       mock_feature4 = deepcopy(mock_feature1)
+       mock_feature1['id'] = granule_id1
+       mock_feature2['id'] = granule_id2
+       mock_feature3['id'] = granule_id3
+       mock_feature4['id'] = granule_id4
+
+       new_index_name1 = f'{DBConstants.granules_index_prefix}_{self.tenant}_{self.tenant_venue}__v01'.lower().strip()
+       new_index_name2 = f'{DBConstants.granules_index_prefix}_{self.tenant}_{self.tenant_venue}__v02'.lower().strip()
+       new_index_name3 = f'{DBConstants.granules_index_prefix}_{self.tenant}_{self.tenant_venue}__v03'.lower().strip()
+
+       if es.has_index(new_index_name1):
+           es.delete_index(new_index_name1)
+           es.delete_index(f'{DBConstants.granules_index_prefix}_{self.tenant}_{self.tenant_venue}_perc__v01'.lower().strip())
+       if es.has_index(new_index_name2):
+           es.delete_index(new_index_name2)
+           es.delete_index(f'{DBConstants.granules_index_prefix}_{self.tenant}_{self.tenant_venue}_perc__v02'.lower().strip())
+       if es.has_index(new_index_name3):
+           es.delete_index(new_index_name3)
+           es.delete_index(f'{DBConstants.granules_index_prefix}_{self.tenant}_{self.tenant_venue}_perc__v03'.lower().strip())
+
+       #### index v1 ####
+       granules_db_index.create_new_index(self.tenant, self.tenant_venue, self.custom_metadata_body1)
+       sleep(2)
+       self.assertTrue(es.has_index(new_index_name1), f'missing {new_index_name1}')
+
+       granules_db_index.add_entry(self.tenant, self.tenant_venue, mock_feature1, granule_id1)
+       sleep(2)
+       check_result = es.query_by_id(granule_id1, new_index_name1)
+       self.assertTrue(check_result is not None, f'granule_id1 - new_index_name1 {check_result}')
+
+       granules_db_index.add_entry(self.tenant, self.tenant_venue, mock_feature2, granule_id2)
+       sleep(2)
+       check_result = es.query_by_id(granule_id2, new_index_name1)
+       self.assertTrue(check_result is not None, f'granule_id2 - new_index_name1 {check_result}')
+
+       #### index v2 ####
+       granules_db_index.create_new_index(self.tenant, self.tenant_venue, self.custom_metadata_body2)
+       sleep(2)
+       self.assertTrue(es.has_index(new_index_name2), f'missing {new_index_name2}')
+
+       granules_db_index.add_entry(self.tenant, self.tenant_venue, mock_feature3, granule_id3)
+       sleep(2)
+       check_result = es.query_by_id(granule_id3, new_index_name2)
+       self.assertTrue(check_result is not None, f'granule_id3 - new_index_name2 {check_result}')
+
+       check_result = es.query_by_id(granule_id3, new_index_name1)
+       self.assertTrue(check_result is None, f'granule_id3 - new_index_name1 is not None: {check_result}')
+
+       granules_db_index.update_entry(self.tenant, self.tenant_venue, {'archive_status': 'cnm_s_failed'}, granule_id1)
+       sleep(2)
+       check_result = es.query_by_id(granule_id1, new_index_name2)
+       self.assertTrue(check_result is not None, f'granule_id1 - new_index_name2 {check_result}')
+
+       check_result = es.query_by_id(granule_id1, new_index_name1)
+       self.assertTrue(check_result is None, f'granule_id1 - new_index_name1 is not None: {check_result}')
+
+       granules_db_index.update_entry(self.tenant, self.tenant_venue, {'archive_status': 'cnm_s_successful'}, granule_id3)
+       sleep(2)
+       check_result = es.query_by_id(granule_id3, new_index_name2)
+       self.assertTrue(check_result is not None, f'granule_id3 - new_index_name2 {check_result}')
+
+       check_result = es.query_by_id(granule_id3, new_index_name1)
+       self.assertTrue(check_result is None, f'granule_id3 - new_index_name1 is not None: {check_result}')
+
+       #### index v3 ####
+       granules_db_index.create_new_index(self.tenant, self.tenant_venue, self.custom_metadata_body3)
+       sleep(2)
+       self.assertTrue(es.has_index(new_index_name3), f'missing {new_index_name3}')
+
+       with self.assertRaises(ValueError) as context:
+           granules_db_index.update_entry(self.tenant, self.tenant_venue, {'archive_status': 'cnm_s_failed'}, granule_id4)
+       sleep(2)
+       self.assertTrue(str(context.exception).startswith('unable to update'))
+       # TODO check error
+
+       granules_db_index.update_entry(self.tenant, self.tenant_venue, {'archive_status': 'cnm_r_failed'}, granule_id2)
+       sleep(2)
+       check_result = es.query_by_id(granule_id2, new_index_name3)
+       self.assertTrue(check_result is not None, f'granule_id2 - new_index_name3 {check_result}')
+
+       check_result = es.query_by_id(granule_id2, new_index_name1)
+       self.assertTrue(check_result is None, f'granule_id2 - new_index_name1 is not None: {check_result}')
+
+       check_result = es.query_by_id(granule_id2, new_index_name2)
+       self.assertTrue(check_result is None, f'granule_id2 - new_index_name2 is not None: {check_result}')
+
+       granules_db_index.update_entry(self.tenant, self.tenant_venue, {'archive_status': 'cnm_s_failed'}, granule_id3)
+       sleep(2)
+       check_result = es.query_by_id(granule_id3, new_index_name3)
+       self.assertTrue(check_result is not None, f'granule_id3 - new_index_name3 {check_result}')
+
+       check_result = es.query_by_id(granule_id3, new_index_name1)
+       self.assertTrue(check_result is None, f'granule_id3 - new_index_name1 is not None: {check_result}')
+
+       check_result = es.query_by_id(granule_id3, new_index_name2)
+       self.assertTrue(check_result is None, f'granule_id3 - new_index_name2 is not None: {check_result}')
+
+       if es.has_index(new_index_name1):
+           es.delete_index(new_index_name1)
+           es.delete_index(f'{DBConstants.granules_index_prefix}_{self.tenant}_{self.tenant_venue}_perc__v01'.lower().strip())
+       if es.has_index(new_index_name2):
+           es.delete_index(new_index_name2)
+           es.delete_index(f'{DBConstants.granules_index_prefix}_{self.tenant}_{self.tenant_venue}_perc__v02'.lower().strip())
+       if es.has_index(new_index_name3):
+           es.delete_index(new_index_name3)
+           es.delete_index(f'{DBConstants.granules_index_prefix}_{self.tenant}_{self.tenant_venue}_perc__v03'.lower().strip())
+
+       return
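The setup and teardown above follow the versioned-index naming convention used by GranulesDbIndex: each custom-metadata version gets a granules index plus a paired percolator index. A sketch of the convention (the prefix string is illustrative; the real value comes from DBConstants.granules_index_prefix):

    # Illustrative only: mirrors the name construction used in the test above.
    def versioned_index_names(prefix: str, tenant: str, tenant_venue: str, version: int) -> dict:
        base = f'{prefix}_{tenant}_{tenant_venue}'.lower().strip()
        return {
            'granules': f'{base}__v{version:02d}',         # e.g. ..._uds_local_unit_test_unit__v01
            'percolator': f'{base}_perc__v{version:02d}',  # paired index for custom-metadata percolation
        }

    print(versioned_index_names('uds_granules', 'UDS_LOCAL_UNIT_TEST', 'UNIT', 1))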
