diff --git a/.gitignore b/.gitignore index 571aeb03..670de40d 100644 --- a/.gitignore +++ b/.gitignore @@ -243,3 +243,7 @@ override.tf.json # Ignore CLI configuration files .terraformrc terraform.rc + + +/lambda/deployment_packages/* +!/lambda/deployment_packages/.gitkeep diff --git a/lambda/deployment_packages/.gitkeep b/lambda/deployment_packages/.gitkeep new file mode 100644 index 00000000..e69de29b diff --git a/lambda/src/airflow-dag-trigger/airflow_dag_trigger.py b/lambda/src/airflow-dag-trigger/airflow_dag_trigger.py new file mode 100644 index 00000000..53e6e4e6 --- /dev/null +++ b/lambda/src/airflow-dag-trigger/airflow_dag_trigger.py @@ -0,0 +1,47 @@ +import logging +import os +import uuid +from datetime import datetime +from typing import List +from urllib.parse import unquote_plus + +import requests +from aws_lambda_powertools.utilities.parser import envelopes, event_parser +from aws_lambda_powertools.utilities.parser.models import S3Model +from aws_lambda_powertools.utilities.typing import LambdaContext + +logger = logging.getLogger() +logger.setLevel(logging.INFO) + +AIRFLOW_BASE_API_ENDPOINT = os.environ.get("AIRFLOW_BASE_API_ENDPOINT") +AIRFLOW_USERNAME = os.environ.get("AIRFLOW_USERNAME") +AIRFLOW_PASSWORD = os.environ.get("AIRFLOW_PASSWORD") + + +def trigger_airflow_dag(dag_id: str): + url = f"{AIRFLOW_BASE_API_ENDPOINT}/dags/{dag_id}/dagRuns" + dag_run_id = str(uuid.uuid4()) + logical_date = datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%fZ") + headers = {"Content-Type": "application/json", "Accept": "application/json"} + auth = (AIRFLOW_USERNAME, AIRFLOW_PASSWORD) + payload = {"dag_run_id": dag_run_id, "logical_date": logical_date, "conf": {}, "note": ""} + response = requests.post(url, auth=auth, headers=headers, json=payload) + if response.status_code == 200 or response.status_code == 201: + logger.info(f"Successfully triggered Airflow DAG {dag_id}: {response.json()}") + else: + logger.error(f"Failed to trigger Airflow DAG {dag_id}: {response.text}") + + +@event_parser(model=S3Model, envelope=envelopes.SnsSqsEnvelope) +def lambda_handler(event: List[S3Model], context: LambdaContext) -> dict: + try: + object_key = unquote_plus(event[0].Records[0].s3.object.key) + bucket_name = unquote_plus(event[0].Records[0].s3.bucket.name) + logger.info(f"Source bucket: {bucket_name}, Source key: {object_key}") + dag_id = "sbg-l1-to-l2-e2e-cwl-step-by-step-dag" + trigger_airflow_dag(dag_id) + except Exception as e: + logger.error(f"An unexpected error occurred: {e}") + return {"statusCode": 500, "body": "An unexpected error occurred: " + str(e)} + + return {"statusCode": 200, "body": "Success!"} diff --git a/pyproject.toml b/pyproject.toml index 096a96a8..a0930011 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -43,6 +43,10 @@ test = [ "apache-airflow-providers-cncf-kubernetes==8.0.0" ] experiment = [] +lambda-airflow-dag-trigger = [ + "requests==2.31.0", + "aws-lambda-powertools[parser]==2.36.0", +] [tool.setuptools.packages.find] exclude = ["tests*"] diff --git a/terraform-unity/modules/terraform-unity-sps-airflow/README.md b/terraform-unity/modules/terraform-unity-sps-airflow/README.md index b2e3e1ea..9f522a4b 100644 --- a/terraform-unity/modules/terraform-unity-sps-airflow/README.md +++ b/terraform-unity/modules/terraform-unity-sps-airflow/README.md @@ -30,15 +30,26 @@ No modules. | Name | Type | |------|------| +| [aws_cloudwatch_log_group.airflow_dag_trigger](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/cloudwatch_log_group) | resource | | [aws_db_instance.airflow_db](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/db_instance) | resource | | [aws_db_subnet_group.airflow_db](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/db_subnet_group) | resource | | [aws_efs_access_point.airflow_kpo](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/efs_access_point) | resource | | [aws_efs_file_system.airflow_kpo](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/efs_file_system) | resource | | [aws_efs_mount_target.airflow_kpo](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/efs_mount_target) | resource | | [aws_iam_policy.airflow_worker_policy](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/iam_policy) | resource | +| [aws_iam_policy.lambda_sqs_access](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/iam_policy) | resource | | [aws_iam_role.airflow_worker_role](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/iam_role) | resource | +| [aws_iam_role.lambda](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/iam_role) | resource | | [aws_iam_role_policy_attachment.airflow_worker_policy_attachment](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/iam_role_policy_attachment) | resource | +| [aws_iam_role_policy_attachment.lambda_logs](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/iam_role_policy_attachment) | resource | +| [aws_iam_role_policy_attachment.lambda_sqs_access_attach](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/iam_role_policy_attachment) | resource | +| [aws_lambda_event_source_mapping.lambda_airflow_dag_trigger](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/lambda_event_source_mapping) | resource | +| [aws_lambda_function.airflow_dag_trigger](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/lambda_function) | resource | | [aws_s3_bucket.airflow_logs](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/s3_bucket) | resource | +| [aws_s3_bucket.inbound_staging_location](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/s3_bucket) | resource | +| [aws_s3_bucket.lambdas](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/s3_bucket) | resource | +| [aws_s3_bucket_notification.isl_bucket_notification](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/s3_bucket_notification) | resource | +| [aws_s3_object.lambdas](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/s3_object) | resource | | [aws_secretsmanager_secret.airflow_db](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/secretsmanager_secret) | resource | | [aws_secretsmanager_secret_version.airflow_db](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/secretsmanager_secret_version) | resource | | [aws_security_group.airflow_kpo_efs](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/security_group) | resource | @@ -46,9 +57,16 @@ No modules. | [aws_security_group_rule.airflow_kpo_efs](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/security_group_rule) | resource | | [aws_security_group_rule.eks_egress_to_rds](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/security_group_rule) | resource | | [aws_security_group_rule.rds_ingress_from_eks](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/security_group_rule) | resource | +| [aws_sns_topic.s3_isl_event_topic](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/sns_topic) | resource | +| [aws_sns_topic_policy.s3_isl_event_topic_policy](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/sns_topic_policy) | resource | +| [aws_sns_topic_subscription.s3_isl_event_subscription](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/sns_topic_subscription) | resource | +| [aws_sqs_queue.s3_isl_event_queue](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/sqs_queue) | resource | +| [aws_sqs_queue_policy.s3_isl_event_queue_policy](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/sqs_queue_policy) | resource | | [aws_ssm_parameter.airflow_api_url](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/ssm_parameter) | resource | +| [aws_ssm_parameter.airflow_dag_trigger_lambda_package](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/ssm_parameter) | resource | | [aws_ssm_parameter.airflow_logs](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/ssm_parameter) | resource | | [aws_ssm_parameter.airflow_ui_url](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/ssm_parameter) | resource | +| [aws_ssm_parameter.isl_bucket](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/ssm_parameter) | resource | | [aws_ssm_parameter.ogc_processes_api_url](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/ssm_parameter) | resource | | [helm_release.airflow](https://registry.terraform.io/providers/hashicorp/helm/2.12.1/docs/resources/release) | resource | | [helm_release.keda](https://registry.terraform.io/providers/hashicorp/helm/2.12.1/docs/resources/release) | resource | @@ -65,6 +83,7 @@ No modules. | [kubernetes_secret.airflow_webserver](https://registry.terraform.io/providers/hashicorp/kubernetes/2.25.2/docs/resources/secret) | resource | | [kubernetes_service.ogc_processes_api](https://registry.terraform.io/providers/hashicorp/kubernetes/2.25.2/docs/resources/service) | resource | | [kubernetes_storage_class.efs](https://registry.terraform.io/providers/hashicorp/kubernetes/2.25.2/docs/resources/storage_class) | resource | +| [null_resource.build_lambda_packages](https://registry.terraform.io/providers/hashicorp/null/3.2.2/docs/resources/resource) | resource | | [null_resource.remove_finalizers](https://registry.terraform.io/providers/hashicorp/null/3.2.2/docs/resources/resource) | resource | | [random_id.airflow_webserver_secret](https://registry.terraform.io/providers/hashicorp/random/3.6.0/docs/resources/id) | resource | | [random_id.counter](https://registry.terraform.io/providers/hashicorp/random/3.6.0/docs/resources/id) | resource | diff --git a/terraform-unity/modules/terraform-unity-sps-airflow/main.tf b/terraform-unity/modules/terraform-unity-sps-airflow/main.tf index e55d2d43..4ca8f85f 100644 --- a/terraform-unity/modules/terraform-unity-sps-airflow/main.tf +++ b/terraform-unity/modules/terraform-unity-sps-airflow/main.tf @@ -588,3 +588,296 @@ resource "aws_ssm_parameter" "ogc_processes_api_url" { Stack = "SSM" }) } + +resource "null_resource" "build_lambda_packages" { + triggers = { + lambda_dir_sha1 = sha1( + join("", [ + for f in fileset("${path.module}/../../../lambda/src", "**/**") : filesha1("${path.module}/../../../lambda/src/${f}")] + ) + ) + } + + provisioner "local-exec" { + command = <