diff --git a/cumulus_lambda_functions/granules_cnm_ingester/__init__.py b/cumulus_lambda_functions/granules_cnm_ingester/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/cumulus_lambda_functions/granules_cnm_ingester/lambda_function.py b/cumulus_lambda_functions/granules_cnm_ingester/lambda_function.py new file mode 100644 index 00000000..626431a1 --- /dev/null +++ b/cumulus_lambda_functions/granules_cnm_ingester/lambda_function.py @@ -0,0 +1,13 @@ +import json +from cumulus_lambda_functions.lib.lambda_logger_generator import LambdaLoggerGenerator + + +def lambda_handler(event, context): + """ + :param event: + :param context: + :return: + """ + LambdaLoggerGenerator.remove_default_handlers() + print(f'event: {event}') + raise NotImplementedError('Require implementation later') diff --git a/tf-module/unity-cumulus/granules_cnm_ingester.tf b/tf-module/unity-cumulus/granules_cnm_ingester.tf new file mode 100644 index 00000000..1d45a863 --- /dev/null +++ b/tf-module/unity-cumulus/granules_cnm_ingester.tf @@ -0,0 +1,96 @@ +resource "aws_lambda_function" "granules_cnm_ingester" { + filename = local.lambda_file_name + source_code_hash = filebase64sha256(local.lambda_file_name) + function_name = "${var.prefix}-granules_cnm_ingester" + role = var.lambda_processing_role_arn + handler = "cumulus_lambda_functions.granules_cnm_ingester.lambda_function.lambda_handler" + runtime = "python3.9" + timeout = 300 + reserved_concurrent_executions = var.granules_cnm_ingester__lambda_concurrency # TODO + environment { + variables = { + LOG_LEVEL = var.log_level + SNS_TOPIC_ARN = var.cnm_sns_topic_arn + } + } + + vpc_config { + subnet_ids = var.cumulus_lambda_subnet_ids + security_group_ids = local.security_group_ids_set ? var.security_group_ids : [aws_security_group.unity_cumulus_lambda_sg[0].id] + } + tags = var.tags +} + +resource "aws_sns_topic" "granules_cnm_ingester" { + name = "${var.prefix}-granules_cnm_ingester" + tags = var.tags +} + +resource "aws_sns_topic_policy" "granules_cnm_ingester_policy" { + arn = aws_sns_topic.granules_cnm_ingester.arn + policy = templatefile("${path.module}/sns_policy.json", { + region: var.aws_region, + accountId: local.account_id, + snsName: "${var.prefix}-granules_cnm_ingester", + s3Glob: var.granules_cnm_ingester__s3_glob + }) +} + +resource "aws_sqs_queue" "dead_letter_granules_cnm_ingester" { // https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/sqs_queue + // TODO how to notify admin for failed ingestion? + tags = var.tags + name = "${var.prefix}-dead_letter_granules_cnm_ingester" + delay_seconds = 0 + max_message_size = 262144 + message_retention_seconds = 345600 + visibility_timeout_seconds = 300 + receive_wait_time_seconds = 0 + policy = templatefile("${path.module}/sqs_policy.json", { + region: var.aws_region, + roleArn: var.lambda_processing_role_arn, + accountId: local.account_id, + sqsName: "${var.prefix}-dead_letter_granules_cnm_ingester", + }) +// redrive_policy = jsonencode({ +// deadLetterTargetArn = aws_sqs_queue.terraform_queue_deadletter.arn +// maxReceiveCount = 4 +// }) +// tags = { +// Environment = "production" +// } +} + +resource "aws_sqs_queue" "granules_cnm_ingester" { // https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/sqs_queue + name = "${var.prefix}-granules_cnm_ingester" + delay_seconds = 0 + max_message_size = 262144 + message_retention_seconds = 345600 + visibility_timeout_seconds = var.granules_cnm_ingester__sqs_visibility_timeout_seconds // Used as cool off time in seconds. It will wait for 5 min if it fails + receive_wait_time_seconds = 0 + policy = templatefile("${path.module}/sqs_policy.json", { + region: var.aws_region, + roleArn: var.lambda_processing_role_arn, + accountId: local.account_id, + sqsName: "${var.prefix}-granules_cnm_ingester", + }) + redrive_policy = jsonencode({ + deadLetterTargetArn = aws_sqs_queue.dead_letter_granules_cnm_ingester.arn + maxReceiveCount = var.granules_cnm_ingester__sqs_retried_count // How many times it will be retried. + }) + tags = var.tags +} + +resource "aws_sns_topic_subscription" "granules_cnm_ingester_topic_subscription" { // https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/sns_topic_subscription + topic_arn = aws_sns_topic.granules_cnm_ingester.arn + protocol = "sqs" + endpoint = aws_sqs_queue.granules_cnm_ingester.arn +# filter_policy_scope = "MessageBody" // MessageAttributes. not using attributes +# filter_policy = templatefile("${path.module}/ideas_api_job_results_filter_policy.json", {}) +} + +resource "aws_lambda_event_source_mapping" "granules_cnm_ingester_queue_lambda_trigger" { // https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/lambda_event_source_mapping#sqs + event_source_arn = aws_sqs_queue.granules_cnm_ingester.arn + function_name = aws_lambda_function.granules_cnm_ingester.arn + batch_size = 1 + enabled = true +} diff --git a/tf-module/unity-cumulus/main.tf b/tf-module/unity-cumulus/main.tf index 1f50b125..2a29591e 100644 --- a/tf-module/unity-cumulus/main.tf +++ b/tf-module/unity-cumulus/main.tf @@ -170,6 +170,7 @@ resource "aws_ssm_parameter" "uds_api_1" { name = "/unity/unity-ds/api-gateway/integrations/${var.prefix}-uds_api_1-function-name" type = "String" value = aws_lambda_function.uds_api_1.function_name + tags = var.tags } @@ -178,4 +179,5 @@ resource "aws_ssm_parameter" "uds_api_1" { name = "${var.health_check_base_path}/${var.health_check_marketplace_item}/${var.health_check_component_name}/url" type = "String" value = "${var.uds_base_url}/${var.dapa_api_prefix}/collections" + tags = var.tags } \ No newline at end of file diff --git a/tf-module/unity-cumulus/sns_policy.json b/tf-module/unity-cumulus/sns_policy.json new file mode 100644 index 00000000..638b7ebe --- /dev/null +++ b/tf-module/unity-cumulus/sns_policy.json @@ -0,0 +1,31 @@ +{ + "Version": "2008-10-17", + "Id": "__default_policy_ID", + "Statement": [ + { + "Effect": "Allow", + "Principal": { + "Service": ["s3.amazonaws.com"] + }, + "Action": [ + "SNS:GetTopicAttributes", + "SNS:SetTopicAttributes", + "SNS:AddPermission", + "SNS:RemovePermission", + "SNS:DeleteTopic", + "SNS:Subscribe", + "SNS:ListSubscriptionsByTopic", + "SNS:Publish" + ], + "Resource": "arn:aws:sns:${region}:${accountId}:${snsName}", + "Condition": { + "ArnLike": { + "aws:SourceArn": "arn:aws:s3:*:*:${s3Glob}" + }, + "StringEquals": { + "AWS:SourceAccount": "${accountId}" + } + } + } + ] +} \ No newline at end of file diff --git a/tf-module/unity-cumulus/sqs-sns.tf b/tf-module/unity-cumulus/sqs-sns.tf index e1adfa43..b516fdd2 100644 --- a/tf-module/unity-cumulus/sqs-sns.tf +++ b/tf-module/unity-cumulus/sqs-sns.tf @@ -22,6 +22,7 @@ resource "aws_sqs_queue" "granules_to_es_queue" { // https://registry.terraform // tags = { // Environment = "production" // } + tags = var.tags } resource "aws_sns_topic_subscription" "report_granules_topic_subscription" { // https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/sns_topic_subscription diff --git a/tf-module/unity-cumulus/sqs_policy.json b/tf-module/unity-cumulus/sqs_policy.json index f6bbacfb..9dbf3f9d 100644 --- a/tf-module/unity-cumulus/sqs_policy.json +++ b/tf-module/unity-cumulus/sqs_policy.json @@ -12,7 +12,7 @@ "sqs:SendMessage*" ], "Principal": { - "Service": "sns.amazonaws.com" + "Service": ["sns.amazonaws.com", "sqs.amazonaws.com"] }, "Resource": "arn:aws:sqs:${region}:${accountId}:${sqsName}" }, diff --git a/tf-module/unity-cumulus/variables.tf b/tf-module/unity-cumulus/variables.tf index cd1705ea..855db151 100644 --- a/tf-module/unity-cumulus/variables.tf +++ b/tf-module/unity-cumulus/variables.tf @@ -190,4 +190,41 @@ variable "health_check_base_path" { type = string default = "/unity/healthCheck" description = "base path for healthcheck which should start with, but not end with `/`" -} \ No newline at end of file +} + +// << Variables for granules_cnm_ingester >> +variable "granules_cnm_ingester__sqs_visibility_timeout_seconds" { + type = number + default = 300 + description = "when a lambda ends in error, how much sqs should wait till it is retried again. (in seconds). defaulted to 5 min" +} + +variable "granules_cnm_ingester__sqs_retried_count" { + type = number + default = 3 + description = "How many times it is retried before pushing it to DLQ. defaulted to 3 times" +} + +variable "granules_cnm_ingester__lambda_concurrency" { + type = number + default = 20 + description = "How many Lambdas can be executed for CNM ingester concurrently" +} + +variable "granules_cnm_ingester__bucket_notification_prefix" { + type = string + default = "stage_out" + description = "path to the directory where catalogs.json will be written" +} + +variable "granules_cnm_ingester__s3_glob" { + type = string + default = "*unity*" + description = "GLOB expression that has all s3 buckets connecting to SNS topic" +} +#variable "granules_cnm_ingester__is_deploying_bucket" { +# type = bool +# default = false +# description = "flag to specify if deploying example bucket" +#} +// << Variables for granules_cnm_ingester END >>