diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md new file mode 100644 index 00000000..5db28544 --- /dev/null +++ b/.github/PULL_REQUEST_TEMPLATE.md @@ -0,0 +1,20 @@ +## Purpose + +- Clear, easy-to-understand sentences outlining the purpose of the PR + +## Proposed Changes + +- [ADD] ... +- [CHANGE] ... +- [FIX] ... + +## Issues + +- Links to relevant issues +- Example: issue-XYZ + +## Testing + +- Provide some proof you've tested your changes +- Example: test results available at ... +- Example: tested on operating system ... diff --git a/airflow/dags/docker_cwl_pod.yaml b/airflow/dags/docker_cwl_pod.yaml new file mode 100644 index 00000000..1ed54dbb --- /dev/null +++ b/airflow/dags/docker_cwl_pod.yaml @@ -0,0 +1,14 @@ +apiVersion: v1 +kind: Pod +metadata: + name: docker-cwl-pod +spec: + restartPolicy: Never + serviceAccountName: airflow-worker + + containers: + - name: cwl-docker + image: ghcr.io/unity-sds/unity-sps/sps-docker-cwl:development-tag2 + command: ["/usr/share/cwl/docker_cwl_entrypoint.sh"] + securityContext: + privileged: true diff --git a/airflow/dags/sbg_preprocess_cwl_dag.py b/airflow/dags/sbg_preprocess_cwl_dag.py new file mode 100644 index 00000000..2d46ef47 --- /dev/null +++ b/airflow/dags/sbg_preprocess_cwl_dag.py @@ -0,0 +1,91 @@ +# DAG for executing the SBG Preprocess Workflow +# See https://github.com/unity-sds/sbg-workflows/blob/main/preprocess/sbg-preprocess-workflow.cwl +import json +import uuid +from datetime import datetime + +from airflow.models.param import Param +from airflow.operators.python import PythonOperator +from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator +from kubernetes.client import models as k8s + +from airflow import DAG + +# The Kubernetes Pod that executes the CWL-Docker container +# Must use elevated privileges to start/stop the Docker engine +POD_TEMPLATE_FILE = "/opt/airflow/dags/docker_cwl_pod.yaml" + +# The Kubernetes namespace within which the Pod is run (it must already exist) +POD_NAMESPACE = "airflow" + + +# Default DAG configuration +dag_default_args = { + "owner": "unity-sps", + "depends_on_past": False, + "start_date": datetime.utcfromtimestamp(0), +} +CWL_WORKFLOW = ( + "https://raw.githubusercontent.com/unity-sds/sbg-workflows/1.0/preprocess/sbg-preprocess-workflow.cwl" +) + +dag = DAG( + dag_id="sbg-preprocess-cwl-dag", + description="SBG Preprocess Workflow as CWL", + tags=["SBG", "Unity", "SPS", "NASA", "JPL"], + is_paused_upon_creation=False, + catchup=False, + schedule=None, + max_active_runs=1, + default_args=dag_default_args, + params={ + "cwl_workflow": Param(CWL_WORKFLOW, type="string"), + # "input_processing_labels": Param(["label1", "label2"], type="string[]"), + "input_cmr_collection_name": Param("C2408009906-LPCLOUD", type="string"), + "input_cmr_search_start_time": Param("2024-01-03T13:19:36.000Z", type="string"), + "input_cmr_search_stop_time": Param("2024-01-03T13:19:36.000Z", type="string"), + "input_unity_dapa_api": Param("https://d3vc8w9zcq658.cloudfront.net", type="string"), + "input_unity_dapa_client": Param("40c2s0ulbhp9i0fmaph3su9jch", type="string"), + "input_crid": Param("001", type="string"), + "output_collection_id": Param("urn:nasa:unity:unity:dev:SBG-L1B_PRE___1", type="string"), + "output_data_bucket": Param("sps-dev-ds-storage", type="string"), + }, +) + + +# Task that serializes the job arguments into a JSON string +def setup(ti=None, **context): + task_dict = { + "input_processing_labels": ["label1", "label2"], + "input_cmr_collection_name": context["params"]["input_cmr_collection_name"], + "input_cmr_search_start_time": context["params"]["input_cmr_search_start_time"], + "input_cmr_search_stop_time": context["params"]["input_cmr_search_stop_time"], + "input_unity_dapa_api": context["params"]["input_unity_dapa_api"], + "input_unity_dapa_client": context["params"]["input_unity_dapa_client"], + "input_crid": context["params"]["input_crid"], + "output_collection_id": context["params"]["output_collection_id"], + "output_data_bucket": context["params"]["output_data_bucket"], + } + ti.xcom_push(key="cwl_args", value=json.dumps(task_dict)) + + +setup_task = PythonOperator(task_id="Setup", python_callable=setup, dag=dag) + + +# Task that executes the specific CWL workflow with the previous arguments +cwl_task = KubernetesPodOperator( + namespace=POD_NAMESPACE, + name="SBG_Preprocess_CWL", + on_finish_action="delete_pod", + hostnetwork=False, + startup_timeout_seconds=1000, + get_logs=True, + task_id="SBG_Preprocess_CWL", + full_pod_spec=k8s.V1Pod(k8s.V1ObjectMeta(name=("sbg-preprocess-cwl-pod-" + uuid.uuid4().hex))), + pod_template_file=POD_TEMPLATE_FILE, + arguments=["{{ params.cwl_workflow }}", "{{ti.xcom_pull(task_ids='Setup', key='cwl_args')}}"], + # resources={"request_memory": "512Mi", "limit_memory": "1024Mi"}, + dag=dag, +) + +setup_task >> cwl_task diff --git a/airflow/helm/values.tmpl.yaml b/airflow/helm/values.tmpl.yaml index 4443fb6c..52ce8abb 100644 --- a/airflow/helm/values.tmpl.yaml +++ b/airflow/helm/values.tmpl.yaml @@ -85,6 +85,9 @@ workers: - type: Percent value: 100 periodSeconds: 5 + serviceAccount: + annotations: + eks.amazonaws.com/role-arn: "${airflow_worker_role_arn}" data: metadataSecretName: ${metadata_secret_name} diff --git a/pyproject.toml b/pyproject.toml index 1f07df4d..ce9e840d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -38,7 +38,8 @@ test = [ "requests==2.31.0", "apache-airflow==2.8.1", "kubernetes==29.0.0", - "boto3==1.34.46" + "boto3==1.34.46", + "apache-airflow-providers-cncf-kubernetes==8.0.0" ] experiment = [] diff --git a/terraform-unity/README.md b/terraform-unity/README.md index 98ea24eb..c7a0c872 100644 --- a/terraform-unity/README.md +++ b/terraform-unity/README.md @@ -177,7 +177,7 @@ No resources. |------|-------------|------|---------|:--------:| | [airflow\_webserver\_password](#input\_airflow\_webserver\_password) | value | `string` | n/a | yes | | [counter](#input\_counter) | value | `string` | `""` | no | -| [custom\_airflow\_docker\_image](#input\_custom\_airflow\_docker\_image) | Docker image for the customized Airflow image. |
object({
name = string
tag = string
})
|
{
"name": "ghcr.io/unity-sds/unity-sps-prototype/sps-airflow",
"tag": "develop"
}
| no | +| [custom\_airflow\_docker\_image](#input\_custom\_airflow\_docker\_image) | Docker image for the customized Airflow image. |
object({
name = string
tag = string
})
|
{
"name": "ghcr.io/unity-sds/unity-sps/sps-airflow",
"tag": "develop"
}
| no | | [eks\_cluster\_name](#input\_eks\_cluster\_name) | The name of the EKS cluster. | `string` | n/a | yes | | [helm\_charts](#input\_helm\_charts) | Settings for the required Helm charts. |
map(object({
repository = string
chart = string
version = string
}))
|
{
"airflow": {
"chart": "airflow",
"repository": "https://airflow.apache.org",
"version": "1.11.0"
},
"keda": {
"chart": "keda",
"repository": "https://kedacore.github.io/charts",
"version": "v2.13.1"
}
}
| no | | [kubeconfig\_filepath](#input\_kubeconfig\_filepath) | Path to the kubeconfig file for the Kubernetes cluster | `string` | `"../k8s/kubernetes.yml"` | no | diff --git a/terraform-unity/modules/terraform-unity-sps-airflow/README.md b/terraform-unity/modules/terraform-unity-sps-airflow/README.md index 8155c954..9a760b33 100644 --- a/terraform-unity/modules/terraform-unity-sps-airflow/README.md +++ b/terraform-unity/modules/terraform-unity-sps-airflow/README.md @@ -53,6 +53,7 @@ No modules. | [random_password.airflow_db](https://registry.terraform.io/providers/hashicorp/random/3.6.0/docs/resources/password) | resource | | [aws_eks_cluster.cluster](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/data-sources/eks_cluster) | data source | | [aws_eks_cluster_auth.cluster](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/data-sources/eks_cluster_auth) | data source | +| [aws_eks_node_group.default](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/data-sources/eks_node_group) | data source | | [aws_security_group.default](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/data-sources/security_group) | data source | | [aws_ssm_parameter.subnet_ids](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/data-sources/ssm_parameter) | data source | | [kubernetes_ingress_v1.airflow_ingress](https://registry.terraform.io/providers/hashicorp/kubernetes/2.25.2/docs/data-sources/ingress_v1) | data source | diff --git a/terraform-unity/modules/terraform-unity-sps-airflow/data.tf b/terraform-unity/modules/terraform-unity-sps-airflow/data.tf index 7fc417b5..9e780db5 100644 --- a/terraform-unity/modules/terraform-unity-sps-airflow/data.tf +++ b/terraform-unity/modules/terraform-unity-sps-airflow/data.tf @@ -16,3 +16,8 @@ data "kubernetes_ingress_v1" "airflow_ingress" { namespace = kubernetes_namespace.airflow.metadata[0].name } } + +data "aws_eks_node_group" "default" { + cluster_name = var.eks_cluster_name + node_group_name = "defaultGroup" +} diff --git a/terraform-unity/modules/terraform-unity-sps-airflow/main.tf b/terraform-unity/modules/terraform-unity-sps-airflow/main.tf index 316a6b5a..5e5953ce 100644 --- a/terraform-unity/modules/terraform-unity-sps-airflow/main.tf +++ b/terraform-unity/modules/terraform-unity-sps-airflow/main.tf @@ -62,7 +62,6 @@ resource "kubernetes_secret" "airflow_webserver" { } } - resource "kubernetes_role" "airflow_pod_creator" { metadata { name = "airflow-pod-creator" @@ -229,6 +228,7 @@ resource "helm_release" "airflow" { metadata_secret_name = "airflow-metadata-secret" webserver_secret_name = "airflow-webserver-secret" airflow_logs_s3_location = "s3://${aws_s3_bucket.airflow_logs.id}" + airflow_worker_role_arn = data.aws_eks_node_group.default.node_role_arn }) ] set_sensitive { diff --git a/terraform-unity/variables.tf b/terraform-unity/variables.tf index 64aa78ab..b4b30992 100644 --- a/terraform-unity/variables.tf +++ b/terraform-unity/variables.tf @@ -71,7 +71,7 @@ variable "custom_airflow_docker_image" { tag = string }) default = { - name = "ghcr.io/unity-sds/unity-sps-prototype/sps-airflow" + name = "ghcr.io/unity-sds/unity-sps/sps-airflow" tag = "develop" } }