-
Notifications
You must be signed in to change notification settings - Fork 6
/
connect_port_get_packet.py
119 lines (94 loc) · 4.28 KB
/
connect_port_get_packet.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
"""Handle serial or TCP/IP interfaces and grab spacecraft packet packet"""
__authors__ = "James Paul Mason"
__contact__ = "[email protected]"
import serial
import socket
from find_sync_bytes import FindSyncBytes
from logger import Logger
class PacketReader:
def read_packet(self):
# From all of the binary coming in, grab and return a single packet including all headers/footers
packet = bytearray()
found_sync_start_index = 0
found_sync_stop_index = 0
found_log_packet = 0
while (found_sync_start_index + found_sync_stop_index) < 2:
buffered_data = self.get_data_from_buffer()
for byte in buffered_data:
packet.append(byte)
fsb = FindSyncBytes()
if fsb.find_log_sync_start_index(packet) != -1:
found_log_packet = True
if fsb.find_sync_start_index(packet) != -1:
found_sync_start_index = True
if fsb.find_sync_stop_index(packet) != -1:
if found_log_packet:
packet = bytearray() # Clear out the packet because its a log message not a housekeeping packet
else:
found_sync_stop_index = True
if found_sync_start_index and found_sync_stop_index:
if fsb.find_sync_start_index(packet) > fsb.find_sync_stop_index(packet):
packet = packet[fsb.find_sync_start_index(packet):]
found_sync_stop_index = 0
if len(packet) > 500: # Assuming that there's no way to have this much header on the 254 byte MinXSS packet
if found_sync_start_index:
packet = packet[fsb.find_sync_start_index(packet):] # start packet at start sync
else:
packet = bytearray()
return packet
@staticmethod
def get_data_from_buffer():
# Placeholder method to be overridden by subclasses for serial and sockets
# because they have different objects to read from and syntax to read with.
return bytearray()
class ConnectSerial(PacketReader):
def __init__(self, port, baud_rate, log):
super(ConnectSerial, self).__init__()
self.port = port
self.baud_rate = baud_rate
self.log = Logger().create_log()
self.ser = None
self.port_readable = None
self.start_sync_bytes = None
self.stop_sync_bytes = None
def connect_to_port(self):
self.log.info("Opening serial port {0} at baud rate {1}".format(self.port, self.baud_rate))
self.ser = serial.Serial(self.port, self.baud_rate)
if not self.ser.readable():
self.log.error('Serial port not readable.')
self.port_readable = False
else:
self.log.info('Successful serial port open.')
self.port_readable = True
return self
def get_data_from_buffer(self):
return bytearray(self.ser.read())
def close(self):
self.log.info("Closing serial port.")
self.ser.close()
class ConnectSocket(PacketReader):
def __init__(self, ip_address, port):
super(ConnectSocket, self).__init__()
self.ip_address = ip_address
self.port = port
self.log = Logger().create_log()
self.client_socket = None
self.port_readable = None
def connect_to_port(self):
self.log.info("Opening IP address: {0} on port: {1}".format(self.ip_address, self.port))
self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
self.client_socket.connect((self.ip_address, int(self.port)))
self.log.info('Successful TCP/IP port open.')
self.port_readable = True
except socket.error as error:
self.log.warning("Failed connecting to {0} on port {1}".format(self.ip_address, self.port))
self.log.warning(''.format(error))
self.port_readable = False
finally:
return self
def get_data_from_buffer(self):
return bytearray(self.client_socket.recv(1))
def close(self):
self.log.info("Closing TCP/IP port.")
self.client_socket.close()