forked from google/rootcanal
-
Notifications
You must be signed in to change notification settings - Fork 0
/
test_channel.py
323 lines (253 loc) · 9.81 KB
/
test_channel.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
#
# Copyright 2015 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Script for sending testing parameters and commands to a Bluetooth device.
This script provides a simple shell interface for sending data at run-time to a
Bluetooth device. It is intended to be used in tandem with the test vendor
library project.
Usage:
Option A: Script
1. Run build_and_run.sh in scripts/ with the --test-channel flag set and the
port to use for the test channel.
Option B: Manual
1. Choose a port to use for the test channel. Use 'adb forward tcp:<port>
tcp:<port>' to forward the port to the device.
2. In a separate shell, build and push the test vendor library to the device
using the script mentioned in option A (i.e. without the --test-channel flag
set).
3. Once logcat has started, turn Bluetooth on from the device.
4. Run this program, in the shell from step 1, the port, also from step 1,
as arguments.
"""
#!/usr/bin/env python3
import cmd
import random
import socket
import string
import struct
import sys
import time
DEVICE_NAME_LENGTH = 6
DEVICE_ADDRESS_LENGTH = 6
# Used to generate fake device names and addresses during discovery.
def generate_random_name():
return ''.join(random.SystemRandom().choice(string.ascii_uppercase + \
string.digits) for _ in range(DEVICE_NAME_LENGTH))
def generate_random_address():
return ''.join(random.SystemRandom().choice(string.digits) for _ in \
range(DEVICE_ADDRESS_LENGTH))
class Connection(object):
"""Simple wrapper class for a socket object.
Attributes:
socket: The underlying socket created for the specified address and port.
"""
def __init__(self, port):
self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._socket.connect(('localhost', port))
def close(self):
self._socket.close()
def send(self, data):
self._socket.sendall(data.encode())
def receive(self, size):
return self._socket.recv(size)
class TestChannel(object):
"""Checks outgoing commands and sends them once verified.
Attributes:
connection: The connection to the test vendor library that commands are sent
on.
"""
def __init__(self, port):
self._connection = Connection(port)
self._closed = False
def close(self):
self._connection.close()
self._closed = True
def send_command(self, name, args):
name_size = len(name)
args_size = len(args)
self.lint_command(name, args, name_size, args_size)
encoded_name = chr(name_size) + name
encoded_args = chr(args_size) + ''.join(chr(len(arg)) + arg for arg in args)
command = encoded_name + encoded_args
if self._closed:
return
self._connection.send(command)
if name != 'CLOSE_TEST_CHANNEL':
print(self.receive_response().decode())
def receive_response(self):
if self._closed:
return b'Closed'
size_chars = self._connection.receive(4)
size_bytes = bytearray(size_chars)
if not size_chars:
return b'No response, assuming that the connection is broken'
response_size = 0
for i in range(0, len(size_chars) - 1):
response_size |= (size_chars[i] << (8 * i))
response = self._connection.receive(response_size)
return response
def lint_command(self, name, args, name_size, args_size):
assert name_size == len(name) and args_size == len(args)
try:
name.encode()
for arg in args:
arg.encode()
except UnicodeError:
print('Unrecognized characters.')
raise
if name_size > 255 or args_size > 255:
raise ValueError # Size must be encodable in one octet.
for arg in args:
if len(arg) > 255:
raise ValueError # Size must be encodable in one octet.
class TestChannelShell(cmd.Cmd):
"""Shell for sending test channel data to controller.
Manages the test channel to the controller and defines a set of commands the
user can send to the controller as well. These commands are processed parallel
to commands sent from the device stack and used to provide additional
debugging/testing capabilities.
Attributes:
test_channel: The communication channel to send data to the controller.
"""
def __init__(self, test_channel):
cmd.Cmd.__init__(self)
self._test_channel = test_channel
def do_add(self, args):
"""Arguments: dev_type_str Add a new device of type dev_type_str.
"""
self._test_channel.send_command('add', args.split())
def do_del(self, args):
"""Arguments: device index Delete the device with the specified index.
"""
self._test_channel.send_command('del', args.split())
def do_add_phy(self, args):
"""Arguments: dev_type_str Add a new device of type dev_type_str.
"""
self._test_channel.send_command('add_phy', args.split())
def do_del_phy(self, args):
"""Arguments: phy index Delete the phy with the specified index.
"""
self._test_channel.send_command('del_phy', args.split())
def do_add_device_to_phy(self, args):
"""Arguments: device index phy index Add a new device of type dev_type_str.
"""
self._test_channel.send_command('add_device_to_phy', args.split())
def do_del_device_from_phy(self, args):
"""Arguments: phy index Delete the phy with the specified index.
"""
self._test_channel.send_command('del_device_from_phy', args.split())
def do_add_remote(self, args):
"""Arguments: dev_type_str Connect to a remote device at arg1@arg2.
"""
self._test_channel.send_command('add_remote', args.split())
def do_get(self, args):
"""Arguments: dev_num attr_str Get the value of the attribute attr_str from device dev_num.
"""
self._test_channel.send_command('get', args.split())
def do_set(self, args):
"""Arguments: dev_num attr_str val Set the value of the attribute attr_str from device dev_num equal to val.
"""
self._test_channel.send_command('set', args.split())
def do_set_device_address(self, args):
"""Arguments: dev_num addr Set the address of device dev_num equal to addr.
"""
self._test_channel.send_command('set_device_address', args.split())
def do_list(self, args):
"""Arguments: [dev_num [attr]] List the devices from the controller, optionally filtered by device and attr.
"""
self._test_channel.send_command('list', args.split())
def do_set_timer_period(self, args):
"""Arguments: period_ms Set the timer to fire every period_ms milliseconds
"""
self._test_channel.send_command('set_timer_period', args.split())
def do_start_timer(self, args):
"""Arguments: None. Start the timer.
"""
self._test_channel.send_command('start_timer', args.split())
def do_stop_timer(self, args):
"""Arguments: None. Stop the timer.
"""
self._test_channel.send_command('stop_timer', args.split())
def do_wait(self, args):
"""Arguments: time in seconds (float).
"""
sleep_time = float(args.split()[0])
time.sleep(sleep_time)
def do_reset(self, args):
"""Arguments: None.
Resets the simulation.
"""
self._test_channel.send_command('reset', [])
def do_end(self, args):
"""Arguments: None.
Ends the simulation and exits.
"""
self._test_channel.send_command('END_SIMULATION', [])
print('Goodbye.')
return True
def do_quit(self, args):
"""Arguments: None.
Exits the test channel.
"""
self._test_channel.send_command('CLOSE_TEST_CHANNEL', [])
self._test_channel.close()
print('Goodbye.')
return True
def do_help(self, args):
"""Arguments: [dev_num [attr]] List the commands available, optionally filtered by device and attr.
"""
if (len(args) == 0):
cmd.Cmd.do_help(self, args)
else:
self._test_channel.send_command('help', args.split())
def preloop(self):
"""Clear out the buffer
"""
response = self._test_channel.receive_response()
#def postcmd(self, stop, line):
#"""
#Called after each command
#stop : whether we will stop after this command
#line : the previous input line
#Return True to stop, False to continue
#"""
#if stop:
#return True
#response = self._test_channel.receive_response()
#if not response:
#return True
#print response
#return False
def main(argv):
if len(argv) != 2:
print('Usage: python test_channel.py [port]')
return
try:
port = int(argv[1])
except ValueError:
print('Error parsing port.')
else:
try:
test_channel = TestChannel(port)
except socket.error as e:
print('Error connecting to socket: %s' % e)
except:
print('Error creating test channel (check argument).')
else:
test_channel_shell = TestChannelShell(test_channel)
test_channel_shell.prompt = '$ '
test_channel_shell.cmdloop('Welcome to the RootCanal Console \n' + 'Type \'help\' for more information.')
if __name__ == '__main__':
main(sys.argv)