Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Clear Cumulus execution DB records #330

Open
wants to merge 6 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file.
49 changes: 49 additions & 0 deletions cumulus_lambda_functions/cleanup_executions/cumulus_db_index.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import os
from copy import deepcopy

from cumulus_lambda_functions.granules_to_es.granules_index_mapping import GranulesIndexMapping
from cumulus_lambda_functions.lib.time_utils import TimeUtils

from cumulus_lambda_functions.lib.lambda_logger_generator import LambdaLoggerGenerator

from cumulus_lambda_functions.lib.aws.es_abstract import ESAbstract

from cumulus_lambda_functions.lib.aws.es_factory import ESFactory

from cumulus_lambda_functions.lib.uds_db.db_constants import DBConstants
LOGGER = LambdaLoggerGenerator.get_logger(__name__, LambdaLoggerGenerator.get_level_from_env())


class CumulusDbIndex:
def __init__(self):
required_env = ['ES_URL']
if not all([k in os.environ for k in required_env]):
raise EnvironmentError(f'one or more missing env: {required_env}')
self.__es: ESAbstract = ESFactory().get_instance('AWS',
index=DBConstants.cumulus_alias, # TODO should this come from setting?
base_url=os.getenv('ES_URL'),
port=int(os.getenv('ES_PORT', '443'))
)

def delete_executions(self, cutoff_datetime):
delete_result = self.__es.delete_by_query({
"query": {
"bool": {
"must": [
{"term": {
"_type": {
"value": "execution"
}
}},
{
"range": {
"updatedAt": {
"lte": cutoff_datetime
}
}
}
]
}
}
})
return delete_result
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import os

from cumulus_lambda_functions.lib.lambda_logger_generator import LambdaLoggerGenerator

from cumulus_lambda_functions.lib.time_utils import TimeUtils

from cumulus_lambda_functions.cleanup_executions.cumulus_db_index import CumulusDbIndex
LOGGER = LambdaLoggerGenerator.get_logger(__name__, LambdaLoggerGenerator.get_level_from_env())


class ExecutionDocsCleaner:
def __init__(self, event):
self.__event = event

def start(self):
cumulus_db = CumulusDbIndex()
relative_time = int(os.environ.get('CUT_OFF_DAYS', 14))
cut_off_time = TimeUtils().get_current_unix_milli() - TimeUtils.DAY_IN_MILLISECOND * relative_time
result = cumulus_db.delete_executions(cut_off_time)
LOGGER.debug(f'deletion result for {cut_off_time}: {result}')
return
15 changes: 15 additions & 0 deletions cumulus_lambda_functions/cleanup_executions/lambda_function.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from cumulus_lambda_functions.cleanup_executions.execution_docs_cleaner import ExecutionDocsCleaner
from cumulus_lambda_functions.lib.lambda_logger_generator import LambdaLoggerGenerator


