|
| 1 | +# DAG for executing the SBG Preprocess Workflow |
| 2 | +# See https://github.com/unity-sds/sbg-workflows/blob/main/preprocess/sbg-preprocess-workflow.cwl |
| 3 | +import json |
| 4 | +import uuid |
| 5 | +from datetime import datetime |
| 6 | + |
| 7 | +from airflow.models.param import Param |
| 8 | +from airflow.operators.python import PythonOperator |
| 9 | +from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator |
| 10 | +from kubernetes.client import models as k8s |
| 11 | + |
| 12 | +from airflow import DAG |
| 13 | + |
# Pod template used for the Kubernetes Pod that executes the CWL-Docker container.
# The Pod must use elevated privileges to start/stop the Docker engine.
POD_TEMPLATE_FILE = "/opt/airflow/dags/docker_cwl_pod.yaml"

# Kubernetes namespace in which the Pod is run (the namespace must already exist).
POD_NAMESPACE = "airflow"

# Default location of the CWL workflow document executed by this DAG.
CWL_WORKFLOW = "https://raw.githubusercontent.com/unity-sds/sbg-workflows/1.0/preprocess/sbg-preprocess-workflow.cwl"

# Default arguments handed to the DAG through ``default_args``.
dag_default_args = dict(
    owner="airflow",
    depends_on_past=False,
    start_date=datetime(2024, 1, 1),
)
| 27 | + |
# DAG object for the SBG Preprocess workflow. It has no schedule, is created
# paused, does not catch up, and allows a single active run at a time.
dag = DAG(
    dag_id="sbg-preprocess-cwl-dag",
    description="SBG Preprocess Workflow as CWL",
    tags=["SBG", "Unity", "SPS", "NASA", "JPL"],
    default_args=dag_default_args,
    schedule=None,
    catchup=False,
    max_active_runs=1,
    is_paused_upon_creation=True,
    # Every run-time parameter is a string; the table below lists each
    # parameter name with its default value, in display order.
    # NOTE: "input_processing_labels" is not declared as a Param here; the
    # Setup task supplies the labels.
    params={
        param_name: Param(default_value, type="string")
        for param_name, default_value in (
            ("cwl_workflow", CWL_WORKFLOW),
            ("input_cmr_collection_name", "C2408009906-LPCLOUD"),
            ("input_cmr_search_start_time", "2024-01-03T13:19:36.000Z"),
            ("input_cmr_search_stop_time", "2024-01-03T13:19:36.000Z"),
            ("input_unity_dapa_api", "https://d3vc8w9zcq658.cloudfront.net"),
            ("input_unity_dapa_client", "40c2s0ulbhp9i0fmaph3su9jch"),
            ("input_crid", "001"),
            ("output_collection_id", "urn:nasa:unity:unity:dev:SBG-L1B_PRE___1"),
            ("output_data_bucket", "sps-dev-ds-storage"),
        )
    },
)
| 50 | + |
| 51 | + |
| 52 | +# Task that serializes the job arguments into a JSON string |
| 53 | +def setup(ti=None, **context): |
| 54 | + task_dict = { |
| 55 | + "input_processing_labels": ["label1", "label2"], |
| 56 | + "input_cmr_collection_name": context["params"]["input_cmr_collection_name"], |
| 57 | + "input_cmr_search_start_time": context["params"]["input_cmr_search_start_time"], |
| 58 | + "input_cmr_search_stop_time": context["params"]["input_cmr_search_stop_time"], |
| 59 | + "input_unity_dapa_api": context["params"]["input_unity_dapa_api"], |
| 60 | + "input_unity_dapa_client": context["params"]["input_unity_dapa_client"], |
| 61 | + "input_crid": context["params"]["input_crid"], |
| 62 | + "output_collection_id": context["params"]["output_collection_id"], |
| 63 | + "output_data_bucket": context["params"]["output_data_bucket"], |
| 64 | + } |
| 65 | + ti.xcom_push(key="cwl_args", value=json.dumps(task_dict)) |
| 66 | + |
| 67 | + |
# Airflow task "Setup": runs ``setup`` as a Python callable within the DAG.
setup_task = PythonOperator(
    dag=dag,
    task_id="Setup",
    python_callable=setup,
)
| 69 | + |
| 70 | + |
# Task that executes the specific CWL workflow with the previous arguments
cwl_task = KubernetesPodOperator(
    task_id="SBG_Preprocess_CWL",
    name="SBG_Preprocess_CWL",
    namespace=POD_NAMESPACE,
    pod_template_file=POD_TEMPLATE_FILE,
    # The metadata must be passed by keyword: the first positional parameter
    # of V1Pod is ``api_version``, so a positional V1ObjectMeta would land in
    # the wrong field and leave ``metadata`` unset.
    # The uuid suffix is generated when this module is executed (parsed),
    # not once per task run.
    full_pod_spec=k8s.V1Pod(
        metadata=k8s.V1ObjectMeta(name="sbg-preprocess-cwl-pod-" + uuid.uuid4().hex),
    ),
    # Container arguments: the CWL workflow URL from the DAG params, then the
    # JSON job arguments pushed to XCom by the "Setup" task.
    arguments=["{{ params.cwl_workflow }}", "{{ti.xcom_pull(task_ids='Setup', key='cwl_args')}}"],
    # NOTE: no resource requests/limits are configured for the container
    # (a memory request of 512Mi / limit of 1024Mi was sketched earlier).
    hostnetwork=False,
    startup_timeout_seconds=1000,
    get_logs=True,
    on_finish_action="delete_pod",
    dag=dag,
)

# The CWL task runs only after the Setup task has published its arguments.
setup_task >> cwl_task
0 commit comments