diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index f7759757..8ce93b0e 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -22,29 +22,29 @@ repos: - id: check-toml # Checks toml files for parsable syntax. - repo: https://github.com/igorshubovych/markdownlint-cli - rev: "v0.39.0" + rev: "v0.44.0" hooks: - id: markdownlint args: ["--config", ".markdownlintrc", "--ignore", "CHANGELOG.md"] - repo: https://github.com/PyCQA/isort - rev: 5.13.2 + rev: 6.0.1 hooks: - id: isort args: ["--profile=black"] - repo: https://github.com/psf/black-pre-commit-mirror - rev: 24.4.2 + rev: 25.1.0 hooks: - id: black - repo: https://github.com/astral-sh/ruff-pre-commit - rev: v0.4.2 + rev: v0.11.9 hooks: - id: ruff - repo: https://github.com/PyCQA/bandit - rev: "1.7.8" # you must change this to newest version + rev: "1.8.3" # you must change this to newest version hooks: - id: bandit args: @@ -56,7 +56,7 @@ repos: additional_dependencies: [".[toml]"] - repo: https://github.com/hadolint/hadolint - rev: v2.13.0-beta + rev: v2.13.1-beta hooks: - id: hadolint # requires hadolint is installed (brew install hadolint) args: @@ -65,7 +65,7 @@ repos: - --verbose - repo: https://github.com/antonbabenko/pre-commit-terraform - rev: v1.89.1 + rev: v1.99.0 hooks: - id: terraform_validate # Validates all Terraform configuration files. args: diff --git a/airflow/dags/edrgen.py b/airflow/dags/edrgen.py index 063e9aac..50857e31 100644 --- a/airflow/dags/edrgen.py +++ b/airflow/dags/edrgen.py @@ -40,7 +40,7 @@ }, ) as dag: - @task + @task(weight_rule="absolute", priority_weight=103) def prep(params: dict): context = get_current_context() dag_run_id = context["dag_run"].run_id @@ -91,10 +91,13 @@ def prep(params: dict): prep_task = prep() edrgen_task = KubernetesPodOperator( + weight_rule="absolute", + priority_weight=104, task_id="edrgen", name="edrgen", namespace="sps", - image="pymonger/srl-idps-edrgen:develop", + image="429178552491.dkr.ecr.us-west-2.amazonaws.com/srl-idps/edrgen:develop", + # image="pymonger/srl-idps-edrgen:develop", # cmds=[ # "sh", # "-c", @@ -109,7 +112,7 @@ def prep(params: dict): container_logs=True, service_account_name="airflow-worker", container_security_context={"privileged": True}, - retries=0, + retries=3, volume_mounts=[ k8s.V1VolumeMount( name="workers-volume", mount_path="/stage-in", sub_path="{{ dag_run.run_id }}/stage-in" @@ -126,16 +129,16 @@ def prep(params: dict): ], node_selector={ "karpenter.sh/nodepool": unity_sps_utils.NODE_POOL_HIGH_WORKLOAD, - "node.kubernetes.io/instance-type": "r7i.2xlarge", + "node.kubernetes.io/instance-type": "c6i.large", }, labels={"pod": unity_sps_utils.POD_LABEL}, annotations={"karpenter.sh/do-not-disrupt": "true"}, affinity=unity_sps_utils.get_affinity( - capacity_type=["on-demand"], anti_affinity_label=unity_sps_utils.POD_LABEL + capacity_type=["spot"], anti_affinity_label=unity_sps_utils.POD_LABEL ), ) - @task + @task(weight_rule="absolute", priority_weight=105) def post(params: dict): context = get_current_context() dag_run_id = context["dag_run"].run_id diff --git a/airflow/dags/eval_srl_edrgen.py b/airflow/dags/eval_srl_edrgen.py index 1bd53b0c..a11e555f 100644 --- a/airflow/dags/eval_srl_edrgen.py +++ b/airflow/dags/eval_srl_edrgen.py @@ -32,7 +32,7 @@ }, ) as dag: - @task + @task(weight_rule="absolute", priority_weight=100) def evaluate_edrgen(params: dict): s3_hook = S3Hook() @@ -92,6 +92,8 @@ def edrgen_evaluation_successful(): edrgen_evaluation_successful_task = edrgen_evaluation_successful() trigger_edrgen_task = 
TriggerDagRunOperator( + weight_rule="absolute", + priority_weight=102, task_id="trigger_edrgen", trigger_dag_id="edrgen", # uncomment the next line if we want to dedup dagRuns for a particular ID diff --git a/airflow/dags/eval_srl_rdrgen.py b/airflow/dags/eval_srl_rdrgen.py index 6673d78e..373abbf4 100644 --- a/airflow/dags/eval_srl_rdrgen.py +++ b/airflow/dags/eval_srl_rdrgen.py @@ -32,7 +32,7 @@ }, ) as dag: - @task + @task(weight_rule="absolute", priority_weight=106) def evaluate_rdrgen(params: dict): s3_hook = S3Hook() @@ -69,7 +69,7 @@ def evaluate_rdrgen(params: dict): evaluate_rdrgen_task = evaluate_rdrgen() - @task.short_circuit() + @task.short_circuit(weight_rule="absolute", priority_weight=107) def rdrgen_evaluation_successful(): context = get_current_context() print(f"{context['ti'].xcom_pull(task_ids='evaluate_rdrgen')}") @@ -81,6 +81,8 @@ def rdrgen_evaluation_successful(): rdrgen_evaluation_successful_task = rdrgen_evaluation_successful() trigger_rdrgen_task = TriggerDagRunOperator( + weight_rule="absolute", + priority_weight=108, task_id="trigger_rdrgen", trigger_dag_id="rdrgen", # uncomment the next line if we want to dedup dagRuns for a particular ID diff --git a/airflow/dags/eval_srl_vic2png.py b/airflow/dags/eval_srl_vic2png.py index c1248f5a..719b9c00 100644 --- a/airflow/dags/eval_srl_vic2png.py +++ b/airflow/dags/eval_srl_vic2png.py @@ -32,7 +32,7 @@ }, ) as dag: - @task + @task(weight_rule="absolute", priority_weight=112) def evaluate_vic2png(params: dict): s3_hook = S3Hook() @@ -64,7 +64,7 @@ def evaluate_vic2png(params: dict): evaluate_vic2png_task = evaluate_vic2png() - @task.short_circuit() + @task.short_circuit(weight_rule="absolute", priority_weight=113) def vic2png_evaluation_successful(): context = get_current_context() print(f"{context['ti'].xcom_pull(task_ids='evaluate_vic2png')}") @@ -73,6 +73,8 @@ def vic2png_evaluation_successful(): vic2png_evaluation_successful_task = vic2png_evaluation_successful() trigger_vic2png_task = TriggerDagRunOperator( + weight_rule="absolute", + priority_weight=114, task_id="trigger_vic2png", trigger_dag_id="vic2png", # uncomment the next line if we want to dedup dagRuns for a particular ID diff --git a/airflow/dags/rdrgen.py b/airflow/dags/rdrgen.py index f005133c..43e4ec53 100644 --- a/airflow/dags/rdrgen.py +++ b/airflow/dags/rdrgen.py @@ -32,7 +32,7 @@ }, ) as dag: - @task + @task(weight_rule="absolute", priority_weight=109) def prep(params: dict): context = get_current_context() dag_run_id = context["dag_run"].run_id @@ -81,7 +81,11 @@ def prep(params: dict): # cli_args = ["-c", f"select d && exec $MARSLIB/marsinverter {rdrgen['vic_url']} {output_vic_file}"] # KLUDGE: ignore non-zero exit code when no-op occurs - cli_args = ["-c", f"select d && $MARSLIB/marsinverter {rdrgen['vic_url']} {output_vic_file} || :"] + # cli_args = ["-c", f"select d && $MARSLIB/marsinverter {rdrgen['vic_url']} {output_vic_file} || :"] + cli_args = [ + "-c", + f"export LD_LIBRARY_PATH=/usr/local/vicar/external/xerces-c++/v3.0.0_rhel8/x86-64-linx/lib:/usr/local/vicar/dev/olb/x86-64-linx:/usr/local/vicar/external/embree/v3.7.0/x86-64-linx; /usr/local/vicar/dev/mars/lib/x86-64-linx/marsinverter {rdrgen['vic_url']} {output_vic_file} || :", + ] res = subprocess.run(["find", dag_run_dir], capture_output=True, text=True) print(res.stdout) print(res.stderr) @@ -91,11 +95,15 @@ def prep(params: dict): prep_task = prep() rdrgen = KubernetesPodOperator( + weight_rule="absolute", + priority_weight=110, task_id="rdrgen", name="rdrgen", namespace="sps", - 
image="pymonger/srl-idps-rdrgen:multiarch-test", - cmds=["/bin/tcsh"], + image="429178552491.dkr.ecr.us-west-2.amazonaws.com/srl-idps/rdrgen:develop-multiarch", + # image="pymonger/srl-idps-rdrgen:multiarch-test", + # cmds=["/bin/tcsh"], + cmds=["/bin/bash"], arguments=prep_task, do_xcom_push=True, on_finish_action="delete_pod", @@ -105,7 +113,7 @@ def prep(params: dict): container_logs=True, service_account_name="airflow-worker", container_security_context={"privileged": True}, - retries=0, + retries=3, volume_mounts=[ k8s.V1VolumeMount( name="workers-volume", mount_path="/stage-in", sub_path="{{ dag_run.run_id }}/stage-in" @@ -122,16 +130,16 @@ def prep(params: dict): ], node_selector={ "karpenter.sh/nodepool": unity_sps_utils.NODE_POOL_HIGH_WORKLOAD, - "node.kubernetes.io/instance-type": "r7i.2xlarge", + "node.kubernetes.io/instance-type": "c6i.large", }, labels={"pod": unity_sps_utils.POD_LABEL}, annotations={"karpenter.sh/do-not-disrupt": "true"}, affinity=unity_sps_utils.get_affinity( - capacity_type=["on-demand"], anti_affinity_label=unity_sps_utils.POD_LABEL + capacity_type=["spot"], anti_affinity_label=unity_sps_utils.POD_LABEL ), ) - @task + @task(weight_rule="absolute", priority_weight=111) def post(params: dict): context = get_current_context() dag_run_id = context["dag_run"].run_id diff --git a/airflow/dags/router.py b/airflow/dags/router.py index cd7206f9..9933aacb 100644 --- a/airflow/dags/router.py +++ b/airflow/dags/router.py @@ -34,7 +34,7 @@ }, ) as dag: - @task + @task(weight_rule="absolute", priority_weight=200) def enumerate_evaluators(params: dict): payload = params["payload"] evaluators = [] @@ -49,7 +49,11 @@ def enumerate_evaluators(params: dict): enumerate_evals_task = enumerate_evaluators() trigger_eval_task = TriggerDagRunOperator.partial( - task_id="route_payload_to_evaluator", wait_for_completion=False, trigger_rule=TriggerRule.ALL_SUCCESS + weight_rule="absolute", + priority_weight=201, + task_id="route_payload_to_evaluator", + wait_for_completion=False, + trigger_rule=TriggerRule.ALL_SUCCESS, ).expand_kwargs(enumerate_evals_task) enumerate_evals_task >> trigger_eval_task diff --git a/airflow/dags/vic2png.py b/airflow/dags/vic2png.py index 78661d8a..1473f87e 100644 --- a/airflow/dags/vic2png.py +++ b/airflow/dags/vic2png.py @@ -32,7 +32,7 @@ }, ) as dag: - @task + @task(weight_rule="absolute", priority_weight=115) def prep(params: dict): context = get_current_context() dag_run_id = context["dag_run"].run_id @@ -77,10 +77,13 @@ def prep(params: dict): prep_task = prep() vic2png_task = KubernetesPodOperator( + weight_rule="absolute", + priority_weight=116, task_id="vic2png", name="vic2png", namespace="sps", - image="pymonger/srl-idps-vic2png:develop", + image="429178552491.dkr.ecr.us-west-2.amazonaws.com/srl-idps/vic2png:develop", + # image="pymonger/srl-idps-vic2png:develop", # cmds=[ # "sh", # "-c", @@ -95,7 +98,7 @@ def prep(params: dict): container_logs=True, service_account_name="airflow-worker", container_security_context={"privileged": True}, - retries=0, + retries=3, volume_mounts=[ k8s.V1VolumeMount( name="workers-volume", mount_path="/stage-in", sub_path="{{ dag_run.run_id }}/stage-in" @@ -112,16 +115,16 @@ def prep(params: dict): ], node_selector={ "karpenter.sh/nodepool": unity_sps_utils.NODE_POOL_HIGH_WORKLOAD, - "node.kubernetes.io/instance-type": "r7i.2xlarge", + "node.kubernetes.io/instance-type": "c6i.large", }, labels={"pod": unity_sps_utils.POD_LABEL}, annotations={"karpenter.sh/do-not-disrupt": "true"}, 
affinity=unity_sps_utils.get_affinity( - capacity_type=["on-demand"], anti_affinity_label=unity_sps_utils.POD_LABEL + capacity_type=["spot"], anti_affinity_label=unity_sps_utils.POD_LABEL ), ) - @task + @task(weight_rule="absolute", priority_weight=117) def post(params: dict): context = get_current_context() dag_run_id = context["dag_run"].run_id diff --git a/airflow/helm/values_high_load.tmpl.yaml b/airflow/helm/values_high_load.tmpl.yaml new file mode 100644 index 00000000..041d86cb --- /dev/null +++ b/airflow/helm/values_high_load.tmpl.yaml @@ -0,0 +1,382 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +--- +# Source of default values: https://github.com/apache/airflow/blob/main/chart/values.yaml + +# Airflow create user job settings +createUserJob: + # In case you need to disable the helm hooks that create the jobs after install. + # Disable this if you are using ArgoCD for example + useHelmHooks: false + applyCustomEnv: false + +# Airflow database migration job settings +migrateDatabaseJob: + # In case you need to disable the helm hooks that create the jobs after install. + # Disable this if you are using ArgoCD for example + useHelmHooks: false + applyCustomEnv: false + # To run database migrations with Argo CD automatically, you will need to add the + # following. This will run database migrations every time there is a Sync event + # in Argo CD. While it is not ideal to run the migrations on every sync, it is a + # trade-off that allows them to be run automatically. + jobAnnotations: + "argocd.argoproj.io/hook": Sync + +images: + airflow: + repository: ${airflow_image_repo} + tag: ${airflow_image_tag} + +# Global default settings for Airflow pods +nodeSelector: + "karpenter.sh/nodepool": "airflow-core-components" + +affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: "karpenter.sh/capacity-type" + operator: "In" + values: ["on-demand"] + - key: "karpenter.k8s.aws/instance-family" + operator: "In" + values: ["c6i", "c5"] + - key: "karpenter.k8s.aws/instance-cpu" + operator: "In" + values: ["2", "4"] + +topologySpreadConstraints: +- maxSkew: 1 + topologyKey: "topology.kubernetes.io/zone" + whenUnsatisfiable: "ScheduleAnyway" + labelSelector: + matchLabels: + app: airflow # This label should match all Airflow pods +- maxSkew: 1 + topologyKey: "kubernetes.io/hostname" + whenUnsatisfiable: "ScheduleAnyway" + labelSelector: + matchLabels: + app: airflow + +# Add common labels to all objects and pods defined in this chart. 
+labels: + app: airflow + +scheduler: + replicas: 8 + nodeSelector: + "karpenter.sh/nodepool": "airflow-core-components" + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: "karpenter.sh/capacity-type" + operator: "In" + values: ["on-demand"] + - key: "karpenter.k8s.aws/instance-family" + operator: "In" + # values: ["c6i", "c5"] # Choosing compute-optimized instances + values: ["r5"] # Choosing memory-optimized instance + - key: "karpenter.k8s.aws/instance-cpu" + operator: "In" + values: ["16", "32", "64"] + topologySpreadConstraints: + - maxSkew: 1 + topologyKey: "topology.kubernetes.io/zone" + whenUnsatisfiable: "ScheduleAnyway" + labelSelector: + matchLabels: + component: scheduler + - maxSkew: 1 + topologyKey: "kubernetes.io/hostname" + whenUnsatisfiable: ScheduleAnyway + labelSelector: + matchLabels: + component: scheduler + labels: + component: scheduler + +triggerer: + keda: + enabled: true + minReplicaCount: 1 + nodeSelector: + "karpenter.sh/nodepool": "airflow-core-components" + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: "karpenter.sh/capacity-type" + operator: "In" + values: [ "on-demand" ] + - key: "karpenter.k8s.aws/instance-family" + operator: "In" + # values: ["c6i", "c5"] # Choosing compute-optimized instances + values: [ "r5" ] # Choosing memory-optimized instance + - key: "karpenter.k8s.aws/instance-cpu" + operator: "In" + values: [ "16", "32", "64" ] # Scheduler might benefit from higher CPU + +postgresql: + enabled: false + +pgbouncer: + enabled: true + replicas: 3 + nodeSelector: + "karpenter.sh/nodepool": "airflow-core-components" + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: "karpenter.sh/capacity-type" + operator: "In" + values: [ "on-demand" ] + - key: "karpenter.k8s.aws/instance-family" + operator: "In" + # values: ["c6i", "c5"] # Choosing compute-optimized instances + values: [ "r5" ] # Choosing memory-optimized instance + - key: "karpenter.k8s.aws/instance-cpu" + operator: "In" + values: [ "16", "32", "64" ] # Scheduler might benefit from higher CPU + +webserverSecretKeySecretName: ${webserver_secret_name} + +webserver: + replicas: 3 + + # Issue 404: DISABLE AIRFLOW AUTHENTICATION (https://github.com/unity-sds/unity-sps/issues/404) + webserverConfig: |- + ${webserver_config} + + startupProbe: + timeoutSeconds: 20 + failureThreshold: 60 # Number of tries before giving up (10 minutes with periodSeconds of 10) + periodSeconds: 10 # How often to perform the probe + + nodeSelector: + "karpenter.sh/nodepool": "airflow-core-components" + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: "karpenter.sh/capacity-type" + operator: "In" + values: ["on-demand"] + - key: "karpenter.k8s.aws/instance-family" + operator: "In" + # values: ["c6i", "c5"] # Choosing compute-optimized instances + values: ["r5"] # Choosing memory-optimized instance + - key: "karpenter.k8s.aws/instance-cpu" + operator: "In" + values: ["16", "32", "64"] # Balancing between CPU and memory + topologySpreadConstraints: + - maxSkew: 1 + topologyKey: "topology.kubernetes.io/zone" + whenUnsatisfiable: "ScheduleAnyway" + labelSelector: + matchLabels: + component: webserver + - maxSkew: 1 + topologyKey: "kubernetes.io/hostname" + whenUnsatisfiable: ScheduleAnyway + labelSelector: +
matchLabels: + component: webserver + labels: + component: webserver + +workers: + nodeSelector: + "karpenter.sh/nodepool": "airflow-celery-workers" + affinity: + nodeAffinity: + preferredDuringSchedulingIgnoredDuringExecution: + - weight: 1 + preference: + matchExpressions: + - key: "karpenter.sh/capacity-type" + operator: "In" + values: ["spot"] + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: "karpenter.k8s.aws/instance-family" + operator: "In" + # values: ["c6i", "c5"] # Choosing compute-optimized instances + values: ["r5"] # Choosing memory-optimized instance + - key: "karpenter.k8s.aws/instance-cpu" + operator: "In" + values: ["16", "32", "64"] + topologySpreadConstraints: + - maxSkew: 1 + topologyKey: "topology.kubernetes.io/zone" + whenUnsatisfiable: "ScheduleAnyway" + labelSelector: + matchLabels: + component: worker + - maxSkew: 1 + topologyKey: "kubernetes.io/hostname" + whenUnsatisfiable: ScheduleAnyway + labelSelector: + matchLabels: + component: worker + labels: + component: worker + + keda: + enabled: true + pollingInterval: 1 + minReplicaCount: 1 + maxReplicaCount: 128 + # Specify HPA related options + # https://github.com/kubernetes/enhancements/blob/master/keps/sig-autoscaling/853-configurable-hpa-scale-velocity/README.md + advanced: + horizontalPodAutoscalerConfig: + behavior: + scaleUp: + policies: + - type: Percent + value: 900 + periodSeconds: 30 + scaleDown: + stabilizationWindowSeconds: 300 + policies: + - type: Percent + value: 100 + periodSeconds: 5 + serviceAccount: + annotations: + eks.amazonaws.com/role-arn: "${airflow_worker_role_arn}" + extraVolumes: + - name: workers-volume + persistentVolumeClaim: + claimName: ${workers_pvc_name} + extraVolumeMounts: + - name: workers-volume + mountPath: /shared-task-data + readOnly: false + +data: + metadataSecretName: ${metadata_secret_name} + resultBackendSecretName: ~ + +config: + logging: + remote_logging: 'True' + logging_level: "INFO" + remote_base_log_folder: ${airflow_logs_s3_location} + remote_log_conn_id: "aws_default" + encrypt_s3_logs: 'False' + celery: + worker_concurrency: 64 + webserver: + enable_proxy_fix: 'True' + +dags: + persistence: + # Enable persistent volume for storing dags + enabled: true + # the name of an existing PVC to use + existingClaim: ${dags_pvc_name} + +dagProcessor: + enabled: true + replicas: 3 + nodeSelector: + "karpenter.sh/nodepool": "airflow-core-components" + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: "karpenter.sh/capacity-type" + operator: "In" + values: [ "on-demand" ] + - key: "karpenter.k8s.aws/instance-family" + operator: "In" + # values: ["c6i", "c5"] # Choosing compute-optimized instances + values: [ "r5" ] # Choosing memory-optimized instance + - key: "karpenter.k8s.aws/instance-cpu" + operator: "In" + values: [ "16", "32", "64" ] # Scheduler might benefit from higher CPU + +env: + - name: "AIRFLOW_VAR_KUBERNETES_PIPELINE_NAMESPACE" + value: "${kubernetes_namespace}" + - name: "AIRFLOW_VAR_UNITY_PROJECT" + value: "${unity_project}" + - name: "AIRFLOW_VAR_UNITY_VENUE" + value: "${unity_venue}" + - name: "AIRFLOW_VAR_UNITY_CLUSTER_NAME" + value: "${unity_cluster_name}" + - name: "AIRFLOW_VAR_KARPENTER_NODE_POOLS" + value: "${karpenter_node_pools}" + - name: "AIRFLOW_VAR_ECR_URI" + value: "${cwl_dag_ecr_uri}" + +# https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/security/api.html +extraEnv: | + - name: 
AIRFLOW__CORE__DAGS_FOLDER + value: "/opt/airflow/dags" + - name: AIRFLOW__CORE__PLUGINS_FOLDER + value: "/opt/airflow/plugins" + - name: AIRFLOW__CORE__LAZY_LOAD_PLUGINS + value: "False" + - name: AIRFLOW__API__AUTH_BACKENDS + value: "airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session" + - name: AIRFLOW__CORE__PARALLELISM + value: "32768" + - name: AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG + value: "10000" + - name: AIRFLOW__CORE__MAX_ACTIVE_TASKS_PER_DAG + value: "10000" + - name: AIRFLOW__SCHEDULER__MAX_DAGRUNS_TO_CREATE_PER_LOOP + value: "1000" + - name: AIRFLOW__SCHEDULER__MAX_DAGRUNS_PER_LOOP_TO_SCHEDULE + value: "1000" + - name: AIRFLOW__SCHEDULER__MAX_TIS_PER_QUERY + value: "0" + - name: AIRFLOW__SCHEDULER__SCHEDULER_HEARTBEAT_SEC + value: "1" + - name: AIRFLOW__SCHEDULER__TASK_QUEUED_TIMEOUT + value: "7200" + - name: AIRFLOW__KUBERNETES__WORKER_PODS_CREATION_BATCH_SIZE + value: "1000" + - name: AIRFLOW__WEBSERVER__NAVBAR_COLOR + value: "${webserver_navbar_color}" + - name: AIRFLOW__WEBSERVER__INSTANCE_NAME + value: "Deployment: ${webserver_instance_name}, ${service_area} Version: ${service_area_version}" + - name: AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL + value: "10" + - name: AIRFLOW__SCHEDULER__MIN_FILE_PROCESS_INTERVAL + value: "0" + - name: AIRFLOW__WEBSERVER__EXPOSE_CONFIG + value: "True" + - name: AIRFLOW__CORE__DEFAULT_POOL_TASK_SLOT_COUNT + value: "32768" + #- name: AIRFLOW__CELERY__WORKER_AUTOSCALE + # value: "64,56" + #- name: AIRFLOW__CELERY__WORKER_CONCURRENCY + # value: "64" diff --git a/terraform-unity/README.md b/terraform-unity/README.md index 22078a34..8870beb8 100644 --- a/terraform-unity/README.md +++ b/terraform-unity/README.md @@ -23,44 +23,72 @@ terraform-docs tfvars hcl . --output-file "terraform.tfvars" ```json -celeryconfig_filename = "celeryconfig_remote.py" -counter = "" -datasets_filename = "datasets.remote.template.json" -deployment_environment = "mcp" -docker_images = { - "ades_wpst_api": "ghcr.io/unity-sds/unity-sps-prototype/ades-wpst-api:unity-v0.0.1", - "busybox": "k8s.gcr.io/busybox", - "hysds_core": "ghcr.io/unity-sds/unity-sps-prototype/hysds-core:unity-v0.0.1", - "hysds_factotum": "ghcr.io/unity-sds/unity-sps-prototype/hysds-factotum:unity-v0.0.1", - "hysds_grq2": "ghcr.io/unity-sds/unity-sps-prototype/hysds-grq2:unity-v0.0.1", - "hysds_mozart": "ghcr.io/unity-sds/unity-sps-prototype/hysds-mozart:unity-v0.0.1", - "hysds_ui": "ghcr.io/unity-sds/unity-sps-prototype/hysds-ui-remote:unity-v0.0.1", - "hysds_verdi": "ghcr.io/unity-sds/unity-sps-prototype/hysds-verdi:unity-v0.0.1", - "logstash": "docker.elastic.co/logstash/logstash:7.10.2", - "mc": "minio/mc:RELEASE.2022-03-13T22-34-00Z", - "minio": "minio/minio:RELEASE.2022-03-17T06-34-49Z", - "rabbitmq": "rabbitmq:3-management", - "redis": "redis:latest" -} -kubeconfig_filepath = "" -mozart_es = { - "volume_claim_template": { - "storage_class_name": "gp2-sps" - } -} -namespace = "" -node_port_map = { - "ades_wpst_api_service": 30011, - "grq2_es": 30012, - "grq2_service": 30002, - "hysds_ui_service": 30009, - "minio_service_api": 30007, - "minio_service_interface": 30008, - "mozart_es": 30013, - "mozart_service": 30001 -} -service_type = "LoadBalancer" -venue = "" +## Requirements + +| Name | Version | +|------|---------| +| [terraform](#requirement\_terraform) | ~> 1.8.2 | +| [aws](#requirement\_aws) | 5.67.0 | +| [external](#requirement\_external) | 2.3.4 | +| [helm](#requirement\_helm) | 2.15.0 | +| [kubernetes](#requirement\_kubernetes) | 2.32.0 | +| [null](#requirement\_null) | 
3.2.3 | +| [time](#requirement\_time) | 0.12.1 | + +## Providers + +| Name | Version | +|------|---------| +| [aws](#provider\_aws) | 5.67.0 | +| [kubernetes](#provider\_kubernetes) | 2.32.0 | + +## Modules + +| Name | Source | Version | +|------|--------|---------| +| [unity-sps-airflow](#module\_unity-sps-airflow) | ./modules/terraform-unity-sps-airflow | n/a | +| [unity-sps-database](#module\_unity-sps-database) | ./modules/terraform-unity-sps-database | n/a | +| [unity-sps-efs](#module\_unity-sps-efs) | ./modules/terraform-unity-sps-efs | n/a | +| [unity-sps-karpenter-node-config](#module\_unity-sps-karpenter-node-config) | ./modules/terraform-unity-sps-karpenter-node-config | n/a | +| [unity-sps-ogc-processes-api](#module\_unity-sps-ogc-processes-api) | ./modules/terraform-unity-sps-ogc-processes-api | n/a | +| [unity-sps-s3](#module\_unity-sps-s3) | ./modules/terraform-unity-sps-s3 | n/a | + +## Resources + +| Name | Type | +|------|------| +| [kubernetes_namespace.service_area](https://registry.terraform.io/providers/hashicorp/kubernetes/2.32.0/docs/resources/namespace) | resource | +| [aws_eks_cluster.cluster](https://registry.terraform.io/providers/hashicorp/aws/5.67.0/docs/data-sources/eks_cluster) | data source | +| [aws_eks_cluster_auth.cluster](https://registry.terraform.io/providers/hashicorp/aws/5.67.0/docs/data-sources/eks_cluster_auth) | data source | + +## Inputs + +| Name | Description | Type | Default | Required | +|------|-------------|------|---------|:--------:| +| [airflow\_docker\_images](#input\_airflow\_docker\_images) | Docker images for the associated Airflow services. |
object({
airflow = object({
name = string
tag = string
})
})
|
{
"airflow": {
"name": "ghcr.io/unity-sds/unity-sps/sps-airflow",
"tag": "3.0.0"
}
}
| no | +| [airflow\_webserver\_password](#input\_airflow\_webserver\_password) | The password for the Airflow webserver and UI. | `string` | n/a | yes | +| [airflow\_webserver\_username](#input\_airflow\_webserver\_username) | The username for the Airflow webserver and UI. | `string` | `"admin"` | no | +| [dag\_catalog\_repo](#input\_dag\_catalog\_repo) | Git repository that stores the catalog of Airflow DAGs. |
object({
url = string
ref = string
dags_directory_path = string
})
|
{
"dags_directory_path": "airflow/dags",
"ref": "main",
"url": "https://github.com/unity-sds/unity-sps.git"
}
| no | +| [deployment\_name](#input\_deployment\_name) | The name of the deployment. | `string` | `""` | no | +| [helm\_charts](#input\_helm\_charts) | Helm charts for the associated services. |
map(object({
repository = string
chart = string
version = string
}))
|
{
"airflow": {
"chart": "airflow",
"repository": "https://airflow.apache.org",
"version": "1.15.0"
},
"keda": {
"chart": "keda",
"repository": "https://kedacore.github.io/charts",
"version": "v2.15.1"
}
}
| no | +| [helm\_values\_template](#input\_helm\_values\_template) | The helm values template file to use. | `string` | `"values.tmpl.yaml"` | no | +| [installprefix](#input\_installprefix) | The install prefix for the service area (unused) | `string` | `""` | no | +| [karpenter\_node\_classes](#input\_karpenter\_node\_classes) | Configuration for karpenter\_node\_classes |
map(object({
volume_size = string
}))
|
{
"airflow-kubernetes-pod-operator-high-workload": {
"volume_size": "300Gi"
},
"default": {
"volume_size": "30Gi"
}
}
| no | +| [karpenter\_node\_pools](#input\_karpenter\_node\_pools) | Configuration for Karpenter node pools |
map(object({
requirements : list(object({
key : string
operator : string
values : list(string)
}))
nodeClassRef : string
limits : object({
cpu : string
memory : string
})
disruption : object({
consolidationPolicy : string
consolidateAfter : string
})
}))
|
{
"airflow-celery-workers": {
"disruption": {
"consolidateAfter": "1m",
"consolidationPolicy": "WhenEmpty"
},
"limits": {
"cpu": "80",
"memory": "320Gi"
},
"nodeClassRef": "default",
"requirements": [
{
"key": "karpenter.k8s.aws/instance-family",
"operator": "In",
"values": [
"m7i",
"m6i",
"m5",
"t3",
"c7i",
"c6i",
"c6id",
"c5",
"r7i",
"r6i",
"r5",
"m5ad"
]
},
{
"key": "karpenter.k8s.aws/instance-cpu",
"operator": "Gt",
"values": [
"1"
]
},
{
"key": "karpenter.k8s.aws/instance-cpu",
"operator": "Lt",
"values": [
"17"
]
},
{
"key": "karpenter.k8s.aws/instance-memory",
"operator": "Gt",
"values": [
"4095"
]
},
{
"key": "karpenter.k8s.aws/instance-memory",
"operator": "Lt",
"values": [
"65537"
]
},
{
"key": "karpenter.k8s.aws/instance-hypervisor",
"operator": "In",
"values": [
"nitro"
]
}
]
},
"airflow-core-components": {
"disruption": {
"consolidateAfter": "1m",
"consolidationPolicy": "WhenEmpty"
},
"limits": {
"cpu": "40",
"memory": "160Gi"
},
"nodeClassRef": "default",
"requirements": [
{
"key": "karpenter.k8s.aws/instance-family",
"operator": "In",
"values": [
"m7i",
"m6i",
"m5",
"t3",
"c7i",
"c6i",
"c6id",
"c5",
"r7i",
"r6i",
"r5",
"m5ad"
]
},
{
"key": "karpenter.k8s.aws/instance-cpu",
"operator": "Gt",
"values": [
"1"
]
},
{
"key": "karpenter.k8s.aws/instance-cpu",
"operator": "Lt",
"values": [
"17"
]
},
{
"key": "karpenter.k8s.aws/instance-memory",
"operator": "Gt",
"values": [
"4095"
]
},
{
"key": "karpenter.k8s.aws/instance-memory",
"operator": "Lt",
"values": [
"65537"
]
},
{
"key": "karpenter.k8s.aws/instance-hypervisor",
"operator": "In",
"values": [
"nitro"
]
}
]
},
"airflow-kubernetes-pod-operator": {
"disruption": {
"consolidateAfter": "1m",
"consolidationPolicy": "WhenEmpty"
},
"limits": {
"cpu": "6400",
"memory": "12800Gi"
},
"nodeClassRef": "default",
"requirements": [
{
"key": "karpenter.k8s.aws/instance-family",
"operator": "In",
"values": [
"m7i",
"m6i",
"m5",
"m5ad",
"t3",
"c7i",
"c6i",
"c6id",
"c5",
"r7i",
"r6i",
"r5"
]
},
{
"key": "karpenter.k8s.aws/instance-cpu",
"operator": "Gt",
"values": [
"0"
]
},
{
"key": "karpenter.k8s.aws/instance-cpu",
"operator": "Lt",
"values": [
"17"
]
},
{
"key": "karpenter.k8s.aws/instance-memory",
"operator": "Gt",
"values": [
"511"
]
},
{
"key": "karpenter.k8s.aws/instance-memory",
"operator": "Lt",
"values": [
"65537"
]
},
{
"key": "karpenter.k8s.aws/instance-hypervisor",
"operator": "In",
"values": [
"nitro"
]
}
]
},
"airflow-kubernetes-pod-operator-high-workload": {
"disruption": {
"consolidateAfter": "1m",
"consolidationPolicy": "WhenEmpty"
},
"limits": {
"cpu": "6400",
"memory": "12800Gi"
},
"nodeClassRef": "airflow-kubernetes-pod-operator-high-workload",
"requirements": [
{
"key": "karpenter.k8s.aws/instance-family",
"operator": "In",
"values": [
"m7i",
"m6i",
"m5",
"m5ad",
"t3",
"c7i",
"c6i",
"c6id",
"c5",
"r7i",
"r6i",
"r5"
]
},
{
"key": "karpenter.k8s.aws/instance-cpu",
"operator": "Gt",
"values": [
"0"
]
},
{
"key": "karpenter.k8s.aws/instance-cpu",
"operator": "Lt",
"values": [
"65"
]
},
{
"key": "karpenter.k8s.aws/instance-memory",
"operator": "Gt",
"values": [
"511"
]
},
{
"key": "karpenter.k8s.aws/instance-memory",
"operator": "Lt",
"values": [
"262145"
]
},
{
"key": "karpenter.k8s.aws/instance-hypervisor",
"operator": "In",
"values": [
"nitro"
]
}
]
}
}
| no | +| [kubeconfig\_filepath](#input\_kubeconfig\_filepath) | The path to the kubeconfig file for the Kubernetes cluster. | `string` | n/a | yes | +| [mcp\_ami\_owner\_id](#input\_mcp\_ami\_owner\_id) | The owner ID of the MCP AMIs | `string` | `"794625662971"` | no | +| [ogc\_processes\_docker\_images](#input\_ogc\_processes\_docker\_images) | Docker images for the associated OGC Processes API services. |
object({
ogc_processes_api = object({
name = string
tag = string
})
git_sync = object({
name = string
tag = string
})
redis = object({
name = string
tag = string
})
})
|
{
"git_sync": {
"name": "registry.k8s.io/git-sync/git-sync",
"tag": "v4.2.4"
},
"ogc_processes_api": {
"name": "ghcr.io/unity-sds/unity-sps-ogc-processes-api/unity-sps-ogc-processes-api",
"tag": "2.1.0"
},
"redis": {
"name": "redis",
"tag": "7.4.0"
}
}
| no | +| [project](#input\_project) | The project or mission deploying Unity SPS. | `string` | `"unity"` | no | +| [release](#input\_release) | The software release version. | `string` | `"25.2"` | no | +| [service\_area](#input\_service\_area) | The service area owner of the resources being deployed. | `string` | `"sps"` | no | +| [tags](#input\_tags) | Tags for the deployment (unused) | `map(string)` |
{
"empty": ""
}
| no | +| [venue](#input\_venue) | The MCP venue in which the resources will be deployed. | `string` | n/a | yes | + +## Outputs + +| Name | Description | +|------|-------------| +| [resources](#output\_resources) | SSM parameter IDs for SPS resources. | % ``` @@ -145,7 +173,7 @@ terraform apply -no-color 2>&1 | tee apply_output.txt ## Auto-generated Documentation of the Unity SPS Terraform Root Module - + ## Requirements | Name | Version | @@ -211,4 +239,4 @@ terraform apply -no-color 2>&1 | tee apply_output.txt | Name | Description | |------|-------------| | [resources](#output\_resources) | SSM parameter IDs for SPS resources. | - + diff --git a/terraform-unity/main.tf b/terraform-unity/main.tf index 3157a511..3bbe9d0b 100644 --- a/terraform-unity/main.tf +++ b/terraform-unity/main.tf @@ -65,6 +65,7 @@ module "unity-sps-airflow" { airflow_webserver_password = var.airflow_webserver_password docker_images = var.airflow_docker_images helm_charts = var.helm_charts + helm_values_template = var.helm_values_template karpenter_node_pools = module.unity-sps-karpenter-node-config.karpenter_node_pools } diff --git a/terraform-unity/modules/terraform-unity-sps-airflow/README.md b/terraform-unity/modules/terraform-unity-sps-airflow/README.md index 907f9d18..0a40c6c7 100644 --- a/terraform-unity/modules/terraform-unity-sps-airflow/README.md +++ b/terraform-unity/modules/terraform-unity-sps-airflow/README.md @@ -1,6 +1,6 @@ # terraform-unity-sps-airflow - + ## Requirements | Name | Version | @@ -106,9 +106,10 @@ No modules. | [airflow\_webserver\_username](#input\_airflow\_webserver\_username) | The username for the Airflow webserver and UI. | `string` | n/a | yes | | [db\_instance\_identifier](#input\_db\_instance\_identifier) | The AWS DB instance identifier | `string` | n/a | yes | | [db\_secret\_arn](#input\_db\_secret\_arn) | The version of the database secret in AWS Secrets Manager | `string` | n/a | yes | -| [docker\_images](#input\_docker\_images) | Docker images for the associated services. |
object({
airflow = object({
name = string
tag = string
})
})
| n/a | yes | +| [docker\_images](#input\_docker\_images) | Docker images for the associated services. |
object({
airflow = object({
name = string
tag = string
})
})
| n/a | yes | | [efs\_file\_system\_id](#input\_efs\_file\_system\_id) | The EFS file system ID | `string` | n/a | yes | -| [helm\_charts](#input\_helm\_charts) | Helm charts for the associated services. |
map(object({
repository = string
chart = string
version = string
}))
| n/a | yes | +| [helm\_charts](#input\_helm\_charts) | Helm charts for the associated services. |
map(object({
repository = string
chart = string
version = string
}))
| n/a | yes | +| [helm\_values\_template](#input\_helm\_values\_template) | The helm values template file to use. | `string` | n/a | yes | | [karpenter\_node\_pools](#input\_karpenter\_node\_pools) | Names of the Karpenter node pools | `list(string)` | n/a | yes | | [kubeconfig\_filepath](#input\_kubeconfig\_filepath) | The path to the kubeconfig file for the Kubernetes cluster. | `string` | n/a | yes | | [kubernetes\_namespace](#input\_kubernetes\_namespace) | The kubernetes namespace for Airflow resources. | `string` | n/a | yes | @@ -125,4 +126,4 @@ No modules. | [airflow\_urls](#output\_airflow\_urls) | SSM parameter IDs and URLs for the various Airflow endpoints. | | [airflow\_venue\_urls](#output\_airflow\_venue\_urls) | URLs for the various Airflow endpoints at venue-proxy level. | | [s3\_buckets](#output\_s3\_buckets) | SSM parameter IDs and bucket names for the various buckets used in the pipeline. | - + diff --git a/terraform-unity/modules/terraform-unity-sps-airflow/main.tf b/terraform-unity/modules/terraform-unity-sps-airflow/main.tf index bc9a157c..1156a1ee 100644 --- a/terraform-unity/modules/terraform-unity-sps-airflow/main.tf +++ b/terraform-unity/modules/terraform-unity-sps-airflow/main.tf @@ -394,7 +394,7 @@ resource "helm_release" "airflow" { version = var.helm_charts.airflow.version namespace = data.kubernetes_namespace.service_area.metadata[0].name values = [ - templatefile("${path.module}/../../../airflow/helm/values.tmpl.yaml", { + templatefile("${path.module}/../../../airflow/helm/${var.helm_values_template}", { airflow_image_repo = var.docker_images.airflow.name airflow_image_tag = var.docker_images.airflow.tag kubernetes_namespace = data.kubernetes_namespace.service_area.metadata[0].name @@ -676,7 +676,7 @@ resource "aws_ssm_parameter" "airflow_ui_health_check_endpoint" { description = "The URL of the Airflow UI." type = "String" value = jsonencode({ - "componentCategory": "processing" + "componentCategory" : "processing" "componentName" : "Airflow UI" "componentType" : "ui" "description" : "The primary GUI for the Science Processing System (SPS) to run and monitor jobs at scale." @@ -712,7 +712,7 @@ resource "aws_ssm_parameter" "airflow_api_health_check_endpoint" { description = "The URL of the Airflow REST API." type = "String" value = jsonencode({ - "componentCategory": "processing" + "componentCategory" : "processing" "componentName" : "Airflow API" "componentType" : "api" "description" : "The direct API for the job management system underlying the SPS (Airflow). Typically the OGC Processes API should be used instead, because it will abstract out a particular job engine." diff --git a/terraform-unity/modules/terraform-unity-sps-airflow/variables.tf b/terraform-unity/modules/terraform-unity-sps-airflow/variables.tf index 3f6ff754..04287efa 100644 --- a/terraform-unity/modules/terraform-unity-sps-airflow/variables.tf +++ b/terraform-unity/modules/terraform-unity-sps-airflow/variables.tf @@ -62,6 +62,11 @@ variable "helm_charts" { })) } +variable "helm_values_template" { + description = "The helm values template file to use." + type = string +} + variable "docker_images" { description = "Docker images for the associated services." 
type = object({ diff --git a/terraform-unity/modules/terraform-unity-sps-database/README.md b/terraform-unity/modules/terraform-unity-sps-database/README.md index 286b091b..ede88a7a 100644 --- a/terraform-unity/modules/terraform-unity-sps-database/README.md +++ b/terraform-unity/modules/terraform-unity-sps-database/README.md @@ -1,4 +1,4 @@ - + ## Requirements | Name | Version | @@ -56,4 +56,4 @@ No modules. | [db\_instance\_identifier](#output\_db\_instance\_identifier) | n/a | | [db\_latest\_snapshot](#output\_db\_latest\_snapshot) | n/a | | [db\_secret\_arn](#output\_db\_secret\_arn) | n/a | - + diff --git a/terraform-unity/modules/terraform-unity-sps-database/main.tf b/terraform-unity/modules/terraform-unity-sps-database/main.tf index f0187c27..2f6cabf2 100644 --- a/terraform-unity/modules/terraform-unity-sps-database/main.tf +++ b/terraform-unity/modules/terraform-unity-sps-database/main.tf @@ -64,11 +64,11 @@ resource "aws_security_group_rule" "eks_egress_to_rds" { resource "aws_db_instance" "sps_db" { identifier = format(local.resource_name_prefix, "db") - allocated_storage = 100 + allocated_storage = 400 storage_type = "gp3" engine = "postgres" engine_version = "16.4" - instance_class = "db.m5d.large" + instance_class = "db.m5d.2xlarge" db_name = "sps_db" username = "db_user" password = aws_secretsmanager_secret_version.db.secret_string diff --git a/terraform-unity/variables.tf b/terraform-unity/variables.tf index d79e4c65..b19414bd 100644 --- a/terraform-unity/variables.tf +++ b/terraform-unity/variables.tf @@ -62,6 +62,12 @@ variable "helm_charts" { } } +variable "helm_values_template" { + description = "The helm values template file to use." + type = string + default = "values.tmpl.yaml" +} + variable "airflow_docker_images" { description = "Docker images for the associated Airflow services." type = object({
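Note on the recurring `weight_rule="absolute"` / `priority_weight=N` additions in the DAG diffs above: with Airflow's default `weight_rule="downstream"`, a task's effective priority is its own `priority_weight` plus that of every downstream task, whereas `"absolute"` uses the value verbatim (and spares the scheduler that summation on large DAGs). The numbers ascend through the pipeline (eval_srl_edrgen at 100-102, edrgen at 103-105, up through vic2png at 115-117, with the router tasks at 200-201), giving tasks a deterministic pecking order when worker slots are scarce. A minimal sketch of the pattern, assuming Airflow 2.x; the `priority_demo` and `downstream_dag` DAG ids and the weight values are illustrative, not taken from the patch:

```python
from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

with DAG(dag_id="priority_demo", start_date=datetime(2025, 1, 1), schedule=None):

    @task(weight_rule="absolute", priority_weight=100)
    def evaluate() -> str:
        # "absolute" means this task competes for worker slots with
        # priority exactly 100; downstream weights are not summed in.
        return "ok"

    trigger = TriggerDagRunOperator(
        weight_rule="absolute",
        priority_weight=102,  # later pipeline stages get strictly higher weights
        task_id="trigger_downstream",
        trigger_dag_id="downstream_dag",  # illustrative target DAG id
    )

    evaluate() >> trigger
```

The same patch pairs the node-affinity switch to `capacity_type=["spot"]` with raising `retries` from 0 to 3 on each `KubernetesPodOperator`, presumably so that spot-instance interruptions are absorbed by task retries instead of failing the whole run.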