Skip to content

Commit 0b2e92a

Browse files
committed
Submission utils added.
1 parent 90a00ba commit 0b2e92a

File tree

5 files changed

+346
-0
lines changed

5 files changed

+346
-0
lines changed
File renamed without changes.
File renamed without changes.

submission/fake_submit.py

+68
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
#!/usr/bin/env python3
2+
3+
# Send a fake submit to the ReCodEx broker
4+
# usage:
5+
# $ python fake_submit.py --header1 val1 --header2 val2 submit_directory
6+
7+
import zmq
8+
import os
9+
import sys
10+
import requests
11+
12+
fsrv_port = 9999
13+
broker_port = 9658
14+
15+
submit_dir = ""
16+
headers = {}
17+
argv_it = iter(sys.argv[1:])
18+
19+
for arg in argv_it:
20+
if arg.startswith("--") and len(arg) > 2:
21+
headers[arg[2:]] = next(argv_it)
22+
else:
23+
submit_dir = arg
24+
25+
if submit_dir == "":
26+
sys.exit("no directory to submit was specified")
27+
28+
# An iterator for the submitted files
29+
def walk_submit ():
30+
for root, dirs, files in os.walk(submit_dir):
31+
for name in files:
32+
yield os.path.relpath(os.path.join(root, name), submit_dir)
33+
34+
# Send the submission to our fake file server
35+
try:
36+
reply = requests.post(
37+
"http://localhost:{0}".format(fsrv_port),
38+
{
39+
f.encode(): open(os.path.join(submit_dir, f), "rb").read()
40+
for f in walk_submit()
41+
}
42+
)
43+
except:
44+
sys.exit("Error sending files to the file server")
45+
46+
job_id = reply.text
47+
48+
# Connect to the broker
49+
context = zmq.Context()
50+
broker = context.socket(zmq.REQ)
51+
52+
try:
53+
broker.connect("tcp://localhost:{}".format(broker_port))
54+
except:
55+
sys.exit("Error connecting to the broker")
56+
57+
# Send the request
58+
broker.send_multipart(
59+
[b"eval"] +
60+
[str(job_id).encode()] +
61+
["{}={}".format(k, v).encode() for k, v in headers.items()] +
62+
[b""] +
63+
["http://localhost:{0}/submit_archives/{1}.tar.gz".format(fsrv_port, job_id).encode()] +
64+
["http://localhost:{0}/results/{1}.zip".format(fsrv_port, job_id).encode()]
65+
)
66+
67+
result = broker.recv()
68+
print(result)

submission/file_server.py

