From 03d45d6c6d3e80612bdca648ef534b942909a5d1 Mon Sep 17 00:00:00 2001 From: Luca Cinquini Date: Tue, 4 Feb 2025 13:42:44 -0700 Subject: [PATCH] Fixing the Pod label key --- airflow/dags/cwl_dag.py | 11 ++++++----- utils/trigger_dag.py | 17 +++++++++++++---- 2 files changed, 19 insertions(+), 9 deletions(-) diff --git a/airflow/dags/cwl_dag.py b/airflow/dags/cwl_dag.py index d8184a33..5df044c7 100644 --- a/airflow/dags/cwl_dag.py +++ b/airflow/dags/cwl_dag.py @@ -26,8 +26,9 @@ # The Kubernetes namespace within which the Pod is run (it must already exist) POD_NAMESPACE = "sps" -# unique pod label to assure each jkob runs on its own pod -POD_LABEL = "cwl_task" + datetime.now().strftime("%Y%m%d_%H%M%S_%f") +# Note: each Pod is assigned the same label to assure that (via the anti-affinity requirements) +# two Pods with the same label cannot run on the same Node +POD_LABEL = "cwl_task" SPS_DOCKER_CWL_IMAGE = "ghcr.io/unity-sds/unity-sps/sps-docker-cwl:2.5.1" NODE_POOL_DEFAULT = "airflow-kubernetes-pod-operator" NODE_POOL_HIGH_WORKLOAD = "airflow-kubernetes-pod-operator-high-workload" @@ -143,8 +144,8 @@ def build_ec2_type_label(key): is_paused_upon_creation=False, catchup=False, schedule=None, - max_active_runs=10, - max_active_tasks=30, + max_active_runs=100, + max_active_tasks=300, default_args=dag_default_args, params={ "cwl_workflow": Param( @@ -253,7 +254,7 @@ def setup(ti=None, **context): "karpenter.sh/nodepool": "{{ti.xcom_pull(task_ids='Setup', key='node_pool')}}", "node.kubernetes.io/instance-type": "{{ti.xcom_pull(task_ids='Setup', key='instance_type')}}", }, - labels={"app": POD_LABEL}, + labels={"pod": POD_LABEL}, annotations={"karpenter.sh/do-not-disrupt": "true"}, # note: 'affinity' cannot yet be templated affinity=get_affinity( diff --git a/utils/trigger_dag.py b/utils/trigger_dag.py index d773f739..263eb9fe 100644 --- a/utils/trigger_dag.py +++ b/utils/trigger_dag.py @@ -41,10 +41,19 @@ def main(): dt_now = datetime.now(timezone.utc) logical_date = dt_now.strftime("%Y-%m-%dT%H:%M:%SZ") data = {"logical_date": logical_date} - # Example on how to pass DAG specific parameters - # data = {"logical_date": logical_date, - # "conf": {"cwl_args": "abc123"} - # } + # Example on how to pass DAG specific parameters for the cwl_dag + # data = { + # "logical_date": logical_date, + # "conf": { + # "request_instance_type": "r7i.xlarge", + # "cwl_workflow": "http://awslbdockstorestack-lb-1429770210.us-west-2.elb.amazonaws.com:9998/" + # "api/ga4gh/trs/v2/tools/%23workflow%2Fdockstore.org%2FGodwinShen%2Femit-ghg/" + # "versions/9/plain-CWL/descriptor/workflow.cwl", + # "cwl_args": "https://raw.githubusercontent.com/GodwinShen/emit-ghg/refs/heads/main" + # "/test/emit-ghg-dev.json", + # "request_storage": "100Gi" + # } + # } result = requests.post( url, json=data, headers=headers, auth=HTTPBasicAuth(airflow_username, airflow_password) )