Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

259 cwl #7

Merged
merged 6 commits into from
Feb 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions .github/PULL_REQUEST_TEMPLATE.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
## Purpose

- Clear, easy-to-understand sentences outlining the purpose of the PR

## Proposed Changes

- [ADD] ...
- [CHANGE] ...
- [FIX] ...

## Issues

- Links to relevant issues
- Example: issue-XYZ

## Testing

- Provide some proof you've tested your changes
- Example: test results available at ...
- Example: tested on operating system ...
14 changes: 14 additions & 0 deletions airflow/dags/docker_cwl_pod.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Kubernetes Pod definition for the container that runs CWL workflows with Docker.
apiVersion: v1
kind: Pod
metadata:
name: docker-cwl-pod
spec:
# The Pod is never restarted once its container exits.
restartPolicy: Never
serviceAccountName: airflow-worker

containers:
- name: cwl-docker
image: ghcr.io/unity-sds/unity-sps/sps-docker-cwl:development-tag2
# Script executed when the container starts.
command: ["/usr/share/cwl/docker_cwl_entrypoint.sh"]
securityContext:
# NOTE(review): privileged mode is presumably required to start/stop the Docker engine
# inside the container - confirm, since it grants the container elevated host access.
privileged: true
91 changes: 91 additions & 0 deletions airflow/dags/sbg_preprocess_cwl_dag.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
# DAG for executing the SBG Preprocess Workflow
# See https://github.com/unity-sds/sbg-workflows/blob/main/preprocess/sbg-preprocess-workflow.cwl
import json
import uuid
from datetime import datetime

from airflow.models.param import Param
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from kubernetes.client import models as k8s

from airflow import DAG

# The Kubernetes Pod that executes the CWL-Docker container
# Must use elevated privileges to start/stop the Docker engine
POD_TEMPLATE_FILE = "/opt/airflow/dags/docker_cwl_pod.yaml"

# The Kubernetes namespace within which the Pod is run (it must already exist)
POD_NAMESPACE = "airflow"


# Default DAG configuration
dag_default_args = {
    "owner": "unity-sps",
    "depends_on_past": False,
    # Naive Unix epoch (1970-01-01 00:00:00). Written as a literal because
    # datetime.utcfromtimestamp() is deprecated since Python 3.12; the value is identical.
    "start_date": datetime(1970, 1, 1),
}

# URL of the CWL workflow document used as the default for the "cwl_workflow" DAG parameter
CWL_WORKFLOW = (
    "https://raw.githubusercontent.com/unity-sds/sbg-workflows/1.0/preprocess/sbg-preprocess-workflow.cwl"
)

# Default value of every user-overridable DAG parameter, in declaration order.
# All of them are declared as string-typed Params below.
# "input_processing_labels" is not exposed as a parameter; setup() hard-codes it.
_DAG_PARAM_DEFAULTS = {
    "cwl_workflow": CWL_WORKFLOW,
    "input_cmr_collection_name": "C2408009906-LPCLOUD",
    "input_cmr_search_start_time": "2024-01-03T13:19:36.000Z",
    "input_cmr_search_stop_time": "2024-01-03T13:19:36.000Z",
    "input_unity_dapa_api": "https://d3vc8w9zcq658.cloudfront.net",
    "input_unity_dapa_client": "40c2s0ulbhp9i0fmaph3su9jch",
    "input_crid": "001",
    "output_collection_id": "urn:nasa:unity:unity:dev:SBG-L1B_PRE___1",
    "output_data_bucket": "sps-dev-ds-storage",
}

# The DAG itself: no schedule, no catch-up, unpaused on creation, and at most one active run.
dag = DAG(
    dag_id="sbg-preprocess-cwl-dag",
    description="SBG Preprocess Workflow as CWL",
    tags=["SBG", "Unity", "SPS", "NASA", "JPL"],
    default_args=dag_default_args,
    schedule=None,
    catchup=False,
    is_paused_upon_creation=False,
    max_active_runs=1,
    params={
        param_name: Param(default_value, type="string")
        for param_name, default_value in _DAG_PARAM_DEFAULTS.items()
    },
)