def lambda_handler(event, context):
"""
{'cma': {'task_config': {'bucket': '{$.meta.buckets.internal.name}', 'collection': '{$.meta.collection}', 'cumulus_message': {'outputs': [{'source': '{$.files}', 'destination': '{$.payload}'}]}}, 'event': {'cumulus_meta': {'cumulus_version': '10.0.1', 'execution_name': 'c6d885dc-b4b2-4eb0-b22e-b6f58a7a0870', 'message_source': 'sfn', 'queueExecutionLimits': {'https://sqs.us-west-2.amazonaws.com/884500545225/am-uds-dev-cumulus-backgroundProcessing': 5}, 'state_machine': 'arn:aws:states:us-west-2:884500545225:stateMachine:am-uds-dev-cumulus-IngestGranule', 'system_bucket': 'am-uds-dev-cumulus-internal', 'workflow_start_time': 1646785175509, 'parentExecutionArn': 'arn:aws:states:us-west-2:884500545225:execution:am-uds-dev-cumulus-DiscoverGranules:885483b4-ba55-4db1-b197-661e1e595a45', 'queueUrl': 'arn:aws:sqs:us-west-2:884500545225:am-uds-dev-cumulus-startSF'}, 'exception': 'None', 'meta': {'buckets': {'internal': {'name': 'am-uds-dev-cumulus-internal', 'type': 'internal'}, 'protected': {'name': 'am-uds-dev-cumulus-protected', 'type': 'protected'}}, 'cmr': {'clientId': 'CHANGEME', 'cmrEnvironment': 'UAT', 'cmrLimit': 100, 'cmrPageSize': 50, 'oauthProvider': 'earthdata', 'passwordSecretName': 'am-uds-dev-cumulus-message-template-cmr-password20220216072916956000000002', 'provider': 'CHANGEME', 'username': 'username'}, 'collection': {'name': 'ATMS_SCIENCE_Group_2011', 'version': '001', 'process': 'modis', 'granuleId': '^P[0-9]{3}[0-9]{4}[A-Z]{13}T[0-9]{12}0$', 'granuleIdExtraction': '(P[0-9]{3}[0-9]{4}[A-Z]{13}T[0-9]{12}0).+', 'sampleFileName': 'P1570515ATMSSCIENCEAXT11344000000001.PDS', 'duplicateHandling': 'replace', 'url_path': '{cmrMetadata.Granule.Collection.ShortName}___{cmrMetadata.Granule.Collection.VersionId}', 'provider_path': '/data/SNPP_ATMS_Level0_T/ATMS_SCIENCE_Group/2011/', 'files': [{'bucket': 'internal', 'regex': '^P[0-9]{3}[0-9]{4}[A-Z]{13}T[0-9]{12}00\\.PDS$', 'sampleFileName': 'P1570515ATMSSCIENCEAXT11344000000000.PDS', 'type': 'data'}, {'bucket': 'internal', 'regex': '^P[0-9]{3}[0-9]{4}[A-Z]{13}T[0-9]{12}01\\.PDS$', 'sampleFileName': 'P1570515ATMSSCIENCEAXT11344000000001.PDS', 'type': 'metadata'}, {'bucket': 'internal', 'regex': '^P[0-9]{3}[0-9]{4}[A-Z]{13}T[0-9]{12}01\\.PDS\\.xml$', 'sampleFileName': 'P1570515ATMSSCIENCEAXT11344000000001.PDS.xml', 'type': 'metadata'}, {'bucket': 'internal', 'regex': '^P[0-9]{3}[0-9]{4}[A-Z]{13}T[0-9]{12}0\\.cmr\\.xml$', 'sampleFileName': 'P1570515ATMSSCIENCEAXT11344000000001.PDS.xml', 'type': 'metadata'}], 'updatedAt': 1646326197526, 'createdAt': 1646258167624}, 'distribution_endpoint': 's3://am-uds-dev-cumulus-internal/', 'launchpad': {'api': 'launchpadApi', 'certificate': 'launchpad.pfx', 'passphraseSecretName': ''}, 'provider': {'password': 'AQICAHhSagsGDAl5tQWM010IEvxKgj2LcsNub5v5FHoRpOjXcQHFbE4iMnF/W0Y/NrsYvrfHAAAAajBoBgkqhkiG9w0BBwagWzBZAgEAMFQGCSqGSIb3DQEHATAeBglghkgBZQMEAS4wEQQMLaH13SdxPXREjXLtAgEQgCfA+lEu2c/xLTGwJsbtKlXJbKDy4pwV+rS3BnJqgBoLLMQZqOdoFhk=', 'host': 'snppl0.gesdisc.eosdis.nasa.gov', 'updatedAt': 1646244053419, 'protocol': 'https', 'createdAt': 1646244053419, 'encrypted': True, 'username': 'AQICAHhSagsGDAl5tQWM010IEvxKgj2LcsNub5v5FHoRpOjXcQGRoY5EBMpvvyMASUowBM61AAAAYzBhBgkqhkiG9w0BBwagVDBSAgEAME0GCSqGSIb3DQEHATAeBglghkgBZQMEAS4wEQQM9OhRHwTuxiz74q4UAgEQgCDEHOhsVG6+LqXfnlw+Z3Wg9MDOCd9/K5/X5j3tPJYkaA==', 'allowedRedirects': ['https://urs.earthdata.nasa.gov', 'urs.earthdata.nasa.gov'], 'id': 'snpp_provider_02', 'globalConnectionLimit': 10}, 'stack': 'am-uds-dev-cumulus', 'template': 's3://am-uds-dev-cumulus-internal/am-uds-dev-cumulus/workflow_template.json', 'workflow_name': 'IngestGranule', 'workflow_tasks': {'SyncGranule': {'name': 'am-uds-dev-cumulus-SyncGranule', 'version': '$LATEST', 'arn': 'arn:aws:lambda:us-west-2:884500545225:function:am-uds-dev-cumulus-SyncGranule'}}, 'staticValue': 'aStaticValue', 'interpolatedValueStackName': 'am-uds-dev-cumulus', 'input_granules': [{'granuleId': 'P1570515ATMSSCIENCEAXT1134912000000', 'dataType': 'ATMS_SCIENCE_Group_2011', 'version': '001', 'files': [{'bucket': 'am-uds-dev-cumulus-internal', 'key': 'file-staging/am-uds-dev-cumulus/ATMS_SCIENCE_Group_2011___001/P1570515ATMSSCIENCEAXT11349120000000.PDS', 'source': 'data/SNPP_ATMS_Level0_T/ATMS_SCIENCE_Group/2011/349//P1570515ATMSSCIENCEAXT11349120000000.PDS', 'fileName': 'P1570515ATMSSCIENCEAXT11349120000000.PDS', 'type': 'data', 'size': 744}, {'bucket': 'am-uds-dev-cumulus-internal', 'key': 'file-staging/am-uds-dev-cumulus/ATMS_SCIENCE_Group_2011___001/P1570515ATMSSCIENCEAXT11349120000001.PDS', 'source': 'data/SNPP_ATMS_Level0_T/ATMS_SCIENCE_Group/2011/349//P1570515ATMSSCIENCEAXT11349120000001.PDS', 'fileName': 'P1570515ATMSSCIENCEAXT11349120000001.PDS', 'type': 'metadata', 'size': 18084600}, {'bucket': 'am-uds-dev-cumulus-internal', 'key': 'file-staging/am-uds-dev-cumulus/ATMS_SCIENCE_Group_2011___001/P1570515ATMSSCIENCEAXT11349120000001.PDS.xml', 'source': 'data/SNPP_ATMS_Level0_T/ATMS_SCIENCE_Group/2011/349//P1570515ATMSSCIENCEAXT11349120000001.PDS.xml', 'fileName': 'P1570515ATMSSCIENCEAXT11349120000001.PDS.xml', 'type': 'metadata', 'size': 9526}], 'sync_granule_duration': 9822, 'createdAt': 1647386972717}], 'process': 'modis'}, 'payload': {}, 'replace': {'Bucket': 'am-uds-dev-cumulus-internal', 'Key': 'events/5d8edf37-0a18-4af5-a76f-7c2091cdd1e2', 'TargetPath': '$.payload'}}}}
:param event:
:param context:
:return:
"""
LambdaLoggerGenerator.remove_default_handlers()
# TODO implement
ExecutionDocsCleaner(event).start()
return {}
1 change: 1 addition & 0 deletions cumulus_lambda_functions/lib/time_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ class TimeUtils:
MMDD_FORMAT = '%Y-%m-%dT%H:%M:%S'
GB_1 = 1000000000
YR_IN_SECOND = 31536000
DAY_IN_MILLISECOND = 24 * 60 * 60 * 1000

