1
1
"""Modbus Request/Response Decoders."""
2
2
from __future__ import annotations
3
3
4
- import pymodbus .pdu .bit_message as bit_msg
5
- import pymodbus .pdu .diag_message as diag_msg
6
- import pymodbus .pdu .file_message as file_msg
7
- import pymodbus .pdu .mei_message as mei_msg
8
- import pymodbus .pdu .other_message as o_msg
9
- import pymodbus .pdu .pdu as base
10
- import pymodbus .pdu .register_message as reg_msg
11
4
from pymodbus .exceptions import MessageRegisterException , ModbusException
12
5
from pymodbus .logging import Log
13
-
6
+ from . pdu import ModbusPDU , ExceptionResponse
14
7
15
8
class DecodePDU :
16
9
"""Decode pdu requests/responses (server/client)."""
17
10
18
- _pdu_class_table : set [tuple [type [base .ModbusPDU ], type [base .ModbusPDU ]]] = {
19
- (reg_msg .ReadHoldingRegistersRequest , reg_msg .ReadHoldingRegistersResponse ),
20
- (bit_msg .ReadDiscreteInputsRequest , bit_msg .ReadDiscreteInputsResponse ),
21
- (reg_msg .ReadInputRegistersRequest , reg_msg .ReadInputRegistersResponse ),
22
- (bit_msg .ReadCoilsRequest , bit_msg .ReadCoilsResponse ),
23
- (bit_msg .WriteMultipleCoilsRequest , bit_msg .WriteMultipleCoilsResponse ),
24
- (reg_msg .WriteMultipleRegistersRequest , reg_msg .WriteMultipleRegistersResponse ),
25
- (reg_msg .WriteSingleRegisterRequest , reg_msg .WriteSingleRegisterResponse ),
26
- (bit_msg .WriteSingleCoilRequest , bit_msg .WriteSingleCoilResponse ),
27
- (reg_msg .ReadWriteMultipleRegistersRequest , reg_msg .ReadWriteMultipleRegistersResponse ),
28
- (diag_msg .DiagnosticBase , diag_msg .DiagnosticBase ),
29
- (o_msg .ReadExceptionStatusRequest , o_msg .ReadExceptionStatusResponse ),
30
- (o_msg .GetCommEventCounterRequest , o_msg .GetCommEventCounterResponse ),
31
- (o_msg .GetCommEventLogRequest , o_msg .GetCommEventLogResponse ),
32
- (o_msg .ReportDeviceIdRequest , o_msg .ReportDeviceIdResponse ),
33
- (file_msg .ReadFileRecordRequest , file_msg .ReadFileRecordResponse ),
34
- (file_msg .WriteFileRecordRequest , file_msg .WriteFileRecordResponse ),
35
- (reg_msg .MaskWriteRegisterRequest , reg_msg .MaskWriteRegisterResponse ),
36
- (file_msg .ReadFifoQueueRequest , file_msg .ReadFifoQueueResponse ),
37
- (mei_msg .ReadDeviceInformationRequest , mei_msg .ReadDeviceInformationResponse ),
38
- }
39
-
40
- _pdu_sub_class_table : set [tuple [type [base .ModbusPDU ], type [base .ModbusPDU ]]] = {
41
- (diag_msg .ReturnQueryDataRequest , diag_msg .ReturnQueryDataResponse ),
42
- (diag_msg .RestartCommunicationsOptionRequest , diag_msg .RestartCommunicationsOptionResponse ),
43
- (diag_msg .ReturnDiagnosticRegisterRequest , diag_msg .ReturnDiagnosticRegisterResponse ),
44
- (diag_msg .ChangeAsciiInputDelimiterRequest , diag_msg .ChangeAsciiInputDelimiterResponse ),
45
- (diag_msg .ForceListenOnlyModeRequest , diag_msg .ForceListenOnlyModeResponse ),
46
- (diag_msg .ClearCountersRequest , diag_msg .ClearCountersResponse ),
47
- (diag_msg .ReturnBusMessageCountRequest , diag_msg .ReturnBusMessageCountResponse ),
48
- (diag_msg .ReturnBusCommunicationErrorCountRequest , diag_msg .ReturnBusCommunicationErrorCountResponse ),
49
- (diag_msg .ReturnBusExceptionErrorCountRequest , diag_msg .ReturnBusExceptionErrorCountResponse ),
50
- (diag_msg .ReturnDeviceMessageCountRequest , diag_msg .ReturnDeviceMessageCountResponse ),
51
- (diag_msg .ReturnDeviceNoResponseCountRequest , diag_msg .ReturnDeviceNoResponseCountResponse ),
52
- (diag_msg .ReturnDeviceNAKCountRequest , diag_msg .ReturnDeviceNAKCountResponse ),
53
- (diag_msg .ReturnDeviceBusyCountRequest , diag_msg .ReturnDeviceBusyCountResponse ),
54
- (diag_msg .ReturnDeviceBusCharacterOverrunCountRequest , diag_msg .ReturnDeviceBusCharacterOverrunCountResponse ),
55
- (diag_msg .ReturnIopOverrunCountRequest , diag_msg .ReturnIopOverrunCountResponse ),
56
- (diag_msg .ClearOverrunCountRequest , diag_msg .ClearOverrunCountResponse ),
57
- (diag_msg .GetClearModbusPlusRequest , diag_msg .GetClearModbusPlusResponse ),
58
- (mei_msg .ReadDeviceInformationRequest , mei_msg .ReadDeviceInformationResponse ),
59
- }
11
+ _pdu_class_table : set [tuple [type [ModbusPDU ], type [ModbusPDU ]]] = set ()
12
+ _pdu_sub_class_table : set [tuple [type [ModbusPDU ], type [ModbusPDU ]]] = set ()
60
13
61
14
def __init__ (self , is_server : bool ) -> None :
62
15
"""Initialize function_tables."""
63
16
inx = 0 if is_server else 1
64
- self .lookup : dict [int , type [base . ModbusPDU ]] = {cl [inx ].function_code : cl [inx ] for cl in self ._pdu_class_table }
65
- self .sub_lookup : dict [int , dict [int , type [base . ModbusPDU ]]] = {}
17
+ self .lookup : dict [int , type [ModbusPDU ]] = {cl [inx ].function_code : cl [inx ] for cl in self ._pdu_class_table }
18
+ self .sub_lookup : dict [int , dict [int , type [ModbusPDU ]]] = {}
66
19
for f in self ._pdu_sub_class_table :
67
20
if (function_code := f [inx ].function_code ) not in self .sub_lookup :
68
21
self .sub_lookup [function_code ] = {f [inx ].sub_function_code : f [inx ]}
69
22
else :
70
23
self .sub_lookup [function_code ][f [inx ].sub_function_code ] = f [inx ]
71
24
72
- def lookupPduClass (self , data : bytes ) -> type [base . ModbusPDU ] | None :
25
+ def lookupPduClass (self , data : bytes ) -> type [ModbusPDU ] | None :
73
26
"""Use `function_code` to determine the class of the PDU."""
74
27
func_code = int (data [1 ])
75
28
if func_code & 0x80 :
76
- return base . ExceptionResponse
29
+ return ExceptionResponse
77
30
if func_code == 0x2B : # mei message, sub_function_code is 1 byte
78
31
sub_func_code = int (data [2 ])
79
32
return self .sub_lookup [func_code ].get (sub_func_code , None )
@@ -82,9 +35,19 @@ def lookupPduClass(self, data: bytes) -> type[base.ModbusPDU] | None:
82
35
return self .sub_lookup [func_code ].get (sub_func_code , None )
83
36
return self .lookup .get (func_code , None )
84
37
85
- def register (self , custom_class : type [base .ModbusPDU ]) -> None :
38
+ @classmethod
39
+ def add_pdu (cls , req : type [ModbusPDU ], resp : type [ModbusPDU ]):
40
+ """Register request/response."""
41
+ cls ._pdu_class_table .add ((req , resp ))
42
+
43
+ @classmethod
44
+ def add_sub_pdu (cls , req : type [ModbusPDU ], resp : type [ModbusPDU ]):
45
+ """Register request/response."""
46
+ cls ._pdu_sub_class_table .add ((req , resp ))
47
+
48
+ def register (self , custom_class : type [ModbusPDU ]) -> None :
86
49
"""Register a function and sub function class with the decoder."""
87
- if not issubclass (custom_class , base . ModbusPDU ):
50
+ if not issubclass (custom_class , ModbusPDU ):
88
51
raise MessageRegisterException (
89
52
f'"{ custom_class .__class__ .__name__ } " is Not a valid Modbus Message'
90
53
". Class needs to be derived from "
@@ -98,11 +61,11 @@ def register(self, custom_class: type[base.ModbusPDU]) -> None:
98
61
custom_class .sub_function_code
99
62
] = custom_class
100
63
101
- def decode (self , frame : bytes ) -> base . ModbusPDU | None :
64
+ def decode (self , frame : bytes ) -> ModbusPDU | None :
102
65
"""Decode a frame."""
103
66
try :
104
67
if (function_code := int (frame [0 ])) > 0x80 :
105
- pdu_exp = base . ExceptionResponse (function_code & 0x7F )
68
+ pdu_exp = ExceptionResponse (function_code & 0x7F )
106
69
pdu_exp .decode (frame [1 :])
107
70
return pdu_exp
108
71
if not (pdu_class := self .lookup .get (function_code , None )):
0 commit comments