# Task that serializes the job arguments into a JSON string
def setup(ti=None, **context):
task_dict = {
"input_processing_labels": ["label1", "label2"],
"input_cmr_collection_name": context["params"]["input_cmr_collection_name"],
"input_cmr_search_start_time": context["params"]["input_cmr_search_start_time"],
"input_cmr_search_stop_time": context["params"]["input_cmr_search_stop_time"],
"input_unity_dapa_api": context["params"]["input_unity_dapa_api"],
"input_unity_dapa_client": context["params"]["input_unity_dapa_client"],
"input_crid": context["params"]["input_crid"],
"output_collection_id": context["params"]["output_collection_id"],
"output_data_bucket": context["params"]["output_data_bucket"],
}
ti.xcom_push(key="cwl_args", value=json.dumps(task_dict))


# Task that runs setup() to push the serialized CWL job arguments to XCom
setup_task = PythonOperator(
    dag=dag,
    python_callable=setup,
    task_id="Setup",
)


# Task that executes the specific CWL workflow with the previous arguments
cwl_task = KubernetesPodOperator(
    namespace=POD_NAMESPACE,
    name="SBG_Preprocess_CWL",
    on_finish_action="delete_pod",
    hostnetwork=False,
    startup_timeout_seconds=1000,
    get_logs=True,
    task_id="SBG_Preprocess_CWL",
    # V1Pod's first positional parameter is api_version, so the metadata must be
    # passed by keyword; otherwise the pod name set here is never applied as metadata.
    # NOTE(review): uuid4() is evaluated when this module is parsed, not once per
    # task run - confirm that a per-parse suffix is what is intended.
    full_pod_spec=k8s.V1Pod(metadata=k8s.V1ObjectMeta(name="sbg-preprocess-cwl-pod-" + uuid.uuid4().hex)),
    pod_template_file=POD_TEMPLATE_FILE,
    # Container arguments: the CWL workflow URL, then the JSON job arguments
    # pushed to XCom by the "Setup" task under the key "cwl_args".
    arguments=["{{ params.cwl_workflow }}", "{{ti.xcom_pull(task_ids='Setup', key='cwl_args')}}"],
    # resources={"request_memory": "512Mi", "limit_memory": "1024Mi"},
    dag=dag,
)

# Setup must run first: the pod's second argument is pulled from the XCom it pushes.
setup_task >> cwl_task
3 changes: 3 additions & 0 deletions airflow/helm/values.tmpl.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ workers:
- type: Percent
value: 100
periodSeconds: 5
serviceAccount:
annotations:
eks.amazonaws.com/role-arn: "${airflow_worker_role_arn}"

data:
metadataSecretName: ${metadata_secret_name}
Expand Down
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ test = [
"requests==2.31.0",
"apache-airflow==2.8.1",
"kubernetes==29.0.0",
"boto3==1.34.46"
"boto3==1.34.46",
"apache-airflow-providers-cncf-kubernetes==8.0.0"
]
experiment = []

