-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstrip_worker.py
58 lines (48 loc) · 1.62 KB
/
strip_worker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
#!/usr/bin/python
import threading
import time
import queue
import numpy as np
from strips.led_strip import LEDStrip
from strips.gui_strip import GuiStrip
from streamer.toplevel.bass_power import BassPower
from streamer.toplevel.sound_meter import SoundMeter
from streamer.toplevel.onset_strobe import OnsetStrobe
class NewStripWorker (threading.Thread):
def __init__(self, swidth, fsample=44100, name="New LED Strip Worker"):
threading.Thread.__init__(self, name=name)
self.swidth = swidth
self.fsample = fsample
self.name = name
self.to_proc = queue.Queue()
self.to_update = queue.Queue()
self.work = True
def run(self):
nleds = 150
stream = BassPower(nleds, self.fsample, self.swidth)
#stream = SoundMeter(nleds, self.fsample, self.swidth)
stream = OnsetStrobe(nleds, self.fsample, self.swidth)
self.strip = LEDStrip(nleds)
#Add our event timer that will update lights every period
self.start_time = time.time()
self.update_idx = 0
thread = threading.Thread(target = self._updateStrip)
thread.start()
# Consume the new data and process it
while(self.work):
elem = self.to_proc.get()
stream.input(elem)
if stream.outputReady():
self.to_update.put(stream.output())
# We want to update at the same rate, without getting too behind
def _updateStrip(self):
ft = 0.75*self.swidth/float(self.fsample)
while True:
elem = self.to_update.get(block=True)
self.strip.setStrip(elem)
self.strip.update()
time.sleep(ft)
def stop(self):
self.work = False
def addWindow(self, window):
self.to_proc.put(window)