Skip to content

Commit

Permalink
Merge pull request #51 from unity-sds/50-event-based-triggering
Browse files Browse the repository at this point in the history
S3 event-based triggering of SBG DAG
  • Loading branch information
drewm-swe authored Apr 2, 2024
2 parents 10849cb + 938d475 commit 9dedc43
Show file tree
Hide file tree
Showing 6 changed files with 367 additions and 0 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -243,3 +243,7 @@ override.tf.json
# Ignore CLI configuration files
.terraformrc
terraform.rc


/lambda/deployment_packages/*
!/lambda/deployment_packages/.gitkeep
Empty file.
47 changes: 47 additions & 0 deletions lambda/src/airflow-dag-trigger/airflow_dag_trigger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import logging
import os
import uuid
from datetime import datetime
from typing import List
from urllib.parse import unquote_plus

import requests
from aws_lambda_powertools.utilities.parser import envelopes, event_parser
from aws_lambda_powertools.utilities.parser.models import S3Model
from aws_lambda_powertools.utilities.typing import LambdaContext

logger = logging.getLogger()
logger.setLevel(logging.INFO)

AIRFLOW_BASE_API_ENDPOINT = os.environ.get("AIRFLOW_BASE_API_ENDPOINT")
AIRFLOW_USERNAME = os.environ.get("AIRFLOW_USERNAME")
AIRFLOW_PASSWORD = os.environ.get("AIRFLOW_PASSWORD")


def trigger_airflow_dag(dag_id: str):
url = f"{AIRFLOW_BASE_API_ENDPOINT}/dags/{dag_id}/dagRuns"
dag_run_id = str(uuid.uuid4())
logical_date = datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%fZ")
headers = {"Content-Type": "application/json", "Accept": "application/json"}
auth = (AIRFLOW_USERNAME, AIRFLOW_PASSWORD)
payload = {"dag_run_id": dag_run_id, "logical_date": logical_date, "conf": {}, "note": ""}
response = requests.post(url, auth=auth, headers=headers, json=payload)
if response.status_code == 200 or response.status_code == 201:
logger.info(f"Successfully triggered Airflow DAG {dag_id}: {response.json()}")
else:
logger.error(f"Failed to trigger Airflow DAG {dag_id}: {response.text}")


@event_parser(model=S3Model, envelope=envelopes.SnsSqsEnvelope)
def lambda_handler(event: List[S3Model], context: LambdaContext) -> dict:
try:
object_key = unquote_plus(event[0].Records[0].s3.object.key)
bucket_name = unquote_plus(event[0].Records[0].s3.bucket.name)
logger.info(f"Source bucket: {bucket_name}, Source key: {object_key}")
dag_id = "sbg-l1-to-l2-e2e-cwl-step-by-step-dag"
trigger_airflow_dag(dag_id)
except Exception as e:
logger.error(f"An unexpected error occurred: {e}")
return {"statusCode": 500, "body": "An unexpected error occurred: " + str(e)}

return {"statusCode": 200, "body": "Success!"}
4 changes: 4 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ test = [
"apache-airflow-providers-cncf-kubernetes==8.0.0"
]
experiment = []
lambda-airflow-dag-trigger = [
"requests==2.31.0",
"aws-lambda-powertools[parser]==2.36.0",
]

[tool.setuptools.packages.find]
exclude = ["tests*"]
Expand Down
19 changes: 19 additions & 0 deletions terraform-unity/modules/terraform-unity-sps-airflow/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,25 +30,43 @@ No modules.

| Name | Type |
|------|------|
| [aws_cloudwatch_log_group.airflow_dag_trigger](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/cloudwatch_log_group) | resource |
| [aws_db_instance.airflow_db](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/db_instance) | resource |
| [aws_db_subnet_group.airflow_db](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/db_subnet_group) | resource |
| [aws_efs_access_point.airflow_kpo](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/efs_access_point) | resource |
| [aws_efs_file_system.airflow_kpo](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/efs_file_system) | resource |
| [aws_efs_mount_target.airflow_kpo](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/efs_mount_target) | resource |
| [aws_iam_policy.airflow_worker_policy](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/iam_policy) | resource |
| [aws_iam_policy.lambda_sqs_access](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/iam_policy) | resource |
| [aws_iam_role.airflow_worker_role](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/iam_role) | resource |
| [aws_iam_role.lambda](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/iam_role) | resource |
| [aws_iam_role_policy_attachment.airflow_worker_policy_attachment](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/iam_role_policy_attachment) | resource |
| [aws_iam_role_policy_attachment.lambda_logs](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/iam_role_policy_attachment) | resource |
| [aws_iam_role_policy_attachment.lambda_sqs_access_attach](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/iam_role_policy_attachment) | resource |
| [aws_lambda_event_source_mapping.lambda_airflow_dag_trigger](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/lambda_event_source_mapping) | resource |
| [aws_lambda_function.airflow_dag_trigger](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/lambda_function) | resource |
| [aws_s3_bucket.airflow_logs](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/s3_bucket) | resource |
| [aws_s3_bucket.inbound_staging_location](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/s3_bucket) | resource |
| [aws_s3_bucket.lambdas](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/s3_bucket) | resource |
| [aws_s3_bucket_notification.isl_bucket_notification](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/s3_bucket_notification) | resource |
| [aws_s3_object.lambdas](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/s3_object) | resource |
| [aws_secretsmanager_secret.airflow_db](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/secretsmanager_secret) | resource |
| [aws_secretsmanager_secret_version.airflow_db](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/secretsmanager_secret_version) | resource |
| [aws_security_group.airflow_kpo_efs](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/security_group) | resource |
| [aws_security_group.rds_sg](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/security_group) | resource |
| [aws_security_group_rule.airflow_kpo_efs](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/security_group_rule) | resource |
| [aws_security_group_rule.eks_egress_to_rds](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/security_group_rule) | resource |
| [aws_security_group_rule.rds_ingress_from_eks](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/security_group_rule) | resource |
| [aws_sns_topic.s3_isl_event_topic](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/sns_topic) | resource |
| [aws_sns_topic_policy.s3_isl_event_topic_policy](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/sns_topic_policy) | resource |
| [aws_sns_topic_subscription.s3_isl_event_subscription](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/sns_topic_subscription) | resource |
| [aws_sqs_queue.s3_isl_event_queue](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/sqs_queue) | resource |
| [aws_sqs_queue_policy.s3_isl_event_queue_policy](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/sqs_queue_policy) | resource |
| [aws_ssm_parameter.airflow_api_url](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/ssm_parameter) | resource |
| [aws_ssm_parameter.airflow_dag_trigger_lambda_package](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/ssm_parameter) | resource |
| [aws_ssm_parameter.airflow_logs](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/ssm_parameter) | resource |
| [aws_ssm_parameter.airflow_ui_url](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/ssm_parameter) | resource |
| [aws_ssm_parameter.isl_bucket](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/ssm_parameter) | resource |
| [aws_ssm_parameter.ogc_processes_api_url](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/resources/ssm_parameter) | resource |
| [helm_release.airflow](https://registry.terraform.io/providers/hashicorp/helm/2.12.1/docs/resources/release) | resource |
| [helm_release.keda](https://registry.terraform.io/providers/hashicorp/helm/2.12.1/docs/resources/release) | resource |
Expand All @@ -65,6 +83,7 @@ No modules.
| [kubernetes_secret.airflow_webserver](https://registry.terraform.io/providers/hashicorp/kubernetes/2.25.2/docs/resources/secret) | resource |
| [kubernetes_service.ogc_processes_api](https://registry.terraform.io/providers/hashicorp/kubernetes/2.25.2/docs/resources/service) | resource |
| [kubernetes_storage_class.efs](https://registry.terraform.io/providers/hashicorp/kubernetes/2.25.2/docs/resources/storage_class) | resource |
| [null_resource.build_lambda_packages](https://registry.terraform.io/providers/hashicorp/null/3.2.2/docs/resources/resource) | resource |
| [null_resource.remove_finalizers](https://registry.terraform.io/providers/hashicorp/null/3.2.2/docs/resources/resource) | resource |
| [random_id.airflow_webserver_secret](https://registry.terraform.io/providers/hashicorp/random/3.6.0/docs/resources/id) | resource |
| [random_id.counter](https://registry.terraform.io/providers/hashicorp/random/3.6.0/docs/resources/id) | resource |
Expand Down
Loading

0 comments on commit 9dedc43

Please sign in to comment.