def __init__(self):
self.__time_obj = datetime.utcnow()
Expand Down
1 change: 1 addition & 0 deletions cumulus_lambda_functions/lib/uds_db/db_constants.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
class DBConstants:
cumulus_alias = 'cumulus-alias'
collections_index = 'unity_collections'
collection_id = 'collection_id'

Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import os
from unittest import TestCase

from cumulus_lambda_functions.lib.time_utils import TimeUtils

from cumulus_lambda_functions.cleanup_executions.cumulus_db_index import CumulusDbIndex


class TestCumulusDbIndex(TestCase):
def test_01(self):
os.environ['ES_URL'] = 'https://vpc-uds-dev-cumulus-es-vpc-hjnnwrivoe36fiak4eomtgqlvq.us-west-2.es.amazonaws.com'
os.environ['ES_PORT'] = '9200'
cumulus_db = CumulusDbIndex()
time1 = int(TimeUtils().get_unix_from_timestamp('2024-001T00:00:00.000') * 1000)
result = cumulus_db.delete_executions(time1)
print(result)
self.assertTrue('deleted' in result, f'wrong response: {result}')
return
2 changes: 2 additions & 0 deletions tests/integration_tests/test_custom_metadata_end_to_end.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ def test_04_upload_sample_granule(self):

with tempfile.TemporaryDirectory() as tmp_dir_name:
os.environ['OUTPUT_FILE'] = os.path.join(tmp_dir_name, 'some_output', 'output.json')
os.environ['OUTPUT_DIRECTORY'] = os.path.join(tmp_dir_name, 'output_dir')
FileUtils.mk_dir_p(os.environ.get('OUTPUT_DIRECTORY'))
os.environ['UPLOAD_DIR'] = '' # not needed
os.environ['CATALOG_FILE'] = os.path.join(tmp_dir_name, 'catalog.json')
granules_dir = os.path.join(tmp_dir_name, 'some_granules')
Expand Down
5 changes: 4 additions & 1 deletion tf-module/unity-cumulus/api_gateway.tf
Original file line number Diff line number Diff line change
Expand Up @@ -127,5 +127,8 @@ resource "aws_api_gateway_deployment" "shared_services_api_gateway_deployment" {
create_before_destroy = true
}