+151
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
#!/usr/bin/env python3
2+
3+
# A minimal HTTP server that temporarily saves submitted files
4+
# in a temporary folder and serves them back to workers
5+
# usage:
6+
# $ python file_server.py [tasks_dir]
7+
#
8+
# If "files_dir" is specified, the server scans this directory
9+
# on startup and stores its files in the "tasks" hash storage
10+
11+
import http.server as http
12+
import socketserver
13+
import cgi
14+
import os
15+
import sys
16+
import tempfile
17+
import tarfile
18+
import hashlib
19+
import signal
20+
21+
join = os.path.join
22+
23+
port = 9999
24+
tmp = tempfile.TemporaryDirectory()
25+
26+
# Read command line arguments
27+
argv_iter = iter(sys.argv[1:])
28+
task_source = ""
29+
30+
for arg in argv_iter:
31+
task_source = arg
32+
33+
# Make the file structure of the file server
34+
archive_dir = join(tmp.name, "submit_archives")
35+
os.makedirs(archive_dir)
36+
37+
submit_dir = join(tmp.name, "submits")
38+
os.makedirs(submit_dir)
39+
40+
result_dir = join(tmp.name, "results")
41+
os.makedirs(result_dir)
42+
43+
task_dir = join(tmp.name, "tasks")
44+
os.makedirs(task_dir)
45+
46+
# An iterator for the supplementary task files
47+
def walk_files ():
48+
for root, dirs, files in os.walk(task_source):
49+
for name in files:
50+
yield os.path.join(root, name)
51+
52+
# Read and store the supplementary task files
53+
if task_source:
54+
print("Loading files from {0}...".format(os.path.realpath(task_source)))
55+
56+
for taskfile_name in walk_files():
57+
if os.path.isfile(taskfile_name):
58+
with open(taskfile_name, "rb") as taskfile:
59+
content = taskfile.read()
60+
destfile_name = hashlib.sha1(content).hexdigest()
61+
path = join(task_dir, destfile_name[0])
62+
os.makedirs(path, exist_ok = True)
63+
64+
with open(join(path, destfile_name), "wb") as destfile:
65+
destfile.write(content)
66+
67+
rel = os.path.relpath(taskfile_name, task_source)
68+
print("{0}: {1}".format(destfile_name, rel), flush = True)
69+
70+
# An id for new jobs
71+
job_id = 0
72+
73+
# Change to the temporary directory
74+
os.chdir(tmp.name)
75+
76+
class FileServerHandler(http.SimpleHTTPRequestHandler):
77+
def do_POST(self):
78+
"""
79+
Handler for POST requests
80+
Saves submitted files and makes a submit archive
81+
"""
82+
83+
form = cgi.FieldStorage(
84+
fp = self.rfile,
85+
headers = self.headers,
86+
environ = {
87+
'REQUEST_METHOD': 'POST',
88+
'CONTENT_TYPE': self.headers['Content-Type']
89+
}
90+
)
91+
92+
global job_id
93+
job_id += 1
94+
job_dir = join(submit_dir, str(job_id))
95+
os.makedirs(job_dir)
96+
97+
# Save received files
98+
for name in form.keys():
99+
dirname = os.path.dirname(name)
100+
if dirname != "":
101+
os.makedirs(join(job_dir, dirname), exist_ok = True)
102+
103+
with open(join(job_dir, name), "w") as f:
104+
f.write(form[name].value)
105+
106+
# Make an archive from the submitted files
107+
archive_path = join(archive_dir, str(job_id) + ".tar.gz")
108+
with tarfile.open(archive_path, "w:gz") as archive:
109+
archive.add(job_dir, arcname = str(job_id))
110+
111+
# Response headers
112+
self.send_response(200)
113+
self.end_headers()
114+
115+
# Return the job id assigned to submitted files
116+
self.wfile.write(str(job_id).encode())
117+
118+
def do_PUT(self):
119+
"""
120+
Handler for PUT requests
121+
Just stores a result file
122+
"""
123+
path_parts = self.path[1:].split("/")
124+
125+
if len(path_parts) != 2 or path_parts[0] != os.path.basename(result_dir):
126+
self.send_response(403)
127+
self.end_headers()
128+
return
129+
130+
length = int(self.headers["Content-Length"])
131+
with open(join(result_dir, path_parts[1]), "wb") as dest_file:
132+
dest_file.write(self.rfile.read(length))
133+
134+
self.send_response(200)
135+
self.end_headers()
136+
137+
socketserver.TCPServer.allow_reuse_address = True
138+
server = socketserver.TCPServer(("", port), FileServerHandler)
139+
140+
def exit_gracefully_handler(*args):
141+
print("Interrupted by signal", file = sys.stderr)
142+
server.server_close()
143+
sys.exit(0)
144+
145+
signal.signal(signal.SIGINT, exit_gracefully_handler)
146+
signal.signal(signal.SIGTERM, exit_gracefully_handler)
147+
148+
print("Serving files from {0} at port {1}...".format(tmp.name, port), flush = True)
149+
150+
server.serve_forever()
151+

submission/run-hippoes.sh

