From e1cc166c6abcd89695493825bd086237f8429fa3 Mon Sep 17 00:00:00 2001 From: Drew Meyers Date: Mon, 1 Apr 2024 14:30:13 -0700 Subject: [PATCH 1/6] Implement event based triggering --- .gitignore | 4 + lambda/deployment_packages/.gitkeep | 0 .../airflow_dag_trigger.py | 48 +++ pyproject.toml | 11 + .../terraform-unity-sps-airflow/README.md | 19 ++ .../terraform-unity-sps-airflow/main.tf | 285 ++++++++++++++++++ 6 files changed, 367 insertions(+) create mode 100644 lambda/deployment_packages/.gitkeep create mode 100644 lambda/src/airflow-dag-trigger/airflow_dag_trigger.py diff --git a/.gitignore b/.gitignore index 571aeb03..670de40d 100644 --- a/.gitignore +++ b/.gitignore @@ -243,3 +243,7 @@ override.tf.json # Ignore CLI configuration files .terraformrc terraform.rc + + +/lambda/deployment_packages/* +!/lambda/deployment_packages/.gitkeep diff --git a/lambda/deployment_packages/.gitkeep b/lambda/deployment_packages/.gitkeep new file mode 100644 index 00000000..e69de29b diff --git a/lambda/src/airflow-dag-trigger/airflow_dag_trigger.py b/lambda/src/airflow-dag-trigger/airflow_dag_trigger.py new file mode 100644 index 00000000..122c8df1 --- /dev/null +++ b/lambda/src/airflow-dag-trigger/airflow_dag_trigger.py @@ -0,0 +1,48 @@ +import logging +import os +import uuid +from datetime import datetime +from typing import List +from urllib.parse import unquote_plus + +import requests +from aws_lambda_powertools.utilities.parser import envelopes, event_parser +from aws_lambda_powertools.utilities.parser.models import S3Model +from aws_lambda_powertools.utilities.typing import LambdaContext + +# Configure your logger +logger = logging.getLogger() +logger.setLevel(logging.INFO) + +AIRFLOW_BASE_API_ENDPOINT = os.environ.get("AIRFLOW_BASE_API_ENDPOINT") +AIRFLOW_USERNAME = os.environ.get("AIRFLOW_USERNAME") +AIRFLOW_PASSWORD = os.environ.get("AIRFLOW_PASSWORD") + + +def trigger_airflow_dag(dag_id: str): + url = f"{AIRFLOW_BASE_API_ENDPOINT}/dags/{dag_id}/dagRuns" + dag_run_id = str(uuid.uuid4()) + logical_date = datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%fZ") + headers = {"Content-Type": "application/json", "Accept": "application/json"} + auth = (AIRFLOW_USERNAME, AIRFLOW_PASSWORD) + payload = {"dag_run_id": dag_run_id, "logical_date": logical_date, "conf": {}, "note": ""} + response = requests.post(url, auth=auth, headers=headers, json=payload) + if response.status_code == 200 or response.status_code == 201: + logger.info(f"Successfully triggered Airflow DAG {dag_id}: {response.json()}") + else: + logger.error(f"Failed to trigger Airflow DAG {dag_id}: {response.text}") + + +@event_parser(model=S3Model, envelope=envelopes.SnsSqsEnvelope) +def lambda_handler(event: List[S3Model], context: LambdaContext) -> dict: + try: + object_key = unquote_plus(event[0].Records[0].s3.object.key) + bucket_name = unquote_plus(event[0].Records[0].s3.bucket.name) + logger.info(f"Source bucket: {bucket_name}, Source key: {object_key}") + dag_id = "cwl-dag" + trigger_airflow_dag(dag_id) + except Exception as e: + logger.error(f"An unexpected error occurred: {e}") + return {"statusCode": 500, "body": "An unexpected error occurred: " + str(e)} + + return {"statusCode": 200, "body": "Success!"} diff --git a/pyproject.toml b/pyproject.toml index 096a96a8..3fcf3849 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -43,6 +43,17 @@ test = [ "apache-airflow-providers-cncf-kubernetes==8.0.0" ] experiment = [] +lambda-airflow-dag-trigger = [ + "boto3<1.28.27", + "requests<=2.30.0", + "urllib3<1.27", + "jinja2==3.1.2", + "jsonschema==4.17.3", + "referencing==0.23.0", + "aws-lambda-powertools==2.22.0", + "pydantic>=1.8.2,<2.0.0", + "fastjsonschema>=2.14.5,<3.0.0", +] [tool.setuptools.packages.find] exclude = ["tests*"] diff --git a/terraform-unity/modules/terraform-unity-sps-airflow/README.md b/terraform-unity/modules/terraform-unity-sps-airflow/README.md index b2e3e1ea..9f522a4b 100644 --- a/terraform-unity/modules/terraform-unity-sps-airflow/README.md +++ b/terraform-unity/modules/terraform-unity-sps-airflow/README.md @@ -30,15 +30,26 @@ No modules. | Name | Type | |------|------| +| [aws_cloudwatch_log_group.airflow_dag_trigger](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/cloudwatch_log_group) | resource | | [aws_db_instance.airflow_db](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/db_instance) | resource | | [aws_db_subnet_group.airflow_db](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/db_subnet_group) | resource | | [aws_efs_access_point.airflow_kpo](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/efs_access_point) | resource | | [aws_efs_file_system.airflow_kpo](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/efs_file_system) | resource | | [aws_efs_mount_target.airflow_kpo](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/efs_mount_target) | resource | | [aws_iam_policy.airflow_worker_policy](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/iam_policy) | resource | +| [aws_iam_policy.lambda_sqs_access](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/iam_policy) | resource | | [aws_iam_role.airflow_worker_role](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/iam_role) | resource | +| [aws_iam_role.lambda](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/iam_role) | resource | | [aws_iam_role_policy_attachment.airflow_worker_policy_attachment](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/iam_role_policy_attachment) | resource | +| [aws_iam_role_policy_attachment.lambda_logs](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/iam_role_policy_attachment) | resource | +| [aws_iam_role_policy_attachment.lambda_sqs_access_attach](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/iam_role_policy_attachment) | resource | +| [aws_lambda_event_source_mapping.lambda_airflow_dag_trigger](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/lambda_event_source_mapping) | resource | +| [aws_lambda_function.airflow_dag_trigger](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/lambda_function) | resource | | [aws_s3_bucket.airflow_logs](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/s3_bucket) | resource | +| [aws_s3_bucket.inbound_staging_location](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/s3_bucket) | resource | +| [aws_s3_bucket.lambdas](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/s3_bucket) | resource | +| [aws_s3_bucket_notification.isl_bucket_notification](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/s3_bucket_notification) | resource | +| [aws_s3_object.lambdas](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/s3_object) | resource | | [aws_secretsmanager_secret.airflow_db](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/secretsmanager_secret) | resource | | [aws_secretsmanager_secret_version.airflow_db](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/secretsmanager_secret_version) | resource | | [aws_security_group.airflow_kpo_efs](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/security_group) | resource | @@ -46,9 +57,16 @@ No modules. | [aws_security_group_rule.airflow_kpo_efs](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/security_group_rule) | resource | | [aws_security_group_rule.eks_egress_to_rds](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/security_group_rule) | resource | | [aws_security_group_rule.rds_ingress_from_eks](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/security_group_rule) | resource | +| [aws_sns_topic.s3_isl_event_topic](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/sns_topic) | resource | +| [aws_sns_topic_policy.s3_isl_event_topic_policy](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/sns_topic_policy) | resource | +| [aws_sns_topic_subscription.s3_isl_event_subscription](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/sns_topic_subscription) | resource | +| [aws_sqs_queue.s3_isl_event_queue](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/sqs_queue) | resource | +| [aws_sqs_queue_policy.s3_isl_event_queue_policy](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/sqs_queue_policy) | resource | | [aws_ssm_parameter.airflow_api_url](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/ssm_parameter) | resource | +| [aws_ssm_parameter.airflow_dag_trigger_lambda_package](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/ssm_parameter) | resource | | [aws_ssm_parameter.airflow_logs](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/ssm_parameter) | resource | | [aws_ssm_parameter.airflow_ui_url](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/ssm_parameter) | resource | +| [aws_ssm_parameter.isl_bucket](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/ssm_parameter) | resource | | [aws_ssm_parameter.ogc_processes_api_url](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/ssm_parameter) | resource | | [helm_release.airflow](https://registry.terraform.io/providers/hashicorp/helm/2.12.1/docs/resources/release) | resource | | [helm_release.keda](https://registry.terraform.io/providers/hashicorp/helm/2.12.1/docs/resources/release) | resource | @@ -65,6 +83,7 @@ No modules. | [kubernetes_secret.airflow_webserver](https://registry.terraform.io/providers/hashicorp/kubernetes/2.25.2/docs/resources/secret) | resource | | [kubernetes_service.ogc_processes_api](https://registry.terraform.io/providers/hashicorp/kubernetes/2.25.2/docs/resources/service) | resource | | [kubernetes_storage_class.efs](https://registry.terraform.io/providers/hashicorp/kubernetes/2.25.2/docs/resources/storage_class) | resource | +| [null_resource.build_lambda_packages](https://registry.terraform.io/providers/hashicorp/null/3.2.2/docs/resources/resource) | resource | | [null_resource.remove_finalizers](https://registry.terraform.io/providers/hashicorp/null/3.2.2/docs/resources/resource) | resource | | [random_id.airflow_webserver_secret](https://registry.terraform.io/providers/hashicorp/random/3.6.0/docs/resources/id) | resource | | [random_id.counter](https://registry.terraform.io/providers/hashicorp/random/3.6.0/docs/resources/id) | resource | diff --git a/terraform-unity/modules/terraform-unity-sps-airflow/main.tf b/terraform-unity/modules/terraform-unity-sps-airflow/main.tf index e55d2d43..d724e24f 100644 --- a/terraform-unity/modules/terraform-unity-sps-airflow/main.tf +++ b/terraform-unity/modules/terraform-unity-sps-airflow/main.tf @@ -588,3 +588,288 @@ resource "aws_ssm_parameter" "ogc_processes_api_url" { Stack = "SSM" }) } + +resource "null_resource" "build_lambda_packages" { + triggers = { + lambda_dir_sha1 = sha1( + join("", [ + for f in fileset("${path.module}/../../../lambda/src", "**/**") : filesha1("${path.module}/../../../lambda/src/${f}")] + ) + ) + } + + provisioner "local-exec" { + command = < Date: Mon, 1 Apr 2024 15:19:13 -0700 Subject: [PATCH 2/6] Use new dag --- lambda/src/airflow-dag-trigger/airflow_dag_trigger.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lambda/src/airflow-dag-trigger/airflow_dag_trigger.py b/lambda/src/airflow-dag-trigger/airflow_dag_trigger.py index 122c8df1..ac68a7d4 100644 --- a/lambda/src/airflow-dag-trigger/airflow_dag_trigger.py +++ b/lambda/src/airflow-dag-trigger/airflow_dag_trigger.py @@ -39,7 +39,7 @@ def lambda_handler(event: List[S3Model], context: LambdaContext) -> dict: object_key = unquote_plus(event[0].Records[0].s3.object.key) bucket_name = unquote_plus(event[0].Records[0].s3.bucket.name) logger.info(f"Source bucket: {bucket_name}, Source key: {object_key}") - dag_id = "cwl-dag" + dag_id = "sbg-l1-to-l2-e2e-cwl-step-by-step-dag" trigger_airflow_dag(dag_id) except Exception as e: logger.error(f"An unexpected error occurred: {e}") From 0e0b91c3c99b01ed877e4b9d8088858ae2b4a631 Mon Sep 17 00:00:00 2001 From: Drew Meyers Date: Mon, 1 Apr 2024 15:45:52 -0700 Subject: [PATCH 3/6] Use new dag --- lambda/src/airflow-dag-trigger/airflow_dag_trigger.py | 1 - pyproject.toml | 11 ++--------- 2 files changed, 2 insertions(+), 10 deletions(-) diff --git a/lambda/src/airflow-dag-trigger/airflow_dag_trigger.py b/lambda/src/airflow-dag-trigger/airflow_dag_trigger.py index ac68a7d4..53e6e4e6 100644 --- a/lambda/src/airflow-dag-trigger/airflow_dag_trigger.py +++ b/lambda/src/airflow-dag-trigger/airflow_dag_trigger.py @@ -10,7 +10,6 @@ from aws_lambda_powertools.utilities.parser.models import S3Model from aws_lambda_powertools.utilities.typing import LambdaContext -# Configure your logger logger = logging.getLogger() logger.setLevel(logging.INFO) diff --git a/pyproject.toml b/pyproject.toml index 3fcf3849..e64a9987 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -44,15 +44,8 @@ test = [ ] experiment = [] lambda-airflow-dag-trigger = [ - "boto3<1.28.27", - "requests<=2.30.0", - "urllib3<1.27", - "jinja2==3.1.2", - "jsonschema==4.17.3", - "referencing==0.23.0", - "aws-lambda-powertools==2.22.0", - "pydantic>=1.8.2,<2.0.0", - "fastjsonschema>=2.14.5,<3.0.0", + "requests==2.31.0", + "aws-lambda-powertools==2.36.0", ] [tool.setuptools.packages.find] From 595edc5ee10b0a9777ef6ae476fc0b355c66dc45 Mon Sep 17 00:00:00 2001 From: Drew Meyers Date: Mon, 1 Apr 2024 16:02:24 -0700 Subject: [PATCH 4/6] Update lambda dependencies --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index e64a9987..a0930011 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -45,7 +45,7 @@ test = [ experiment = [] lambda-airflow-dag-trigger = [ "requests==2.31.0", - "aws-lambda-powertools==2.36.0", + "aws-lambda-powertools[parser]==2.36.0", ] [tool.setuptools.packages.find] From 327d0f6dd1389acf7faffb3639745ed2281871ac Mon Sep 17 00:00:00 2001 From: Drew Meyers Date: Tue, 2 Apr 2024 10:09:53 -0700 Subject: [PATCH 5/6] Add tags --- .../modules/terraform-unity-sps-airflow/main.tf | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/terraform-unity/modules/terraform-unity-sps-airflow/main.tf b/terraform-unity/modules/terraform-unity-sps-airflow/main.tf index d724e24f..fafa93b2 100644 --- a/terraform-unity/modules/terraform-unity-sps-airflow/main.tf +++ b/terraform-unity/modules/terraform-unity-sps-airflow/main.tf @@ -680,6 +680,11 @@ resource "aws_ssm_parameter" "isl_bucket" { resource "aws_sns_topic" "s3_isl_event_topic" { name = format(local.resource_name_prefix, "S3IslSnsTopic") + tags = merge(local.common_tags, { + Name = format(local.resource_name_prefix, "SNS-S3IslSnsTopic") + Component = "SNS" + Stack = "SNS" + }) } resource "aws_sns_topic_policy" "s3_isl_event_topic_policy" { @@ -719,6 +724,11 @@ resource "aws_s3_bucket_notification" "isl_bucket_notification" { resource "aws_sqs_queue" "s3_isl_event_queue" { name = format(local.resource_name_prefix, "S3IslSqsQueue") visibility_timeout_seconds = 60 + tags = merge(local.common_tags, { + Name = format(local.resource_name_prefix, "SQS-S3IslSqsQueue") + Component = "SQS" + Stack = "SQS" + }) } resource "aws_sqs_queue_policy" "s3_isl_event_queue_policy" { From 938d475f7b606ccec5c67426a8a7951441368213 Mon Sep 17 00:00:00 2001 From: Drew Meyers Date: Tue, 2 Apr 2024 10:11:36 -0700 Subject: [PATCH 6/6] Add tags --- terraform-unity/modules/terraform-unity-sps-airflow/main.tf | 2 -- 1 file changed, 2 deletions(-) diff --git a/terraform-unity/modules/terraform-unity-sps-airflow/main.tf b/terraform-unity/modules/terraform-unity-sps-airflow/main.tf index fafa93b2..4ca8f85f 100644 --- a/terraform-unity/modules/terraform-unity-sps-airflow/main.tf +++ b/terraform-unity/modules/terraform-unity-sps-airflow/main.tf @@ -806,7 +806,6 @@ resource "aws_iam_role" "lambda" { }, ] }) - permissions_boundary = "arn:aws:iam::${data.aws_caller_identity.current.account_id}:policy/mcp-tenantOperator-AMI-APIG" } @@ -820,7 +819,6 @@ resource "aws_iam_role_policy_attachment" "lambda_logs" { resource "aws_iam_policy" "lambda_sqs_access" { name = format(local.resource_name_prefix, "LambdaSQSAccessPolicy") description = "Allows Lambda function to interact with SQS queue" - policy = jsonencode({ Version = "2012-10-17" Statement = [