-
Notifications
You must be signed in to change notification settings - Fork 10
/
applications.py
125 lines (93 loc) · 3.46 KB
/
applications.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
#!/usr/bin/env python
import sys
import os
import subprocess
import logging
import command
# Status code returned by doc_to_vec when an input entry has a "type" other
# than "doc", "params" or "model".
UNKNOWN_ARGS = -5
def generic_executor(job_id, executable, args, inputs, outputs):
    """Run ``executable`` with ``args`` and capture its output in local files.

    The child's standard output and standard error are written to
    ``STDOUT.txt`` and ``STDERR.txt`` in the current working directory, and
    the call blocks until the child process exits.

    Args:
        job_id: Identifier of the job; not used by this function.
        executable: Program to launch.
        args: Handed to the program as ONE argument; it is not split on
            whitespace.
        inputs: Not used by this function.
        outputs: Not used by this function.

    Returns:
        True once the child process has exited.  The child's exit status is
        not inspected, so a non-zero exit still yields True.

    Raises:
        ValueError: Logged and re-raised (invalid value provided).
        OSError: Logged and re-raised (the program failed to execute).
        Exception: Any other error is logged and re-raised.
    """
    try:
        # Single-argument print(...) emits the same text on Python 2 and 3.
        print("Running {0} {1}".format(executable, args))
        # The context managers close both capture files even when Popen()
        # or wait() raises, so no file handle is leaked on the error paths.
        with open("STDOUT.txt", 'w') as std_out, open("STDERR.txt", 'w') as std_err:
            print("{0} {1}".format([executable, args],
                                   "stdout='STDOUT.txt', stderr='STDERR.txt'"))
            proc = subprocess.Popen([executable, args], stdout=std_out, stderr=std_err)
            proc.wait()
    # Invalid value provided
    except ValueError as e:
        logging.error("ValueError : {0}".format(e))
        raise
    # Failed to execute
    except OSError as e:
        logging.error("OSError : {0}".format(e))
        raise
    # Unknown error
    except Exception as e:
        logging.error("Generic Error : {0}".format(e))
        raise
    # everything is OK!
    return True
def script_executor(app, job_desc):
    """Write the job's script to disk, then run the job's executable.

    Keys read from ``job_desc``:
        inputs         required; looked up but not otherwise used here
        walltime       optional, converted with int(); defaults to 24*60*60
                       (presumably seconds -- confirm against command.execute)
        job_id         required
        i_script_name  file name the script text is written to
        i_script       script text; "\\r\\n" line endings are turned into "\\n"
        executable     required; the command handed to command.execute
        i_wosuser,
        i_wospasswd    optional; forwarded in the env mapping, defaulting to
                       the string 'None'

    The script file is created with mode 0o744 before the command runs.

    Returns:
        Whatever ``command.execute`` returns.

    Raises:
        Any exception from ``command.execute`` is logged and re-raised.
        Errors from the lookups and the file write above are not caught.
    """
    # Kept for its side effect only: a job without 'inputs' fails with KeyError.
    _unused_inputs = job_desc['inputs']
    time_limit = int(job_desc.get("walltime", 24*60*60))
    job_ident = job_desc["job_id"]
    script_path = job_desc.get('i_script_name')
    raw_script = job_desc.get('i_script')
    script_text = raw_script.replace('\r\n', '\n')
    command_line = job_desc["executable"]

    environment = {}
    environment["wosuser"] = job_desc.get('i_wosuser', 'None')
    environment["wospasswd"] = job_desc.get('i_wospasswd', 'None')

    with open(script_path, 'w') as script_handle:
        script_handle.write(script_text)
    os.chmod(script_path, 0o744)

    try:
        logging.debug("script_executor, executing {0}".format(command_line))
        return command.execute(app, command_line, time_limit, job_ident, environment)
    except Exception as err:
        logging.error("Caught exception : {0}".format(err))
        raise
def python_executor(app, job_desc):
    """Placeholder executor: ignores both arguments and reports success (True)."""
    succeeded = True
    return succeeded
def experimental(app, job_desc):
    """Placeholder executor: ignores both arguments and reports success (True)."""
    succeeded = True
    return succeeded
def doc_to_vec(app, job_desc):
    """Build the doc2vec pipeline command line from the job inputs and run it.

    Every entry of ``job_desc['inputs']`` appends one option to the command,
    selected by the entry's "type" and followed by the entry's "dest":
    "doc" -> ``-d``, "params" -> ``-p``, "model" -> ``-m``.

    ``job_desc`` must also provide "job_id"; "walltime" is optional, is
    converted with int() and defaults to 24*60*60.

    Returns:
        The value returned by ``command.execute``, or ``UNKNOWN_ARGS`` as soon
        as an entry with any other "type" is met (nothing is executed then).

    Raises:
        Any exception raised while assembling or running the command is
        logged and re-raised.  Errors from the initial ``job_desc`` lookups
        happen before the try block and are not logged.
    """
    inputs = job_desc['inputs']
    cmd = "python /home/ubuntu/ncses/doc2vec/turing_updated_pipeline.py"
    time_limit = int(job_desc.get("walltime", 24*60*60))
    job_ident = job_desc["job_id"]

    # Input type -> command-line fragment, tried in this order.
    option_templates = (
        ("doc", " -d {0}"),
        ("params", " -p {0}"),
        ("model", " -m {0}"),
    )

    try:
        for entry in inputs:
            entry_type = entry["type"]
            template = next(
                (tmpl for name, tmpl in option_templates if entry_type == name),
                None,
            )
            if template is None:
                return UNKNOWN_ARGS
            cmd = cmd + template.format(entry["dest"])

        logging.debug("doc_to_vec, executing {0}".format(cmd))
        return command.execute(app, cmd, time_limit, job_ident)
    except Exception as err:
        logging.error("Caught exception : {0}".format(err))
        raise
# Job Definitions: maps a job type name to the function that executes it.
# NOTE: generic_executor takes (job_id, executable, args, inputs, outputs),
# while the other executors take (app, job_desc).
JOBS = {
    "doc_to_vec": doc_to_vec,
    "generic": generic_executor,
    "script": script_executor,
    "python": python_executor,
    "experimental": experimental,
}
def test():
    """Manual smoke test: run a sample shell script through the "generic" job.

    Looks up the "generic" entry in JOBS, runs the hard-coded script
    /home/ubuntu/task_engine/test.sh with the single argument "foobar", and
    prints the value the executor returns.
    """
    r = JOBS["generic"]("job_1001", "/home/ubuntu/task_engine/test.sh", "foobar", [], [])
    # Single-argument print(...) emits the same text on Python 2 and 3.
    print("{0} {1}".format("Return code ", r))
#test()