+127
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
#!/bin/bash
2+
3+
RECODEX_DIR=~/Documents/MFF/ReCodEx
4+
WORKER_REPO=${RECODEX_DIR}/worker
5+
BROKER_REPO=${RECODEX_DIR}/broker
6+
UTILS_REPO=${RECODEX_DIR}/utils
7+
8+
BROKER_EXEC=${BROKER_REPO}/build/recodex-broker
9+
BROKER_CONF=${BROKER_REPO}/examples/config.yml
10+
WORKER_EXEC=${WORKER_REPO}/build/recodex-worker
11+
WORKER_CONF=${WORKER_REPO}/examples/config.yml
12+
13+
FILE_SERVER=${UTILS_REPO}/submission/file_server.py
14+
FAKE_SUBMIT=${UTILS_REPO}/submission/fake_submit.py
15+
HIPPOES_DIR=${RECODEX_DIR}/hrosi-ohradka
16+
17+
RESULT_DIR=~/Desktop/job_results
18+
19+
20+
function start_broker {
21+
${BROKER_EXEC} -c ${BROKER_CONF} &
22+
BROKER_PID=$!
23+
echo "Broker started"
24+
}
25+
26+
function stop_broker {
27+
kill ${BROKER_PID}
28+
wait ${BROKER_PID} 2> /dev/null
29+
echo "Broker stopped"
30+
}
31+
32+
function start_worker {
33+
${WORKER_EXEC} -c ${WORKER_CONF} > /dev/null 2>&1 &
34+
WORKER_PID=$!
35+
echo "Worker started"
36+
}
37+
38+
function stop_worker {
39+
kill ${WORKER_PID}
40+
wait ${WORKER_PID} 2> /dev/null
41+
echo "Worker stopped"
42+
}
43+
44+
45+
function start_file_server {
46+
TEMP_OUT=/tmp/file_server_output.txt
47+
${FILE_SERVER} > ${TEMP_OUT} 2>&1 &
48+
FILE_SERVER_PID=$!
49+
sleep 1
50+
SERVER_DIR=$(head -n 1 ${TEMP_OUT} | grep -o "/tmp/tmp[^ ]*")
51+
if [ "${SERVER_DIR}" = "" ]; then
52+
echo " Cannot read file server directory!"
53+
exit 1
54+
fi
55+
cp ${HIPPOES_DIR}/*.in ${HIPPOES_DIR}/*.out ${SERVER_DIR}/tasks
56+
echo "File server started in ${SERVER_DIR}"
57+
}
58+
59+
function stop_file_server {
60+
kill -s SIGINT ${FILE_SERVER_PID}
61+
wait ${FILE_SERVER_PID} 2> /dev/null
62+
rm ${TEMP_OUT}
63+
rm -rf ${SERVER_DIR}
64+
echo "File server stopped"
65+
}
66+
67+
function prepare_submission {
68+
SUBM_DIR=/tmp/subm_hippoes
69+
mkdir -p ${SUBM_DIR}
70+
cp ${HIPPOES_DIR}/solution.c ${SUBM_DIR}
71+
cp ${WORKER_REPO}/examples/job-config-hippoes.yml ${SUBM_DIR}
72+
mv ${SUBM_DIR}/job-config-hippoes.yml ${SUBM_DIR}/job-config.yml
73+
}
74+
75+
function cleanup_submission {
76+
rm -r ${SUBM_DIR}
77+
}
78+
79+
function submit_hippoes {
80+
${FAKE_SUBMIT} ${SUBM_DIR} > /dev/null
81+
echo "Submitting job..."
82+
}
83+
84+
function wait_time {
85+
local TIME=$1
86+
echo -n "Waiting ${TIME}"
87+
while [ ${TIME} -gt 0 ]; do
88+
sleep 1
89+
local LEN=${#TIME}
90+
while [ ${LEN} -gt 0 ]; do
91+
echo -en "\b \b"
92+
LEN=$((LEN - 1))
93+
done
94+
TIME=$((TIME - 1))
95+
echo -n ${TIME}
96+
done
97+
echo ""
98+
}
99+
100+
101+
# Prepare all resources
102+
rm -rf ${RESULT_DIR}
103+
start_file_server
104+
start_broker
105+
start_worker
106+
prepare_submission
107+
108+
# Send some submits
109+
submit_hippoes
110+
submit_hippoes
111+
112+
# Wait to finish execution
113+
wait_time 10
114+
115+
# Copy interesting files to result dir
116+
cp -R ${SERVER_DIR}/results ${RESULT_DIR}
117+
118+
# Do the cleanup
119+
cleanup_submission
120+
stop_worker
121+
stop_broker
122+
stop_file_server
123+
124+
# Make logs accessible from result dir
125+
ln -s /var/log/recodex/broker.log ${RESULT_DIR}/broker.log
126+
ln -s /var/log/recodex/worker.log ${RESULT_DIR}/worker.log
127+

0 commit comments

Comments
 (0)