Skip to content

Commit

Permalink
Addressed errors Michael pointed out
Browse files Browse the repository at this point in the history
  • Loading branch information
JHofman728 committed Oct 17, 2022
1 parent 921b1ad commit 4e12180
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 64 deletions.
47 changes: 31 additions & 16 deletions ait/core/server/handlers/packet_handler.py
Original file line number Diff line number Diff line change
@@ -1,48 +1,63 @@
import pickle

from ait.core.server.handler import Handler
from ait.core import tlm
from ait.core import tlm, log


class PacketHandler(Handler):
def __init__(self, input_type=None, output_type=None, **kwargs):
"""
This handler provides a way to accept multiple packet types
(e.g. '1553_HS_Packet' and 'Ethernet_HS_Packet') single stream
and have them be processed. This handler takes a string of
raw binary data containing the packet data. It gets the UID from
the telemetry dictionary. A tuple of the UID and user data
field is returned.
Params:
input_type: (optional) Specifies expected input type, used to
validate handler workflow. Defaults to None.
output_type: (optional) Specifies expected output type, used to
validate handler workflow. Defaults to None
**kwargs:
packet: (required) Name of packet, present in default tlm dict.
packet_type: (required) Type of packet (e.g. '1553_HS_Packet', 'Ethernet_HS_Packet')
Present in default tlm dict.
Raises:
ValueError: If packet is not present in kwargs.
ValueError: If packet type is not present in kwargs.
If packet is specified but not present in default tlm dict.
"""
"""
super(PacketHandler, self).__init__(input_type, output_type)
self.packet_name = kwargs.get("packet", None)
self.packet_type = kwargs.get("packet_type", None)
self.tlm_dict = tlm.getDefaultDict()

if not self.packet_name:
msg = f'PacketHandler: No packet type provided in handler config as key "packet"'
if not self.packet_type:
msg = f'PacketHandler: No packet type provided in handler config as key "packet_type"'
raise ValueError(msg)

if self.packet_name not in self.tlm_dict:
msg = f"PacketHandler: Packet name '{self.packet_name}' not present in telemetry dictionary."
if self.packet_type not in self.tlm_dict:
msg = f"PacketHandler: Packet name '{self.packet_type}' not present in telemetry dictionary."
msg += f" Available packet types are {self.tlm_dict.keys()}"
raise ValueError(msg)

self._pkt_defn = self.tlm_dict[self.packet_name]
self._pkt_defn = self.tlm_dict[self.packet_type]

def handle(self, packet):
def handle(self, input_data):
"""
Test the input_data length against the length in the telemetry
Params:
packet: message received by stream (packet)
input_data : byteArray
message received by stream (raw data)
Returns:
tuple of packet UID and message received by stream
"""

if self._pkt_defn.nbytes != packet.nbytes:
msg = f"PacketHandler: Packet data length does not match packet definition."
raise ValueError(msg)
if self._pkt_defn.nbytes != len(input_data):
log.error(
f"PacketHandler: Packet data length does not match packet definition."
)
return 0
else:
return pickle.dumps((self._pkt_defn.uid, input_data), 2)

return pickle.dumps((self._pkt_defn.uid, packet), 2)
return pickle.dumps((self._pkt_defn.uid, input_data), 2)
127 changes: 79 additions & 48 deletions tests/ait/core/server/test_handler.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import pickle
import _pickle
import pytest
import unittest
from unittest import mock
Expand All @@ -9,7 +10,6 @@


class TestCCSDSPacketCheck(unittest.TestCase):

# Check if packet length is at least 7 bytes
def test_ccsds_packet_length(self):
handler = CCSDSPacketHandler(packet_types={"01011100111": "CCSDS_HEADER"})
Expand Down Expand Up @@ -52,76 +52,106 @@ def test_ccsds_packet_uid(self):
self.assertEqual(packet_uid, pickle.loads(result)[0])


class TestHandlerClassWithInputOutputTypes(object):
handler = PacketHandler(packet="CCSDS_HEADER", input_type="int", output_type="str")
class TestCCSDSHandlerClassWithInputOutputTypes(object):
handler = CCSDSPacketHandler(
packet_types={"01011100111": "CCSDS_HEADER"},
input_type="int",
output_type="str",
)

def test_handler_creation(self):
assert self.handler.input_type is "int"
assert self.handler.output_type is "str"

@mock.patch(
"ait.core.server.handlers.PacketHandler.handle", return_value="SpecialReturn"
"ait.core.server.handlers.CCSDSPacketHandler.handle",
return_value="SpecialReturn",
)
def test_execute_handler_returns_handle_return_on_input(self, handle_mock):
returned = self.handler.handle("2")
data = bytearray(b"\x02\xE7\x40\x00\x00\x00\x01")
returned = self.handler.handle(data)
assert returned == "SpecialReturn"


class TestHandlerClassWith1553HSPacket(object):
class TestCCSDSHandlerClassWithoutInputOutputTypes(object):
handler = CCSDSPacketHandler(packet_types={"01011100111": "CCSDS_HEADER"})

def test_ccsds_handler_default_params(self):
assert self.handler.input_type is None
assert self.handler.output_type is None

@mock.patch(
"ait.core.server.handlers.CCSDSPacketHandler.handle",
return_value="SpecialReturn",
)
def test_execute_handler_returns_handle_return_on_input(self, handle_mock):
data = bytearray(b"\x02\xE7\x40\x00\x00\x00\x01")
returned = self.handler.handle(data)
assert returned == "SpecialReturn"

def test_handler_repr(self):
assert self.handler.__repr__() == "<handler.CCSDSPacketHandler>"


class TestHandlerClassWith1553HSPacket(unittest.TestCase):
tlm_dict = tlm.getDefaultDict()
pkt_data = bytearray(b"\x02\xE7\x40\x00\x00\x00\x01")
pkt_data = bytearray(b"\x02\xE7\x40\x00\x00\x00\x01\x02\x03\x04")
pkt_1553 = tlm_dict['1553_HS_Packet']
handler = PacketHandler(pkt_data, packet="1553_HS_Packet")
handler = PacketHandler(packet_type="1553_HS_Packet")

def test_word_array(self):
packet = tlm.Packet(self.pkt_1553, self.pkt_data)
assert packet.words.__len__() == 3.5
assert packet.words.__len__() == self.pkt_1553.nbytes/2

def test_execute_handler_returns_handle_return_on_input(self):
packet = tlm.Packet(self.pkt_1553, self.pkt_data)
result = self.handler.handle(packet)
assert 'Ethernet 1553 packet' in str(result)
def test_1553_uid(self):
packet_uid = self.tlm_dict["1553_HS_Packet"].uid
result = self.handler.handle(self.pkt_data)
self.assertEqual(packet_uid, pickle.loads(result)[0])

# Test packet length by sending a Ethernet_HS_Packet to a 1553_HS_Packet Handler
# Send only 5 bytes 1553 Packet expects 10 bytes
def test_bad_packet_length(self):
ethernet_pkt = self.tlm_dict['Ethernet_HS_Packet']
e_packet = tlm.Packet(ethernet_pkt, self.pkt_data)
with pytest.raises(ValueError):
self.handler.handle(e_packet)
pkt_data = bytearray(b"\x02\xE7\x40\x00\x00")
with self.assertLogs("ait", level="INFO") as cm:
self.handler.handle(pkt_data)
self.assertIn(
"Packet data length does not match packet definition.",
cm.output[0],
)

def test_packet_name_error_and_no_packet_type(self):
pkt_data = bytearray(b"\x02\xE7\x40\x00\x00\x00\x01")
with pytest.raises(ValueError):
PacketHandler(pkt_data, packet="1553_HS_Packe")
with pytest.raises(ValueError):
PacketHandler(pkt_data)
PacketHandler(packet_type="")


class TestHandlerClassWithEthernetHSPacket(object):
class TestHandlerClassWithEthernetHSPacket(unittest.TestCase):
tlm_dict = tlm.getDefaultDict()
pkt_data = bytearray(b"\x02\xE7\x40\x00\x00\x00\x01\x07\x08\x0a")
ethernet_pkt = tlm_dict['Ethernet_HS_Packet']
handler = PacketHandler(pkt_data, packet="Ethernet_HS_Packet")
pkt_data = bytearray(b"\x02\xE7\x40\x00\x00\x00\x01\x40\x00\x03\x02\xE7\x40\x00\x00\x00\x01\x40\x00\x03"
b"\x02\xE7\x40\x00\x00\x00\x01\x40\x00\x03\x02\xE7\x40\x00\x00\x00\x01")
ethernet_pkt_def = tlm_dict['Ethernet_HS_Packet']
handler = PacketHandler(packet_type="Ethernet_HS_Packet")

def test_word_array(self):
e_packet = tlm.Packet(self.ethernet_pkt, self.pkt_data)
assert e_packet.words.__len__() == 5
e_packet = tlm.Packet(self.ethernet_pkt_def, self.pkt_data)
assert e_packet.words.__len__() == self.ethernet_pkt_def.nbytes/2.0

def test_execute_handler_returns_handle_return_on_input(self):
e_packet = tlm.Packet(self.ethernet_pkt, self.pkt_data)
result = self.handler.handle(e_packet)
assert 'Ethernet Health and Status Packet' in str(result)
def test_1553_uid(self):
packet_uid = self.tlm_dict["Ethernet_HS_Packet"].uid
result = self.handler.handle(self.pkt_data)
self.assertEqual(packet_uid, pickle.loads(result)[0])

# Send a 1553 packet to an Ethernet_HS_Packet Handler
# Ethernet packet expects 37 bytes 1552 expects 10
def test_bad_packet_length(self):
pkt_1553 = self.tlm_dict['1553_HS_Packet']
packet = tlm.Packet(pkt_1553, self.pkt_data)
with pytest.raises(ValueError):
self.handler.handle(packet)
pkt_data = bytearray(b"\x02\xE7\x40\x00\x00\x00\x01\x07\x08\x0a")
with self.assertLogs("ait", level="INFO") as cm:
self.handler.handle(pkt_data)
self.assertIn(
"Packet data length does not match packet definition.",
cm.output[0],
)


class TestHandlerClassWithoutInputOutputTypes(object):
handler = PacketHandler(packet="CCSDS_HEADER")
handler = PacketHandler(packet_type="Ethernet_HS_Packet")

def test_handler_default_params(self):
assert self.handler.input_type is None
Expand All @@ -138,9 +168,9 @@ def test_handler_repr(self):
assert self.handler.__repr__() == "<handler.PacketHandler>"


class TestCCSDSHandlerClassWithInputOutputTypes(object):
handler = CCSDSPacketHandler(
packet_types={"01011100111": "CCSDS_HEADER"},
class TestHandlerClassWithInputOutputTypes(object):
handler = PacketHandler(
packet_type='1553_HS_Packet',
input_type="int",
output_type="str",
)
Expand All @@ -150,30 +180,31 @@ def test_handler_creation(self):
assert self.handler.output_type is "str"

@mock.patch(
"ait.core.server.handlers.CCSDSPacketHandler.handle",
"ait.core.server.handlers.PacketHandler.handle",
return_value="SpecialReturn",
)
def test_execute_handler_returns_handle_return_on_input(self, handle_mock):
data = bytearray(b"\x02\xE7\x40\x00\x00\x00\x01")
data = bytearray(b"\x02\xE7\x40\x00\x00\x00\x01\x02\x03\x04")
returned = self.handler.handle(data)
assert returned == "SpecialReturn"


class TestCCSDSHandlerClassWithoutInputOutputTypes(object):
handler = CCSDSPacketHandler(packet_types={"01011100111": "CCSDS_HEADER"})
class TestHandlerClassWithoutInputOutputTypes(object):
handler = PacketHandler(packet_type="Ethernet_HS_Packet")

def test_ccsds_handler_default_params(self):
assert self.handler.input_type is None
assert self.handler.output_type is None

@mock.patch(
"ait.core.server.handlers.CCSDSPacketHandler.handle",
"ait.core.server.handlers.PacketHandler.handle",
return_value="SpecialReturn",
)
def test_execute_handler_returns_handle_return_on_input(self, handle_mock):
data = bytearray(b"\x02\xE7\x40\x00\x00\x00\x01")
# Note: Using 'mock' handler, the data will not be tested for length.
data = bytearray(b"\x02\xE7\x40\x00\x00\x00\x01\x02\x03\x04")
returned = self.handler.handle(data)
assert returned == "SpecialReturn"

def test_handler_repr(self):
assert self.handler.__repr__() == "<handler.CCSDSPacketHandler>"
assert self.handler.__repr__() == "<handler.PacketHandler>"

0 comments on commit 4e12180

Please sign in to comment.