Expand Down
2 changes: 1 addition & 1 deletion terraform-unity/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ No resources.
|------|-------------|------|---------|:--------:|
| <a name="input_airflow_webserver_password"></a> [airflow\_webserver\_password](#input\_airflow\_webserver\_password) | value | `string` | n/a | yes |
| <a name="input_counter"></a> [counter](#input\_counter) | value | `string` | `""` | no |
| <a name="input_custom_airflow_docker_image"></a> [custom\_airflow\_docker\_image](#input\_custom\_airflow\_docker\_image) | Docker image for the customized Airflow image. | <pre>object({<br> name = string<br> tag = string<br> })</pre> | <pre>{<br> "name": "ghcr.io/unity-sds/unity-sps-prototype/sps-airflow",<br> "tag": "develop"<br>}</pre> | no |
| <a name="input_custom_airflow_docker_image"></a> [custom\_airflow\_docker\_image](#input\_custom\_airflow\_docker\_image) | Docker image for the customized Airflow image. | <pre>object({<br> name = string<br> tag = string<br> })</pre> | <pre>{<br> "name": "ghcr.io/unity-sds/unity-sps/sps-airflow",<br> "tag": "develop"<br>}</pre> | no |
| <a name="input_eks_cluster_name"></a> [eks\_cluster\_name](#input\_eks\_cluster\_name) | The name of the EKS cluster. | `string` | n/a | yes |
| <a name="input_helm_charts"></a> [helm\_charts](#input\_helm\_charts) | Settings for the required Helm charts. | <pre>map(object({<br> repository = string<br> chart = string<br> version = string<br> }))</pre> | <pre>{<br> "airflow": {<br> "chart": "airflow",<br> "repository": "https://airflow.apache.org",<br> "version": "1.11.0"<br> },<br> "keda": {<br> "chart": "keda",<br> "repository": "https://kedacore.github.io/charts",<br> "version": "v2.13.1"<br> }<br>}</pre> | no |
| <a name="input_kubeconfig_filepath"></a> [kubeconfig\_filepath](#input\_kubeconfig\_filepath) | Path to the kubeconfig file for the Kubernetes cluster | `string` | `"../k8s/kubernetes.yml"` | no |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ No modules.
| [random_password.airflow_db](https://registry.terraform.io/providers/hashicorp/random/3.6.0/docs/resources/password) | resource |
| [aws_eks_cluster.cluster](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/data-sources/eks_cluster) | data source |
| [aws_eks_cluster_auth.cluster](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/data-sources/eks_cluster_auth) | data source |
| [aws_eks_node_group.default](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/data-sources/eks_node_group) | data source |
| [aws_security_group.default](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/data-sources/security_group) | data source |
| [aws_ssm_parameter.subnet_ids](https://registry.terraform.io/providers/hashicorp/aws/5.35.0/docs/data-sources/ssm_parameter) | data source |
| [kubernetes_ingress_v1.airflow_ingress](https://registry.terraform.io/providers/hashicorp/kubernetes/2.25.2/docs/data-sources/ingress_v1) | data source |
Expand Down
5 changes: 5 additions & 0 deletions terraform-unity/modules/terraform-unity-sps-airflow/data.tf
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,8 @@ data "kubernetes_ingress_v1" "airflow_ingress" {
namespace = kubernetes_namespace.airflow.metadata[0].name
}
}

# Looks up the EKS node group named "defaultGroup" in the cluster given by var.eks_cluster_name.
# NOTE(review): the node group name is hard-coded - confirm every target cluster uses this name.
data "aws_eks_node_group" "default" {
cluster_name = var.eks_cluster_name
node_group_name = "defaultGroup"
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ resource "kubernetes_secret" "airflow_webserver" {
}
}


resource "kubernetes_role" "airflow_pod_creator" {
metadata {
name = "airflow-pod-creator"
Expand Down Expand Up @@ -229,6 +228,7 @@ resource "helm_release" "airflow" {
metadata_secret_name = "airflow-metadata-secret"
webserver_secret_name = "airflow-webserver-secret"
airflow_logs_s3_location = "s3://${aws_s3_bucket.airflow_logs.id}"
airflow_worker_role_arn = data.aws_eks_node_group.default.node_role_arn
})
]
set_sensitive {
Expand Down
2 changes: 1 addition & 1 deletion terraform-unity/variables.tf
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ variable "custom_airflow_docker_image" {
tag = string
})
default = {
name = "ghcr.io/unity-sds/unity-sps-prototype/sps-airflow"
name = "ghcr.io/unity-sds/unity-sps/sps-airflow"
tag = "develop"
}
}
Loading