forked from woodenphone/lego_dimensions_protocol
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtagtracker.py
170 lines (147 loc) · 5.24 KB
/
tagtracker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
#-------------------------------------------------------------------------------
# Name: tagtracker
# Purpose: Track which NFC tags are on which pad of the Lego Dimensions gateway
#
# Author: User
#
# Created: 23/11/2015
# Copyright: (c) User 2015
# Licence: <your licence>
#-------------------------------------------------------------------------------
import usb
import usb.core
import usb.util
import time
import threading
import lego_dimensions_gateway
class TagTracker(lego_dimensions_gateway.Gateway):
    """
    Keep track of where tags are on the pads.

    A background thread keeps reading packets from the USB device and files
    each tag UID under the pad it was last reported on, or under the special
    "removed" bucket once the device reports it as removed.
    """
    USB_read_timeout = 10  # 10 ms seems okay for the demo
    _USB_thread = None

    def __init__(self, verbose=True):
        """
        Connect to the portal and start the USB reader thread.

        verbose -- when true, print a message while initialising the portal.

        Raises ValueError if the USB device cannot be found.
        """
        self.verbose = verbose
        # Per-instance bookkeeping: bucket name -> set of stringified UIDs.
        # Built here rather than as a class attribute so that two trackers
        # never share (and corrupt) each other's sets. It must exist before
        # the reader thread starts, because the thread writes to it.
        self._tag_locations = {
            "removed": set(),
            1: set(),
            2: set(),
            3: set(),
        }
        # Initialise USB connection to the device
        self.dev = self._init_usb()
        # NOTE: the pads are not reset here; the blank_pads() call that used
        # to follow was left disabled.
        # Begin watching USB in a new thread
        self._USB_thread = threading.Thread(target=self._usb_read_thread,
                                            args=())
        self._USB_thread.start()
        return

    def _init_usb(self):
        """
        Connect to and initialise the portal.

        Returns the USB device object.
        Raises ValueError if no device with the expected vendor id is found.
        """
        import usb.core
        import usb.util
        # find our device
        dev = usb.core.find(idVendor=0x0e6f)  # 0x0e6f Logic3 (made lego dimensions portal hardware)
        # was it found?
        if dev is None:
            raise ValueError('Device not found')
        # set the active configuration. With no arguments, the first
        # configuration will be the active one
        dev.set_configuration()
        # Initialise portal
        if self.verbose:
            # Parenthesised single-argument form prints the same text on
            # Python 2 and Python 3.
            print("Initialising portal")
        # 32-byte startup packet; bytes 0x28..0x34 are the ASCII text
        # "(c) LEGO 2014", the remainder is zero padding.
        dev.write(1, [
            0x55, 0x0f, 0xb0, 0x01,
            0x28, 0x63, 0x29, 0x20, 0x4c, 0x45, 0x47, 0x4f,
            0x20, 0x32, 0x30, 0x31, 0x34, 0xf7,
            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
        ])  # Startup
        return dev

    def _remove_tag(self, tag_uid):
        """Move tag_uid out of every bucket and into the "removed" bucket."""
        # Remove tag from all
        for tags in self._tag_locations.values():
            tags.discard(tag_uid)
        # Add to removed box
        self._tag_locations["removed"].add(tag_uid)
        return

    def _add_tag(self, tag_uid, destination):
        """
        Move tag_uid into the bucket named by destination.

        Raises KeyError if destination is not a known bucket; in that case
        the tag's current location is left untouched.
        """
        # Look the target up first so an unknown destination cannot leave
        # the tag removed from everywhere.
        target = self._tag_locations[destination]
        # Remove tag from all
        for tags in self._tag_locations.values():
            tags.discard(tag_uid)
        # Add to new
        target.add(tag_uid)
        return

    def locate_tag(self, tag_uid):
        """
        Return the position of a specified tag.

        The result is the key of the bucket holding the tag: a pad number or
        the string "removed". A tag that has never been seen is recorded as
        removed and None is returned for this call.
        """
        for pad_id, tags in self._tag_locations.items():
            if tag_uid in tags:
                return pad_id
        # If we don't know
        self._remove_tag(tag_uid)
        return None

    def list_tags(self):
        """Return a list of the tags currently being tracked (removed ones included)."""
        tags = []
        for pad_tags in self._tag_locations.values():
            tags.extend(pad_tags)
        return tags

    def stringify_uid(self, uid_bytes):
        """
        Convert the bytes for the UID into a string so we can put it into a set.

        Each byte is rendered in decimal and the pieces are concatenated with
        no separator or padding, so distinct byte sequences can map to the
        same string. The format is kept as-is because existing tag ids are
        written in it.
        """
        return "".join(str(uid_byte) for uid_byte in uid_bytes)

    def _update_nfc(self):
        """
        Read one packet from the device and update the tag bookkeeping.

        Does nothing if the read fails, the packet is too short, or the
        packet is not an NFC packet.
        """
        try:
            inwards_packet = self.dev.read(0x81, 32, timeout=self.USB_read_timeout)
        except usb.USBError:
            # Best effort: a failed read (presumably most often the short
            # read timeout expiring) is ignored; the caller polls again.
            return
        bytelist = list(inwards_packet)
        # We need a packet long enough to hold every field read below
        # (the UID slice ends at index 11).
        if len(bytelist) < 12:
            return
        if bytelist[0] != 0x56:  # Only listen to NFC packets
            return
        pad_num = bytelist[2]
        uid = self.stringify_uid(bytelist[6:12])
        removed = bool(bytelist[5])  # Was the tag removed, if false it was added
        if removed:
            self._remove_tag(tag_uid=uid)
        else:
            self._add_tag(tag_uid=uid, destination=pad_num)

    def _usb_read_thread(self):
        """Thread body: poll the device for NFC packets forever."""
        while True:
            self._update_nfc()
def watch_pads():
    """Demo: poll the tracker and print where every tracked tag is.

    A tag that is on a pad and has an entry in the colour table also gets
    its pad lit in that colour. Loops forever; never returns.
    """
    known_colours = {
        "4311985024672": (0xff, 0xff, 0xff),  # Ghost on homebrew white 4x4 rounded plate tag
    }
    tracker = TagTracker(verbose=True)
    while True:
        time.sleep(0.2)
        tag_ids = tracker.list_tags()
        if not tag_ids:
            # Nothing tracked yet; poll again.
            continue
        print("")  # Separator between text blocks
        tracker.blank_pads()
        for tag_id in tag_ids:
            where = tracker.locate_tag(tag_id)
            if where == "removed":
                print("tag:" + repr(tag_id) + " has been removed from the gateway")
                continue
            print("tag:" + repr(tag_id) + " is located on pad " + repr(where))
            try:
                colour = known_colours[tag_id]
                tracker.switch_pad(pad=where, colour=colour)
            except KeyError:
                # No colour registered for this tag: leave the pad dark.
                pass
def main():
    """Script entry point: run the pad-watching demo."""
    watch_pads()
# Start the demo only when this file is executed as a script.
if __name__ == "__main__":
    main()