-
Notifications
You must be signed in to change notification settings - Fork 0
/
MegaOperation.py
executable file
·383 lines (314 loc) · 13.9 KB
/
MegaOperation.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
#!/usr/bin/env python2
# python standard libraries
import __main__, sys, os, signal, pprint, configparser, argparse, logging, logging.handlers, time, threading, random, copy, pygame
# Raspberry Pi specific libraries
import MPR121
import RPi.GPIO as GPIO
#### Global Variables ####
# ws2812svr constants
channel = 2
led_count = 0 # will be calculated later, from the INI file
led_type = 1
invert = 0
global_brightness = 255
gpionum = 13
# define Big Dome pins
BIG_DOME_LED_PIN = 25
BIG_DOME_PUSHBUTTON_PIN = 24
colors = { 'off' : '000000',
'red' : 'FF0000',
'grn' : '00FF00',
'blu' : '0000FF',
'ylw' : 'FFFF00',
'brw' : '7F2805',
'prp' : 'B54A8F',
'wht' : 'FFFFFF'
}
pp = pprint.PrettyPrinter(indent=4) # Setup format for pprint.
fn = os.path.splitext(os.path.basename(__main__.__file__))[0]
args = None
config = None
pi = None
sensor = None
def setup_gpio():
global pi
#### initialize Pi GPIO
GPIO.setmode(GPIO.BCM)
GPIO.setwarnings(False)
# initialize the GPIO on the Pi Proto Board
# Big Dome LED
big_dome_led = 0
GPIO.setup(BIG_DOME_LED_PIN, GPIO.OUT)
GPIO.output(BIG_DOME_LED_PIN, big_dome_led)
# Big Dome Button
GPIO.setup(BIG_DOME_PUSHBUTTON_PIN, GPIO.IN, pull_up_down=GPIO.PUD_UP)
def setup_mpr121():
global sensor
#### Connect to and initialize the PiCap's MPR121 hardware.
try:
sensor = MPR121.begin()
logger.info(u'MPR121 successfully initialized')
except Exception as e:
logger.critical(u'Unable to connect to initial the MPR121 on the PiCap')
logger.critical(e)
sys.exit(1)
# this is the touch threshold - setting it low makes it more like a proximity trigger default value is 40 for touch
touch_threshold = 40
# this is the release threshold - must ALWAYS be smaller than the touch threshold default value is 20 for touch
release_threshold = 20
# set the thresholds
sensor.set_touch_threshold(touch_threshold)
sensor.set_release_threshold(release_threshold)
def main():
global pi
global sensor
global led_count
button = False
prv_button = False
current_Nose_LED = False
prv_Nose_LED = False
ParseArgs()
setupLogging()
setup_gpio()
setup_mpr121()
# initialize mixer and pygame
if not args.noSound:
pygame.mixer.pre_init(frequency = 44100/4, channels = 64, buffer = 1024)
pygame.init()
sound = pygame.mixer.Sound('/home/pi/MegaOperation/sounds/.wavs/buzzer.wav')
sound.play()
# initialize CTRL-C Exit handler
signal.signal(signal.SIGINT, signal_handler)
# pre-initialize not yet used timer threads into config object for testing, later in main loop.
# and determine maximum LED position
led_count = 0
for section in config.iterkeys():
config[section]['thread'] = threading.Thread(target=sectionWorker, args=(config[section],))
maxTemp = int(config[section]['led_start']) + int(config[section]['led_length'])
if maxTemp > led_count:
led_count = maxTemp
logger.debug("Max LED position found to be " + str(led_count))
#### POST - NeoPixel Pre Operating Self Tests ####
logger.debug("initializing ws2812svr")
write_ws281x('setup {0},{1},{2},{3},{4},{5}\ninit\n'.format(channel, led_count, led_type, invert, global_brightness, gpionum))
logger.debug("POST LED test of ALL red")
write_ws281x('fill ' + str(channel) + ',' + colors['red'] + '\nrender\n')
time.sleep(args.postDelay)
logger.debug("POST LED test of ALL grn")
write_ws281x('fill ' + str(channel) + ',' + colors['grn'] + '\nrender\n')
time.sleep(args.postDelay)
logger.debug("POST LED test of ALL blu")
write_ws281x('fill ' + str(channel) + ',' + colors['blu'] + '\nrender\n')
time.sleep(args.postDelay)
logger.debug("POST LED test of ALL off")
write_ws281x('fill ' + str(channel) + ',' + colors['off'] + '\nrender\n')
#### used to locate LEDs on device
if args.walkLED:
walk_leds()
#### stop if command line requested.
if args.stop :
logger.info(u'Option set to just initialize and then quit')
quit()
#### Main Loop
while True:
# check if a sensor changed
if sensor.touch_status_changed():
sensor.update_touch_data()
# scan each of the sensors to see which one changed.
for section in config.iterkeys():
logger.log(logging.DEBUG-1, "config[" + section + "]['sensor'] = " + str(config[section]['sensor']))
# if sensor.get_touch_data(i):
# check if touch is registred to set the led status
doMagic = False
if config[section]['sensor'].isdigit(): # Only Digits are Touch Sensors.
i = int(config[section]['sensor'])
logging.info("electrode {0} was just touched".format(i))
if sensor.is_new_touch(i):
doMagic = True
elif sensor.is_new_release(i):
logging.info("electrode {0} was just released".format(i))
if doMagic:
# play sound associated with that touch
if not config[section]['thread'].is_alive():
config[section]['thread'] = threading.Thread(target=sectionWorker, args=(config[section],))
config[section]['thread'].setName(section)
config[section]['thread'].start()
is_any_sensor_thread_alive = any(config[section]['thread'].is_alive() for section in config.iterkeys() )
button = GPIO.input(BIG_DOME_PUSHBUTTON_PIN)
if (prv_button != button) :
logger.info("BIG_DOME_PUSHBUTTON_PIN changed from " + str(prv_button) + " to " + str(button))
prv_button = button
current_Nose_LED = is_any_sensor_thread_alive or not(button)
if current_Nose_LED != prv_Nose_LED:
if current_Nose_LED:
write_ws281x('fill ' + str(channel) + ',' + \
colors['wht'] + ',' + \
str(config['Nose']['led_start']) + ',' + \
str(config['Nose']['led_length']) + \
'\nrender\n')
else:
write_ws281x('fill ' + str(channel) + ',' + \
colors['off'] + ',' + \
str(config['Nose']['led_start']) + ',' + \
str(config['Nose']['led_length']) + \
'\nrender\n')
prv_Nose_LED = current_Nose_LED
time.sleep(0.01)
#end of main():
def walk_leds():
global led_count
for pos in range(led_count):
write_ws281x('fill ' + str(channel) + ',' + \
colors['red'] + ',' + \
str(pos) + ',' + \
'1' + \
'\nrender\n')
logger.debug(u'LED Index = ' + str(pos))
try:
input("Press enter to continue")
except SyntaxError:
pass
write_ws281x('fill ' + str(channel) + ',' + colors['off'] + '\nrender\n')
pos = pos + 1
exit()
def ParseArgs():
global args
global config
global fn
# Get filename of running script without path and or extension.
# Define command line arguments
parser = argparse.ArgumentParser(description='Raspberry Pi MegaOperation board game.')
parser.add_argument('--verbose', '-v', action='count', help='verbose multi level', default=1)
parser.add_argument('--config', '-c', help='specify config file', default=(os.path.join(os.path.dirname(os.path.realpath(__file__)), fn + ".ini")))
parser.add_argument('--ws281x', '-w', help='specify ws281x file handle', default="/dev/ws281x")
parser.add_argument('--stop', '-s', action='store_true', help='just initialize and stop')
parser.add_argument('--postDelay', '-p', help='specify the LED delays at startup', type=float, default="1.0")
parser.add_argument('--noSound', '-n', action='store_true', help='Run with out sound')
parser.add_argument('--singleSound', '-S', action='store_true', help='Only play one Sound at a time')
parser.add_argument('--walkLED', '-L', action='store_true', help='move LED increamentally, with standard input, used for determining LED positions.')
# Read in and parse the command line arguments
args = parser.parse_args()
os.path.join(os.path.dirname(os.path.realpath(__file__)), args.config)
# Read in configuration file and create dictionary object
configParse = configparser.ConfigParser()
configParse.read(args.config)
config = {s:dict(configParse.items(s)) for s in configParse.sections()}
# end of ParseArgs():
logger = None
def setupLogging():
global args
global config
global fn
global logger
# Setup display and file logging with level support.
logFormatter = logging.Formatter("%(asctime)s [%(threadName)-12.12s] [%(levelname)-5.5s] %(message)s")
logger = logging.getLogger()
fileHandler = logging.handlers.RotatingFileHandler("{0}/{1}.log".format('/var/log/'+ fn +'/', fn), maxBytes=2*1024*1024, backupCount=2)
fileHandler.setFormatter(logFormatter)
#fileHandler.setLevel(logging.DEBUG)
logger.addHandler(fileHandler)
consoleHandler = logging.StreamHandler()
#consoleHandler.setLevel(logging.DEBUG)
consoleHandler.setFormatter(logFormatter)
logger.addHandler(consoleHandler)
# Dictionary to translate Count of -v's to logging level
verb = { 0 : logging.WARN,
1 : logging.INFO,
2 : logging.DEBUG,
}
# zero adjust for zero offset to make it easier to understand
args.verbose = int(args.verbose) - 1
try:
# set logging level from command line arg.
logger.setLevel(verb[(args.verbose)])
except:
# if out of range use levels, and account for standard levels
if (args.verbose - 2) > 0:
logger.setLevel(logging.DEBUG - (args.verbose - 2))
else:
logger.setLevel(logging.DEBUG)
logger.info(u'Starting script ' + os.path.join(os.path.dirname(os.path.realpath(__file__)), __file__))
logger.info(u'config file = ' + args.config)
logger.info(u'ws281x file handle = ' + args.ws281x)
logger.info(u'POST Delays = ' + str(args.postDelay) + " seconds")
# log which levels of debug are enabled.
logger.log(logging.DEBUG-9, "discrete log level = " + str(logging.DEBUG-9))
logger.log(logging.DEBUG-8, "discrete log level = " + str(logging.DEBUG-8))
logger.log(logging.DEBUG-7, "discrete log level = " + str(logging.DEBUG-7))
logger.log(logging.DEBUG-6, "discrete log level = " + str(logging.DEBUG-6))
logger.log(logging.DEBUG-5, "discrete log level = " + str(logging.DEBUG-5))
logger.log(logging.DEBUG-4, "discrete log level = " + str(logging.DEBUG-4))
logger.log(logging.DEBUG-3, "discrete log level = " + str(logging.DEBUG-3))
logger.log(logging.DEBUG-2, "discrete log level = " + str(logging.DEBUG-2))
logger.log(logging.DEBUG-1, "discrete log level = " + str(logging.DEBUG-1))
logger.log(logging.DEBUG, "discrete log level = " + str(logging.DEBUG ))
logger.info(u'verbose = ' + str(args.verbose) + ", logger level = " + str(logger.getEffectiveLevel()))
logger.debug(u'debug level enabled')
logger.info(u'info level enabled')
#logger.warn(u'warn level enabled')
#logger.error(u'error level enabled')
#logger.critical(u'critical level enabled')
# extra levels of DEBUG of configuration file.
logger.log(logging.DEBUG-1, "list of config sections = \r\n" + pp.pformat(config.keys()))
first_section_key = config.keys()[0]
logger.log(logging.DEBUG-2, "first section name = " + pp.pformat(first_section_key))
first_section_dict = config[first_section_key]
logger.log(logging.DEBUG-3, "list of first sections items = \r\n" + pp.pformat(first_section_dict))
first_sections_first_item = first_section_dict.keys()[0]
logger.log(logging.DEBUG-4, "config["+first_section_key+"]["+first_sections_first_item+"] = " + config[first_section_key][first_sections_first_item])
logger.log(logging.DEBUG-5, "config = " + pp.pformat(config))
# end of setupLogging():
def write_ws281x(cmd):
with open(args.ws281x, 'w') as the_file:
logger.debug("ws281x cmd: " + cmd.replace("\n", "\\n"))
the_file.write(cmd)
# file closes with unindent.
# close needed for ws2812svr to process file handle
# end of write_ws281x():
def sectionWorker(config):
section = threading.currentThread().getName()
logger.debug('Started Thread "' + section + \
'" led_on_time = ' + str(config['led_on_time']) + \
'" led_start = ' + str(config['led_start']) + \
'" led_length = ' + str(config['led_length']) + \
'" music_fnpath = ' + str(config['music_fnpath']) \
)
tmp_color = copy.deepcopy(colors)
if 'off' in tmp_color: del tmp_color['off']
if config['led_color'].lower() == 'random' :
# remove 'off', as not to get randomly
color = tmp_color[random.choice(list(tmp_color))]
else:
try:
color = tmp_color[config['led_color'].lower()]
except:
color = tmp_color[random.choice(list(tmp_color))]
write_ws281x('fill ' + str(channel) + ',' + \
color + ',' + \
str(config['led_start']) + ',' + \
str(config['led_length']) + \
'\nrender\n')
if (not args.noSound) and ('music_fnpath' in config) and (os.path.isfile(config['music_fnpath'])):
# only play if allowed and valid file is defined.
sound = pygame.mixer.Sound(config['music_fnpath'])
if (not pygame.mixer.get_busy()) or (not args.singleSound):
# play when no other is playing or play blindly if multi is allowed.
sound.play()
time.sleep(int(config['led_on_time']))
write_ws281x('fill ' + str(channel) + ',' + \
colors['off'] + ',' + \
str(config['led_start']) + ',' + \
str(config['led_length']) + \
'\nrender\n')
# end of sectionWorker():
def signal_handler(signal, frame):
# handle ctrl+c gracefully
logger.info("CTRL+C Exit LED test of ALL off")
write_ws281x('fill ' + str(channel) + ',' + colors['off'] + '\nrender\n')
logger.info(u'Exiting script ' + os.path.join(os.path.dirname(os.path.realpath(__file__)), __file__))
for section in config.iterkeys():
if config[section]['thread'].is_alive():
logger.info("waiting for thread : " + config[section]['thread'].getName() + " to end")
sys.exit(0)
# end of signal_handler():
main()