diff --git a/airflow/dags/appgen_dag.py b/airflow/dags/appgen_dag.py index 6f24473a..9830a8b3 100644 --- a/airflow/dags/appgen_dag.py +++ b/airflow/dags/appgen_dag.py @@ -7,116 +7,42 @@ import os from datetime import datetime +import boto3 from airflow.models.baseoperator import chain from airflow.models.param import Param from airflow.operators.python import PythonOperator, get_current_context from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator from airflow.utils.trigger_rule import TriggerRule from kubernetes.client import models as k8s -from unity_sps_utils import ( # DEFAULT_LOG_LEVEL,; EC2_TYPES,; POD_LABEL,; build_ec2_type_label, +from unity_sps_utils import ( + DEFAULT_LOG_LEVEL, + EC2_TYPES, NODE_POOL_DEFAULT, NODE_POOL_HIGH_WORKLOAD, + POD_LABEL, POD_NAMESPACE, + build_ec2_type_label, get_affinity, ) from airflow import DAG -POD_LABEL = "appgen_pod" + datetime.now().strftime( - "%Y%m%d_%H%M%S_%f" -) # unique pod label to assure each job runs on its own pod - CONTAINER_RESOURCES = k8s.V1ResourceRequirements( requests={ "ephemeral-storage": "{{ params.request_storage }} ", } ) -# >>> This part will be removed once the parameters can be imported from unity_sps_plugins.py -DEFAULT_LOG_LEVEL = 20 -EC2_TYPES = { - "t3.micro": { - "desc": "General Purpose", - "cpu": 1, - "memory": 1, - }, - "t3.small": { - "desc": "General Purpose", - "cpu": 2, - "memory": 2, - }, - "t3.medium": { - "desc": "General Purpose", - "cpu": 2, - "memory": 4, - }, - "t3.large": { - "desc": "General Purpose", - "cpu": 2, - "memory": 8, - }, - "t3.xlarge": { - "desc": "General Purpose", - "cpu": 4, - "memory": 16, - }, - "t3.2xlarge": { - "desc": "General Purpose", - "cpu": 8, - "memory": 32, - }, - "r7i.xlarge": { - "desc": "Memory Optimized", - "cpu": 4, - "memory": 32, - }, - "r7i.2xlarge": { - "desc": "Memory Optimized", - "cpu": 8, - "memory": 64, - }, - "r7i.4xlarge": { - "desc": "Memory Optimized", - "cpu": 16, - "memory": 128, - }, - "r7i.8xlarge": { - "desc": "Memory Optimized", - "cpu": 32, - "memory": 256, - }, - "c6i.xlarge": { - "desc": "Compute Optimized", - "cpu": 4, - "memory": 8, - }, - "c6i.2xlarge": { - "desc": "Compute Optimized", - "cpu": 8, - "memory": 16, - }, - "c6i.4xlarge": { - "desc": "Compute Optimized", - "cpu": 16, - "memory": 32, - }, - "c6i.8xlarge": { - "desc": "Compute Optimized", - "cpu": 32, - "memory": 64, - }, -} - - -def build_ec2_type_label(key): - return f"{key} ({EC2_TYPES.get(key)['desc']}: {EC2_TYPES.get(key)['cpu']}vCPU, {EC2_TYPES.get(key)['memory']}GiB)" - +# AWS SSM parameter paths for credentials +DOCKERHUB_USERNAME = "/unity/ads/app_gen/development/dockerhub_username" +DOCKERHUB_TOKEN = "/unity/ads/app_gen/development/dockerhub_api_key" +DOCKSTORE_TOKEN = "/unity/ads/app_gen/development/dockstore_token" # <<< LOG_LEVEL_TYPE = {10: "DEBUG", 20: "INFO"} # Change this to the Docker image that contains the Application Package Generator -DOCKER_IMAGE = "docker.io/busybox" +DOCKER_IMAGE = "jplmdps/unity-app-gen:v1.1.1" # Default DAG configuration dag_default_args = { @@ -137,7 +63,12 @@ def build_ec2_type_label(key): max_active_tasks=30, default_args=dag_default_args, params={ - "message": Param("Hello World", type="string", title="Message", description="The greeting message"), + "repository": Param( + "https://github.com/unity-sds/unity-example-application", + type="string", + title="Repository", + description="Git URL of application source files", + ), "log_level": Param( DEFAULT_LOG_LEVEL, type="integer", @@ -160,11 +91,52 @@ def build_ec2_type_label(key): }, ) +app_gen_env_vars = [ + k8s.V1EnvVar( + name="DOCKERHUB_USERNAME", value="{{ ti.xcom_pull(task_ids='Setup', key='dockerhub_username') }}" + ), + k8s.V1EnvVar(name="DOCKERHUB_TOKEN", value="{{ ti.xcom_pull(task_ids='Setup', key='dockerhub_token') }}"), + k8s.V1EnvVar(name="DOCKSTORE_TOKEN", value="{{ ti.xcom_pull(task_ids='Setup', key='dockstore_token') }}"), + k8s.V1EnvVar( + name="DOCKSTORE_API_URL", + value="http://awslbdockstorestack-lb-1429770210.us-west-2.elb.amazonaws.com:9998/api", + ), + k8s.V1EnvVar(name="GITHUB_REPO", value="{{ params.repository }}"), +] + def setup(ti=None, **context): """ Task that selects the proper Karpenter Node Pool depending on the user requested resources. """ + + ## Retrieve the docker credentials and DockStore token + ssm_client = boto3.client("ssm", region_name="us-west-2") + ssm_response = ssm_client.get_parameters( + Names=[DOCKERHUB_USERNAME, DOCKERHUB_TOKEN, DOCKSTORE_TOKEN], WithDecryption=True + ) + logging.info(ssm_response) + + # Somehow get the correct variables from SSM here + credentials_dict = {} + for param in ssm_response["Parameters"]: + if param["Name"] == DOCKERHUB_USERNAME: + credentials_dict["dockerhub_username"] = param["Value"] + elif param["Name"] == DOCKERHUB_TOKEN: + credentials_dict["dockerhub_token"] = param["Value"] + elif param["Name"] == DOCKSTORE_TOKEN: + credentials_dict["dockstore_token"] = param["Value"] + + required_credentials = ["dockerhub_username", "dockerhub_token", "dockstore_token"] + # make sure all required credentials are provided + if not set(required_credentials).issubset(list(credentials_dict.keys())): + logging.error(f"Expected all of credentials to run mdps app generator {required_credentials}") + + # use xcom to push to avoid putting credentials to the logs + ti.xcom_push(key="dockerhub_username", value=credentials_dict["dockerhub_username"]) + ti.xcom_push(key="dockerhub_token", value=credentials_dict["dockerhub_token"]) + ti.xcom_push(key="dockstore_token", value=credentials_dict["dockstore_token"]) + context = get_current_context() logging.info(f"DAG Run parameters: {json.dumps(context['params'], sort_keys=True, indent=4)}") @@ -204,13 +176,21 @@ def setup(ti=None, **context): retries=1, task_id="appgen_task", namespace=POD_NAMESPACE, + env_vars=app_gen_env_vars, name="appgen-task-pod", image=DOCKER_IMAGE, service_account_name="airflow-worker", in_cluster=True, get_logs=True, startup_timeout_seconds=600, - arguments=["echo", "{{ti.xcom_pull(task_ids='Setup', key='message')}}"], + arguments=[ + "-r", + "{{ params.repository }}", + "-l", + "{{ params.log_level }}", + "-e", + "{{ ti.xcom_pull(task_ids='Setup', key='ecr_login') }}", + ], container_security_context={"privileged": True}, container_resources=k8s.V1ResourceRequirements( requests={