forked from BretFisher/container.training
-
Notifications
You must be signed in to change notification settings - Fork 12
/
autotest.py
executable file
·453 lines (383 loc) · 15 KB
/
autotest.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
#!/usr/bin/env python
# coding: utf-8
import click
import logging
import os
import random
import re
import select
import subprocess
import sys
import time
import uuid
import yaml
logging.basicConfig(level=os.environ.get("LOG_LEVEL", "INFO"))
TIMEOUT = 60 # 1 minute
# This one is not a constant. It's an ugly global.
IPADDR = None
class State(object):
def __init__(self):
self.interactive = True
self.verify_status = False
self.simulate_type = True
self.switch_desktop = False
self.sync_slides = False
self.open_links = False
self.run_hidden = True
self.slide = 1
self.snippet = 0
def load(self):
data = yaml.load(open("state.yaml"))
self.interactive = bool(data["interactive"])
self.verify_status = bool(data["verify_status"])
self.simulate_type = bool(data["simulate_type"])
self.switch_desktop = bool(data["switch_desktop"])
self.sync_slides = bool(data["sync_slides"])
self.open_links = bool(data["open_links"])
self.run_hidden = bool(data["run_hidden"])
self.slide = int(data["slide"])
self.snippet = int(data["snippet"])
def save(self):
with open("state.yaml", "w") as f:
yaml.dump(dict(
interactive=self.interactive,
verify_status=self.verify_status,
simulate_type=self.simulate_type,
switch_desktop=self.switch_desktop,
sync_slides=self.sync_slides,
open_links=self.open_links,
run_hidden=self.run_hidden,
slide=self.slide,
snippet=self.snippet,
), f, default_flow_style=False)
state = State()
def hrule():
return "="*int(subprocess.check_output(["tput", "cols"]))
# A "snippet" is something that the user is supposed to do in the workshop.
# Most of the "snippets" are shell commands.
# Some of them can be key strokes or other actions.
# In the markdown source, they are the code sections (identified by triple-
# quotes) within .exercise[] sections.
class Snippet(object):
def __init__(self, slide, content):
self.slide = slide
self.content = content
# Extract the "method" (e.g. bash, keys, ...)
# On multi-line snippets, the method is alone on the first line
# On single-line snippets, the data follows the method immediately
if '\n' in content:
self.method, self.data = content.split('\n', 1)
else:
self.method, self.data = content.split(' ', 1)
self.data = self.data.strip()
self.next = None
def __str__(self):
return self.content
class Slide(object):
current_slide = 0
def __init__(self, content):
self.number = Slide.current_slide
Slide.current_slide += 1
# Remove commented-out slides
# (remark.js considers ??? to be the separator for speaker notes)
content = re.split("\n\?\?\?\n", content)[0]
self.content = content
self.snippets = []
exercises = re.findall("\.exercise\[(.*)\]", content, re.DOTALL)
for exercise in exercises:
if "```" in exercise:
previous = None
for snippet_content in exercise.split("```")[1::2]:
snippet = Snippet(self, snippet_content)
if previous:
previous.next = snippet
previous = snippet
self.snippets.append(snippet)
else:
logging.warning("Exercise on slide {} does not have any ``` snippet."
.format(self.number))
self.debug()
def __str__(self):
text = self.content
for snippet in self.snippets:
text = text.replace(snippet.content, ansi("7")(snippet.content))
return text
def debug(self):
logging.debug("\n{}\n{}\n{}".format(hrule(), self.content, hrule()))
def focus_slides():
if not state.switch_desktop:
return
subprocess.check_output(["i3-msg", "workspace", "3"])
subprocess.check_output(["i3-msg", "workspace", "1"])
def focus_terminal():
if not state.switch_desktop:
return
subprocess.check_output(["i3-msg", "workspace", "2"])
subprocess.check_output(["i3-msg", "workspace", "1"])
def focus_browser():
if not state.switch_desktop:
return
subprocess.check_output(["i3-msg", "workspace", "4"])
subprocess.check_output(["i3-msg", "workspace", "1"])
def ansi(code):
return lambda s: "\x1b[{}m{}\x1b[0m".format(code, s)
# Sleeps the indicated delay, but interruptible by pressing ENTER.
# If interrupted, returns True.
def interruptible_sleep(t):
rfds, _, _ = select.select([0], [], [], t)
return 0 in rfds
def wait_for_string(s, timeout=TIMEOUT):
logging.debug("Waiting for string: {}".format(s))
deadline = time.time() + timeout
while time.time() < deadline:
output = capture_pane()
if s in output:
return
if interruptible_sleep(1): return
raise Exception("Timed out while waiting for {}!".format(s))
def wait_for_prompt():
logging.debug("Waiting for prompt.")
deadline = time.time() + TIMEOUT
while time.time() < deadline:
output = capture_pane()
# If we are not at the bottom of the screen, there will be a bunch of extra \n's
output = output.rstrip('\n')
last_line = output.split('\n')[-1]
# Our custom prompt on the VMs has two lines; the 2nd line is just '$'
if last_line == "$":
# This is a perfect opportunity to grab the node's IP address
global IPADDR
IPADDR = re.findall("^\[(.*)\]", output, re.MULTILINE)[-1]
return
# When we are in an alpine container, the prompt will be "/ #"
if last_line == "/ #":
return
# We did not recognize a known prompt; wait a bit and check again
logging.debug("Could not find a known prompt on last line: {!r}"
.format(last_line))
if interruptible_sleep(1): return
raise Exception("Timed out while waiting for prompt!")
def check_exit_status():
if not state.verify_status:
return
token = uuid.uuid4().hex
data = "echo {} $?\n".format(token)
logging.debug("Sending {!r} to get exit status.".format(data))
send_keys(data)
time.sleep(0.5)
wait_for_prompt()
screen = capture_pane()
status = re.findall("\n{} ([0-9]+)\n".format(token), screen, re.MULTILINE)
logging.debug("Got exit status: {}.".format(status))
if len(status) == 0:
raise Exception("Couldn't retrieve status code {}. Timed out?".format(token))
if len(status) > 1:
raise Exception("More than one status code {}. I'm seeing double! Shoot them both.".format(token))
code = int(status[0])
if code != 0:
raise Exception("Non-zero exit status: {}.".format(code))
# Otherwise just return peacefully.
def setup_tmux_and_ssh():
if subprocess.call(["tmux", "has-session"]):
logging.error("Couldn't connect to tmux. Please setup tmux first.")
ipaddr = "$IPADDR"
uid = os.getuid()
raise Exception("""
1. If you're running this directly from a node:
tmux
2. If you want to control a remote tmux:
rm -f /tmp/tmux-{uid}/default && ssh -t -L /tmp/tmux-{uid}/default:/tmp/tmux-1001/default docker@{ipaddr} tmux new-session -As 0
3. If you cannot control a remote tmux:
tmux new-session ssh docker@{ipaddr}
""".format(uid=uid, ipaddr=ipaddr))
else:
logging.info("Found tmux session. Trying to acquire shell prompt.")
wait_for_prompt()
logging.info("Successfully connected to test cluster in tmux session.")
slides = [Slide("Dummy slide zero")]
content = open(sys.argv[1]).read()
# OK, this part is definitely hackish, and will break if the
# excludedClasses parameter is not on a single line.
excluded_classes = re.findall("excludedClasses: (\[.*\])", content)
excluded_classes = set(eval(excluded_classes[0]))
for slide in re.split("\n---?\n", content):
slide_classes = re.findall("class: (.*)", slide)
if slide_classes:
slide_classes = slide_classes[0].split(",")
slide_classes = [c.strip() for c in slide_classes]
if excluded_classes & set(slide_classes):
logging.info("Skipping excluded slide.")
continue
slides.append(Slide(slide))
def send_keys(data):
if state.simulate_type and data[0] != '^':
for key in data:
if key == ";":
key = "\\;"
if key == "\n":
if interruptible_sleep(1): return
subprocess.check_call(["tmux", "send-keys", key])
if interruptible_sleep(0.15*random.random()): return
if key == "\n":
if interruptible_sleep(1): return
else:
subprocess.check_call(["tmux", "send-keys", data])
def capture_pane():
return subprocess.check_output(["tmux", "capture-pane", "-p"]).decode('utf-8')
setup_tmux_and_ssh()
try:
state.load()
logging.info("Successfully loaded state from file.")
# Let's override the starting state, so that when an error occurs,
# we can restart the auto-tester and then single-step or debug.
# (Instead of running again through the same issue immediately.)
state.interactive = True
except Exception as e:
logging.exception("Could not load state from file.")
logging.warning("Using default values.")
def move_forward():
state.snippet += 1
if state.snippet > len(slides[state.slide].snippets):
state.slide += 1
state.snippet = 0
check_bounds()
def move_backward():
state.snippet -= 1
if state.snippet < 0:
state.slide -= 1
state.snippet = 0
check_bounds()
def check_bounds():
if state.slide < 1:
state.slide = 1
if state.slide >= len(slides):
state.slide = len(slides)-1
while True:
state.save()
slide = slides[state.slide]
snippet = slide.snippets[state.snippet-1] if state.snippet else None
click.clear()
print("[Slide {}/{}] [Snippet {}/{}] [simulate_type:{}] [verify_status:{}] "
"[switch_desktop:{}] [sync_slides:{}] [open_links:{}] [run_hidden:{}]"
.format(state.slide, len(slides)-1,
state.snippet, len(slide.snippets) if slide.snippets else 0,
state.simulate_type, state.verify_status,
state.switch_desktop, state.sync_slides,
state.open_links, state.run_hidden))
print(hrule())
if snippet:
print(slide.content.replace(snippet.content, ansi(7)(snippet.content)))
focus_terminal()
else:
print(slide.content)
if state.sync_slides:
subprocess.check_output(["./gotoslide.js", str(slide.number)])
focus_slides()
print(hrule())
if state.interactive:
print("y/⎵/⏎ Execute snippet or advance to next snippet")
print("p/← Previous")
print("n/→ Next")
print("s Simulate keystrokes")
print("v Validate exit status")
print("d Switch desktop")
print("k Sync slides")
print("o Open links")
print("h Run hidden commands")
print("g Go to a specific slide")
print("q Quit")
print("c Continue non-interactively until next error")
command = click.getchar()
else:
command = "y"
if command in ("n", "\x1b[C"):
move_forward()
elif command in ("p", "\x1b[D"):
move_backward()
elif command == "s":
state.simulate_type = not state.simulate_type
elif command == "v":
state.verify_status = not state.verify_status
elif command == "d":
state.switch_desktop = not state.switch_desktop
elif command == "k":
state.sync_slides = not state.sync_slides
elif command == "o":
state.open_links = not state.open_links
elif command == "h":
state.run_hidden = not state.run_hidden
elif command == "g":
state.slide = click.prompt("Enter slide number", type=int)
state.snippet = 0
check_bounds()
elif command == "q":
break
elif command == "c":
# continue until next timeout
state.interactive = False
elif command in ("y", "\r", " "):
if not snippet:
# Advance to next snippet
# Advance until a slide that has snippets
while not slides[state.slide].snippets:
move_forward()
# But stop if we reach the last slide
if state.slide == len(slides)-1:
break
# And then advance to the snippet
move_forward()
continue
method, data = snippet.method, snippet.data
logging.info("Running with method {}: {}".format(method, data))
if method == "keys":
send_keys(data)
elif method == "bash" or (method == "hide" and state.run_hidden):
# Make sure that we're ready
wait_for_prompt()
# Strip leading spaces
data = re.sub("\n +", "\n", data)
# Remove backticks (they are used to highlight sections)
data = data.replace('`', '')
# Add "RETURN" at the end of the command :)
data += "\n"
# Send command
send_keys(data)
# Force a short sleep to avoid race condition
time.sleep(0.5)
if snippet.next and snippet.next.method == "wait":
wait_for_string(snippet.next.data)
elif snippet.next and snippet.next.method == "longwait":
wait_for_string(snippet.next.data, 10*TIMEOUT)
else:
wait_for_prompt()
# Verify return code
check_exit_status()
elif method == "copypaste":
screen = capture_pane()
matches = re.findall(data, screen, flags=re.DOTALL)
if len(matches) == 0:
raise Exception("Could not find regex {} in output.".format(data))
# Arbitrarily get the most recent match
match = matches[-1]
# Remove line breaks (like a screen copy paste would do)
match = match.replace('\n', '')
send_keys(match + '\n')
# FIXME: we should factor out the "bash" method
wait_for_prompt()
check_exit_status()
elif method == "open":
# Cheap way to get node1's IP address
screen = capture_pane()
url = data.replace("/node1", "/{}".format(IPADDR))
# This should probably be adapted to run on different OS
if state.open_links:
subprocess.check_output(["xdg-open", url])
focus_browser()
if state.interactive:
print("Press any key to continue to next step...")
click.getchar()
else:
logging.warning("Unknown method {}: {!r}".format(method, data))
move_forward()
else:
logging.warning("Unknown command {}.".format(command))