Skip to content

Commit 033e325

Browse files
authored
Merge pull request #390 from unity-sds/appgen-dag
added appgen dag
2 parents c15edad + d3662e3 commit 033e325

File tree

1 file changed

+67
-87
lines changed

1 file changed

+67
-87
lines changed

airflow/dags/appgen_dag.py

Lines changed: 67 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -7,116 +7,42 @@
77
import os
88
from datetime import datetime
99

10+
import boto3
1011
from airflow.models.baseoperator import chain
1112
from airflow.models.param import Param
1213
from airflow.operators.python import PythonOperator, get_current_context
1314
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
1415
from airflow.utils.trigger_rule import TriggerRule
1516
from kubernetes.client import models as k8s
16-
from unity_sps_utils import ( # DEFAULT_LOG_LEVEL,; EC2_TYPES,; POD_LABEL,; build_ec2_type_label,
17+
from unity_sps_utils import (
18+
DEFAULT_LOG_LEVEL,
19+
EC2_TYPES,
1720
NODE_POOL_DEFAULT,
1821
NODE_POOL_HIGH_WORKLOAD,
22+
POD_LABEL,
1923
POD_NAMESPACE,
24+
build_ec2_type_label,
2025
get_affinity,
2126
)
2227

2328
from airflow import DAG
2429

25-
POD_LABEL = "appgen_pod" + datetime.now().strftime(
26-
"%Y%m%d_%H%M%S_%f"
27-
) # unique pod label to assure each job runs on its own pod
28-
2930
CONTAINER_RESOURCES = k8s.V1ResourceRequirements(
3031
requests={
3132
"ephemeral-storage": "{{ params.request_storage }} ",
3233
}
3334
)
3435

35-
# >>> This part will be removed once the parameters can be imported from unity_sps_plugins.py
36-
DEFAULT_LOG_LEVEL = 20
37-
EC2_TYPES = {
38-
"t3.micro": {
39-
"desc": "General Purpose",
40-
"cpu": 1,
41-
"memory": 1,
42-
},
43-
"t3.small": {
44-
"desc": "General Purpose",
45-
"cpu": 2,
46-
"memory": 2,
47-
},
48-
"t3.medium": {
49-
"desc": "General Purpose",
50-
"cpu": 2,
51-
"memory": 4,
52-
},
53-
"t3.large": {
54-
"desc": "General Purpose",
55-
"cpu": 2,
56-
"memory": 8,
57-
},
58-
"t3.xlarge": {
59-
"desc": "General Purpose",
60-
"cpu": 4,
61-
"memory": 16,
62-
},
63-
"t3.2xlarge": {
64-
"desc": "General Purpose",
65-
"cpu": 8,
66-
"memory": 32,
67-
},
68-
"r7i.xlarge": {
69-
"desc": "Memory Optimized",
70-
"cpu": 4,
71-
"memory": 32,
72-
},
73-
"r7i.2xlarge": {
74-
"desc": "Memory Optimized",
75-
"cpu": 8,
76-
"memory": 64,
77-
},
78-
"r7i.4xlarge": {
79-
"desc": "Memory Optimized",
80-
"cpu": 16,
81-
"memory": 128,
82-
},
83-
"r7i.8xlarge": {
84-
"desc": "Memory Optimized",
85-
"cpu": 32,
86-
"memory": 256,
87-
},
88-
"c6i.xlarge": {
89-
"desc": "Compute Optimized",
90-
"cpu": 4,
91-
"memory": 8,
92-
},
93-
"c6i.2xlarge": {
94-
"desc": "Compute Optimized",
95-
"cpu": 8,
96-
"memory": 16,
97-
},
98-
"c6i.4xlarge": {
99-
"desc": "Compute Optimized",
100-
"cpu": 16,
101-
"memory": 32,
102-
},
103-
"c6i.8xlarge": {
104-
"desc": "Compute Optimized",
105-
"cpu": 32,
106-
"memory": 64,
107-
},
108-
}
109-
110-
111-
def build_ec2_type_label(key):
112-
return f"{key} ({EC2_TYPES.get(key)['desc']}: {EC2_TYPES.get(key)['cpu']}vCPU, {EC2_TYPES.get(key)['memory']}GiB)"
113-
36+
# AWS SSM parameter paths for credentials
37+
DOCKERHUB_USERNAME = "/unity/ads/app_gen/development/dockerhub_username"
38+
DOCKERHUB_TOKEN = "/unity/ads/app_gen/development/dockerhub_api_key"
39+
DOCKSTORE_TOKEN = "/unity/ads/app_gen/development/dockstore_token"
11440

11541
# <<<
11642
LOG_LEVEL_TYPE = {10: "DEBUG", 20: "INFO"}
11743

11844
# Change this to the Docker image that contains the Application Package Generator
119-
DOCKER_IMAGE = "docker.io/busybox"
45+
DOCKER_IMAGE = "jplmdps/unity-app-gen:v1.1.1"
12046

12147
# Default DAG configuration
12248
dag_default_args = {
@@ -137,7 +63,12 @@ def build_ec2_type_label(key):
13763
max_active_tasks=30,
13864
default_args=dag_default_args,
13965
params={
140-
"message": Param("Hello World", type="string", title="Message", description="The greeting message"),
66+
"repository": Param(
67+
"https://github.com/unity-sds/unity-example-application",
68+
type="string",
69+
title="Repository",
70+
description="Git URL of application source files",
71+
),
14172
"log_level": Param(
14273
DEFAULT_LOG_LEVEL,
14374
type="integer",
@@ -160,11 +91,52 @@ def build_ec2_type_label(key):
16091
},
16192
)
16293

94+
app_gen_env_vars = [
95+
k8s.V1EnvVar(
96+
name="DOCKERHUB_USERNAME", value="{{ ti.xcom_pull(task_ids='Setup', key='dockerhub_username') }}"
97+
),
98+
k8s.V1EnvVar(name="DOCKERHUB_TOKEN", value="{{ ti.xcom_pull(task_ids='Setup', key='dockerhub_token') }}"),
99+
k8s.V1EnvVar(name="DOCKSTORE_TOKEN", value="{{ ti.xcom_pull(task_ids='Setup', key='dockstore_token') }}"),
100+
k8s.V1EnvVar(
101+
name="DOCKSTORE_API_URL",
102+
value="http://awslbdockstorestack-lb-1429770210.us-west-2.elb.amazonaws.com:9998/api",
103+
),
104+
k8s.V1EnvVar(name="GITHUB_REPO", value="{{ params.repository }}"),
105+
]
106+
163107

164108
def setup(ti=None, **context):
165109
"""
166110
Task that selects the proper Karpenter Node Pool depending on the user requested resources.
167111
"""
112+
113+
## Retrieve the docker credentials and DockStore token
114+
ssm_client = boto3.client("ssm", region_name="us-west-2")
115+
ssm_response = ssm_client.get_parameters(
116+
Names=[DOCKERHUB_USERNAME, DOCKERHUB_TOKEN, DOCKSTORE_TOKEN], WithDecryption=True
117+
)
118+
logging.info(ssm_response)
119+
120+
# Somehow get the correct variables from SSM here
121+
credentials_dict = {}
122+
for param in ssm_response["Parameters"]:
123+
if param["Name"] == DOCKERHUB_USERNAME:
124+
credentials_dict["dockerhub_username"] = param["Value"]
125+
elif param["Name"] == DOCKERHUB_TOKEN:
126+
credentials_dict["dockerhub_token"] = param["Value"]
127+
elif param["Name"] == DOCKSTORE_TOKEN:
128+
credentials_dict["dockstore_token"] = param["Value"]
129+
130+
required_credentials = ["dockerhub_username", "dockerhub_token", "dockstore_token"]
131+
# make sure all required credentials are provided
132+
if not set(required_credentials).issubset(list(credentials_dict.keys())):
133+
logging.error(f"Expected all of credentials to run mdps app generator {required_credentials}")
134+
135+
# use xcom to push to avoid putting credentials to the logs
136+
ti.xcom_push(key="dockerhub_username", value=credentials_dict["dockerhub_username"])
137+
ti.xcom_push(key="dockerhub_token", value=credentials_dict["dockerhub_token"])
138+
ti.xcom_push(key="dockstore_token", value=credentials_dict["dockstore_token"])
139+
168140
context = get_current_context()
169141
logging.info(f"DAG Run parameters: {json.dumps(context['params'], sort_keys=True, indent=4)}")
170142

@@ -204,13 +176,21 @@ def setup(ti=None, **context):
204176
retries=1,
205177
task_id="appgen_task",
206178
namespace=POD_NAMESPACE,
179+
env_vars=app_gen_env_vars,
207180
name="appgen-task-pod",
208181
image=DOCKER_IMAGE,
209182
service_account_name="airflow-worker",
210183
in_cluster=True,
211184
get_logs=True,
212185
startup_timeout_seconds=600,
213-
arguments=["echo", "{{ti.xcom_pull(task_ids='Setup', key='message')}}"],
186+
arguments=[
187+
"-r",
188+
"{{ params.repository }}",
189+
"-l",
190+
"{{ params.log_level }}",
191+
"-e",
192+
"{{ ti.xcom_pull(task_ids='Setup', key='ecr_login') }}",
193+
],
214194
container_security_context={"privileged": True},
215195
container_resources=k8s.V1ResourceRequirements(
216196
requests={

0 commit comments

Comments
 (0)