Skip to content

Commit

Permalink
iutctl: add log parser
Browse files Browse the repository at this point in the history
Add socat log parser
  • Loading branch information
piotrnarajowski committed Oct 29, 2024
1 parent ac5a241 commit d3326fd
Showing 1 changed file with 70 additions and 0 deletions.
70 changes: 70 additions & 0 deletions autopts/pybtp/iutctl_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
from abc import abstractmethod

from autopts.pybtp import defs
from autopts.pybtp.defs import *
from datetime import datetime
from autopts.pybtp.types import BTPError
from autopts.pybtp.parser import enc_frame, dec_hdr, dec_data, HDR_LEN
from autopts.utils import get_global_end, raise_on_global_end
Expand All @@ -49,6 +51,9 @@ class BTPSocket:
def __init__(self):
self.conn = None
self.addr = None
self.btp_service_id_dict = None
self.log_file = open(os.path.join('C:\\Users\\Piotr\\auto-pts', "autopts-testlog.log2"), "w")
self.btp_service_id_dict = self.get_svc_id()

@abstractmethod
def open(self, address):
Expand All @@ -58,6 +63,28 @@ def open(self, address):
def accept(self, timeout=10.0):
pass

@staticmethod
def get_svc_id():
"""Looks for BTP_SVC_ID variables from the defs.py"""
btp_service_ids = {}

for name, value in vars(defs).items():
if name.startswith('BTP_SERVICE_ID_') and isinstance(value, int):
trimmed_name = name.replace('BTP_SERVICE_ID_', '')
btp_service_ids[trimmed_name] = value

return btp_service_ids

def write_to_log(self, req, data, hex_data):
"""write to log function for clarity"""
current_time = datetime.now().strftime('%H:%M:%S:%f')
f = self.log_file

if req:
f.write(f'{current_time[:-3]}\t-> Command: {self.parse_data(data)} {hex_data}\n')
else:
f.write(f'{current_time[:-3]}\t<- Response: {self.parse_data(data)} {hex_data}\n')

def read(self, timeout=20.0):
"""Read BTP data from socket
Expand All @@ -77,8 +104,10 @@ def read(self, timeout=20.0):
hdr_memview = hdr_memview[nbytes:]
toread_hdr_len -= nbytes

hex_hdr = hdr.hex()
tuple_hdr = dec_hdr(hdr)
toread_data_len = tuple_hdr.data_len
self.write_to_log(0, tuple_hdr, hex_hdr)

logging.debug("Received: hdr: %r %r", tuple_hdr, hdr)

Expand Down Expand Up @@ -109,10 +138,50 @@ def send(self, svc_id, op, ctrl_index, data):
frame = enc_frame(svc_id, op, ctrl_index, data)

logging.debug("sending frame %r", frame.hex())

hex_data = frame.hex()
tuple_data = (svc_id, op, ctrl_index, len(data) if isinstance(data, (str, bytearray)) else data)
self.write_to_log(1, tuple_data, hex_data)
self.conn.send(frame)

def parse_data(self, data):
def get_btp_cmd_name(prefix, op_code):
"""Looks for BTP Command variables from the defs.py"""
if op_code == 0:
# TODO: decode error status
return BTP_ERROR
for key, value in vars(defs).items():
if (key.startswith(f'BTP_CMD_{prefix}') and value == int(op_code, 16)) or\
(key.startswith(f'BTP_EV_{prefix}') and value == int(op_code, 16)):
return key

return None # Return None if no matching variable is found

parsed_data = ''
svc_name = ''
if isinstance(data, str):
svc_id = data[:2]
opc = data[2:4]
hex_op = int(opc, 16)
opc = hex(hex_op)
else:
svc_id, opc, ctrl_idx, data_len = data[0], hex(data[1]), data[2], data[3]
for name, btp_id in self.btp_service_id_dict.items():
if btp_id == int(svc_id):
svc_name += name
break

indent = "\n\t\t\t"
btp_command = get_btp_cmd_name(svc_name, opc)
to_hex = lambda x: "0x{:02x}".format(x)
parsed_data += f'{btp_command} ({to_hex(svc_id)}|{opc}|{to_hex(ctrl_idx)}){indent} raw data ({data_len}):'

return parsed_data

@abstractmethod
def close(self):
self.log_file.close()
self.log_file = None
pass


Expand Down Expand Up @@ -148,6 +217,7 @@ def accept(self, timeout=10.0):
self.sock.settimeout(None)

def close(self):
super().close()
try:
self.conn.shutdown(socket.SHUT_RDWR)
self.conn.close()
Expand Down

0 comments on commit d3326fd

Please sign in to comment.