depends_on = [ aws_api_gateway_integration.openapi_lambda_integration, aws_api_gateway_integration.collection_lambda_integration ]
depends_on = [
aws_api_gateway_integration.openapi_lambda_integration,
aws_api_gateway_integration.collection_lambda_integration
]
}
20 changes: 20 additions & 0 deletions tf-module/unity-cumulus/event_rule.tf
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/cloudwatch_event_rule
resource "aws_cloudwatch_event_rule" "cumulus_executions_remover_rule" {
name = "${var.prefix}-cumulus_executions_remover_rule"
description = "${var.prefix}-cumulus_executions_remover_rule"
schedule_expression = "cron(0 20 * * ? *)"
}

resource "aws_cloudwatch_event_target" "cumulus_executions_remover_target" {
target_id = "${var.prefix}-cumulus_executions_remover_target"
rule = aws_cloudwatch_event_rule.cumulus_executions_remover_rule.name
arn = aws_lambda_function.cleanup_executions.arn
}

resource "aws_lambda_permission" "cumulus_executions_remover_permission" {
statement_id = "${var.prefix}-cumulus_executions_remover_permission"
action = "lambda:InvokeFunction"
function_name = aws_lambda_function.cleanup_executions.function_name
principal = "events.amazonaws.com"
source_arn = aws_cloudwatch_event_rule.cumulus_executions_remover_rule.arn
}
23 changes: 23 additions & 0 deletions tf-module/unity-cumulus/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,29 @@ resource "aws_lambda_function" "granules_to_es" {
tags = var.tags
}

resource "aws_lambda_function" "cleanup_executions" {
filename = local.lambda_file_name
function_name = "${var.prefix}-cleanup_executions"
role = var.lambda_processing_role_arn
handler = "cumulus_lambda_functions.cleanup_executions.lambda_function.lambda_handler"
runtime = "python3.9"
timeout = 300
environment {
variables = {
LOG_LEVEL = var.log_level
ES_URL = data.aws_elasticsearch_domain.cumulus_es.endpoint
ES_PORT = 443
CUT_OFF_DAYS = var.cumulus_execution_cut_off_days
}
}

vpc_config {
subnet_ids = var.cumulus_lambda_subnet_ids
security_group_ids = local.security_group_ids_set ? var.security_group_ids : [aws_security_group.unity_cumulus_lambda_sg[0].id]
}
tags = var.tags
}

resource "aws_lambda_function" "uds_api_1" {
filename = local.lambda_file_name
function_name = "${var.prefix}-uds_api_1"
Expand Down
4 changes: 4 additions & 0 deletions tf-module/unity-cumulus/opensearch.tf
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,8 @@ resource "aws_elasticsearch_domain" "uds-es" {
}
)
tags = var.tags
}

data "aws_elasticsearch_domain" "cumulus_es" {
domain_name = var.cumulus_es_domain
}
11 changes: 11 additions & 0 deletions tf-module/unity-cumulus/variables.tf
Original file line number Diff line number Diff line change
Expand Up @@ -135,4 +135,15 @@ variable "rest_api_stage" {
variable "unity_cognito_authorizer__authorizer_id" {
type = string
description = "Example: 0h9egs"
}

variable "cumulus_es_domain" {
type = string
description = "Domain name of Cumulus ES Domain"
}

variable "cumulus_execution_cut_off_days" {
type = number
default = 14
description = "how many days of history to keep. anything older will be deleted"
}
Loading