diff --git a/README.md b/README.md index f3c90c1..e0514d8 100644 --- a/README.md +++ b/README.md @@ -59,7 +59,7 @@ solark_v1.1 = SolarArk 8/12K Inverters - Untested hdhk_16ch_ac_module = some chinese current monitoring device :P srne_2021_v1.96 = SRNE inverters 2021+ (tested at ASF48100S200-H, ok-ish for HF2430U60-100 ) -eg4_v58 = eg4 inverters ( EG4-6000XP, EG4-18K ) - confirmed working +eg4_v58 = eg4 inverters ( EG4-6000XP, EG4-12K, EG4-18K ) - confirmed working eg4_3000ehv_v1 = eg4 inverters ( EG4_3000EHV ) ``` diff --git a/classes/protocol_settings.py b/classes/protocol_settings.py index 423a31f..7099282 100644 --- a/classes/protocol_settings.py +++ b/classes/protocol_settings.py @@ -7,6 +7,7 @@ import os import re import time +import copy from dataclasses import dataclass from enum import Enum from typing import TYPE_CHECKING, Union @@ -224,6 +225,9 @@ class registry_map_entry: write_mode : WriteMode = WriteMode.READ ''' enable disable reading/writing ''' + has_enum_mapping : bool = False + ''' indicates if this field has enum mappings that should be treated as strings ''' + def __str__(self): return self.variable_name @@ -685,7 +689,8 @@ def process_row(row): value_regex=value_regex, read_command = read_command, read_interval=read_interval, - write_mode=writeMode + write_mode=writeMode, + has_enum_mapping=value_is_json ) registry_map.append(item) @@ -794,6 +799,11 @@ def calculate_registry_ranges(self, map : list[registry_map_entry], max_register except ValueError: pass + # Debug: log the parameters + import logging + log = logging.getLogger(__name__) + log.debug(f"calculate_registry_ranges: max_register={max_register}, max_batch_size={max_batch_size}, map_size={len(map)}, init={init}") + start = -max_batch_size ranges : list[tuple] = [] @@ -871,6 +881,7 @@ def load_registry_map(self, registry_type : Registry_Type, file : str = "", sett size = item.register self.registry_map_size[registry_type] = size + self._log.debug(f"load_registry_map: {registry_type.name} - loaded {len(self.registry_map[registry_type])} entries, max_register={size}") self.registry_map_ranges[registry_type] = self.calculate_registry_ranges(self.registry_map[registry_type], self.registry_map_size[registry_type], init=True) def process_register_bytes(self, registry : dict[int,bytes], entry : registry_map_entry): @@ -1115,7 +1126,8 @@ def process_register_ushort(self, registry : dict[int, int], entry : registry_ma value = registry[entry.register].to_bytes((16 + 7) // 8, byteorder=byte_order) #convert to ushort to bytes value = value.hex() #convert bytes to hex elif entry.data_type == Data_Type.ASCII: - value = registry[entry.register].to_bytes((16 + 7) // 8, byteorder=byte_order) #convert to ushort to bytes + # For ASCII data, use little-endian byte order to read characters in correct order + value = registry[entry.register].to_bytes((16 + 7) // 8, byteorder="little") #convert to ushort to bytes try: value = value.decode("utf-8") #convert bytes to ascii except UnicodeDecodeError as e: @@ -1300,4 +1312,47 @@ def replace_vars(match): for r in results: print(evaluate_expression(r)) + def reset_register_timestamps(self): + """Reset the next_read_timestamp values for all registry entries""" + for registry_type in Registry_Type: + if registry_type in self.registry_map: + for entry in self.registry_map[registry_type]: + entry.next_read_timestamp = 0 + self._log.debug(f"Reset timestamps for all registry entries in protocol {self.protocol}") + + def __deepcopy__(self, memo): + """Custom deep copy implementation to handle non-copyable attributes""" + # Create a new instance + new_instance = protocol_settings.__new__(protocol_settings) + + # Copy basic attributes + new_instance.protocol = self.protocol + new_instance.settings_dir = self.settings_dir + new_instance.transport_settings = self.transport_settings + new_instance.byteorder = self.byteorder + + # Copy dictionaries and lists + new_instance.variable_mask = copy.deepcopy(self.variable_mask, memo) + new_instance.variable_screen = copy.deepcopy(self.variable_screen, memo) + new_instance.codes = copy.deepcopy(self.codes, memo) + new_instance.settings = copy.deepcopy(self.settings, memo) + + # Copy registry maps with deep copy of entries + new_instance.registry_map = {} + for registry_type, entries in self.registry_map.items(): + new_instance.registry_map[registry_type] = copy.deepcopy(entries, memo) + + new_instance.registry_map_size = copy.deepcopy(self.registry_map_size, memo) + new_instance.registry_map_ranges = copy.deepcopy(self.registry_map_ranges, memo) + + # Copy transport + new_instance.transport = self.transport + + # Recreate logger (not copyable) + new_instance._log_level = self._log_level + new_instance._log = logging.getLogger(__name__) + new_instance._log.setLevel(new_instance._log_level) + + return new_instance + #settings = protocol_settings('v0.14') diff --git a/classes/transports/influxdb_out.py b/classes/transports/influxdb_out.py index 0e61032..7efdf56 100644 --- a/classes/transports/influxdb_out.py +++ b/classes/transports/influxdb_out.py @@ -6,10 +6,11 @@ from typing import TextIO import time import logging +import threading from defs.common import strtobool -from ..protocol_settings import Registry_Type, WriteMode, registry_map_entry +from ..protocol_settings import Registry_Type, WriteMode, registry_map_entry, Data_Type from .transport_base import transport_base @@ -46,7 +47,6 @@ class influxdb_out(transport_base): periodic_reconnect_interval: float = 14400.0 # 4 hours in seconds client = None - batch_points = [] last_batch_time = 0 last_connection_check = 0 connection_check_interval = 300 # Check connection every 300 seconds @@ -92,6 +92,10 @@ def __init__(self, settings: SectionProxy): self.write_enabled = True # InfluxDB output is always write-enabled super().__init__(settings) + # Initialize instance variables for thread safety + self.batch_points = [] + self._batch_lock = threading.Lock() + # Initialize persistent storage if self.enable_persistent_storage: self._init_persistent_storage() @@ -172,7 +176,7 @@ def _add_to_backlog(self, point): self._log.warning(f"Backlog full, removed oldest point: {removed.get('measurement', 'unknown')}") self._save_backlog() - self._log.debug(f"Added point to backlog. Backlog size: {len(self.backlog_points)}") + # self._log.debug(f"Added point to backlog. Backlog size: {len(self.backlog_points)}") # Suppressed debug message def _flush_backlog(self): """Flush backlog points to InfluxDB""" @@ -258,7 +262,7 @@ def _check_connection(self): try: # Test current connection self.client.ping() - self._log.debug("Periodic connection check: connection is healthy") + # self._log.debug("Periodic connection check: connection is healthy") # Suppressed debug message except Exception as e: self._log.warning(f"Periodic connection check failed: {e}") return self._attempt_reconnect() @@ -345,10 +349,16 @@ def trigger_periodic_reconnect(self): return self._check_connection() def write_data(self, data: dict[str, str], from_transport: transport_base): - """Write data to InfluxDB""" + # Promote LCDMachineModelCode to device_model if present and meaningful + if "LCDMachineModelCode" in data and data["LCDMachineModelCode"] and data["LCDMachineModelCode"] != "hotnoob": + from_transport.device_model = data["LCDMachineModelCode"] if not self.write_enabled: return + # Debug logging to track transport data flow + if self._log.isEnabledFor(logging.DEBUG): + self._log.debug(f"Received data from {from_transport.transport_name} (serial: {from_transport.device_serial_number}) with {len(data)} fields") + # Check connection status before processing data if not self._check_connection(): self._log.warning("Not connected to InfluxDB, storing data in backlog") @@ -356,14 +366,16 @@ def write_data(self, data: dict[str, str], from_transport: transport_base): self._process_and_store_data(data, from_transport) return - self._log.debug(f"write data from [{from_transport.transport_name}] to influxdb_out transport") - self._log.debug(f"Data: {data}") + # self._log.debug(f"write data from [{from_transport.transport_name}] to influxdb_out transport") # Suppressed debug message + # self._log.debug(f"Data: {data}") # Suppressed debug message # Process and write data self._process_and_write_data(data, from_transport) def _process_and_store_data(self, data: dict[str, str], from_transport: transport_base): - """Process data and store in backlog when not connected""" + # Promote LCDMachineModelCode to device_model if present and meaningful + if "LCDMachineModelCode" in data and data["LCDMachineModelCode"] and data["LCDMachineModelCode"] != "hotnoob": + from_transport.device_model = data["LCDMachineModelCode"] if not self.enable_persistent_storage: self._log.warning("Persistent storage disabled, data will be lost") return @@ -375,16 +387,24 @@ def _process_and_store_data(self, data: dict[str, str], from_transport: transpor self._add_to_backlog(point) # Also add to current batch for immediate flush when reconnected - self.batch_points.append(point) + should_flush = False + with self._batch_lock: + self.batch_points.append(point) + + current_time = time.time() + if (len(self.batch_points) >= self.batch_size or + (current_time - self.last_batch_time) >= self.batch_timeout): + # self._log.debug(f"Flushing batch to backlog: size={len(self.batch_points)}") # Suppressed debug message + should_flush = True - current_time = time.time() - if (len(self.batch_points) >= self.batch_size or - (current_time - self.last_batch_time) >= self.batch_timeout): - self._log.debug(f"Flushing batch to backlog: size={len(self.batch_points)}") + # Flush outside the lock to avoid deadlock + if should_flush: self._flush_batch() def _process_and_write_data(self, data: dict[str, str], from_transport: transport_base): - """Process data and write to InfluxDB when connected""" + # Promote LCDMachineModelCode to device_model if present and meaningful + if "LCDMachineModelCode" in data and data["LCDMachineModelCode"] and data["LCDMachineModelCode"] != "hotnoob": + from_transport.device_model = data["LCDMachineModelCode"] # Prepare tags for InfluxDB tags = {} @@ -403,67 +423,61 @@ def _process_and_write_data(self, data: dict[str, str], from_transport: transpor # Prepare fields (the actual data values) fields = {} for key, value in data.items(): - # Check if we should force float formatting based on protocol settings should_force_float = False unit_mod_found = None - - # Try to get registry entry from protocol settings to check unit_mod + is_enum = False + is_ascii = False + entry = None if hasattr(from_transport, 'protocolSettings') and from_transport.protocolSettings: - # Check both input and holding registries for registry_type in [Registry_Type.INPUT, Registry_Type.HOLDING]: registry_map = from_transport.protocolSettings.get_registry_map(registry_type) - for entry in registry_map: - # Match by variable_name (which is lowercase) - if entry.variable_name.lower() == key.lower(): - unit_mod_found = entry.unit_mod - # If unit_mod is not 1.0, this value should be treated as float - if entry.unit_mod != 1.0: + for e in registry_map: + if e.variable_name.lower() == key.lower(): + entry = e + unit_mod_found = e.unit_mod + if e.unit_mod != 1.0: should_force_float = True - self._log.debug(f"Variable {key} has unit_mod {entry.unit_mod}, forcing float format") + if getattr(e, 'has_enum_mapping', False): + is_enum = True + if getattr(e, 'data_type', None) == Data_Type.ASCII: + is_ascii = True break - if should_force_float: + if should_force_float or is_enum or is_ascii: break - - # Try to convert to numeric values for InfluxDB + if is_enum or is_ascii: + fields[key] = str(value) + continue try: - # Try to convert to float first float_val = float(value) - - # Always use float for InfluxDB to avoid type conflicts - # InfluxDB is strict about field types - once a field is created as integer, - # it must always be integer. Using float avoids this issue. if self.force_float: fields[key] = float_val else: - # Only use integer if it's actually an integer and we're not forcing floats if float_val.is_integer(): fields[key] = int(float_val) else: fields[key] = float_val - - # Log data type conversion for debugging - if self._log.isEnabledFor(logging.DEBUG): - original_type = type(value).__name__ - final_type = type(fields[key]).__name__ - self._log.debug(f"Field {key}: {value} ({original_type}) -> {fields[key]} ({final_type}) [unit_mod: {unit_mod_found}]") - except (ValueError, TypeError): - # If conversion fails, store as string fields[key] = str(value) self._log.debug(f"Field {key}: {value} -> string (conversion failed)") # Create InfluxDB point point = self._create_influxdb_point(data, from_transport) - # Add to batch - self.batch_points.append(point) - self._log.debug(f"Added point to batch. Batch size: {len(self.batch_points)}") + # Add to batch with thread safety + should_flush = False + with self._batch_lock: + self.batch_points.append(point) + # self._log.debug(f"Added point to batch. Batch size: {len(self.batch_points)}") # Suppressed debug message + + # Check if we should flush the batch + current_time = time.time() + if (len(self.batch_points) >= self.batch_size or + (current_time - self.last_batch_time) >= self.batch_timeout): + # self._log.debug(f"Flushing batch: size={len(self.batch_points)}, timeout={current_time - self.last_batch_time:.1f}s") # Suppressed debug message + should_flush = True - # Check if we should flush the batch - current_time = time.time() - if (len(self.batch_points) >= self.batch_size or - (current_time - self.last_batch_time) >= self.batch_timeout): - self._log.debug(f"Flushing batch: size={len(self.batch_points)}, timeout={current_time - self.last_batch_time:.1f}s") + # Flush outside the lock to avoid deadlock + if should_flush: self._flush_batch() def _create_influxdb_point(self, data: dict[str, str], from_transport: transport_base): @@ -485,44 +499,42 @@ def _create_influxdb_point(self, data: dict[str, str], from_transport: transport # Prepare fields (the actual data values) fields = {} for key, value in data.items(): - # Check if we should force float formatting based on protocol settings should_force_float = False unit_mod_found = None - - # Try to get registry entry from protocol settings to check unit_mod + is_enum = False + is_ascii = False + entry = None if hasattr(from_transport, 'protocolSettings') and from_transport.protocolSettings: - # Check both input and holding registries for registry_type in [Registry_Type.INPUT, Registry_Type.HOLDING]: registry_map = from_transport.protocolSettings.get_registry_map(registry_type) - for entry in registry_map: - # Match by variable_name (which is lowercase) - if entry.variable_name.lower() == key.lower(): - unit_mod_found = entry.unit_mod - # If unit_mod is not 1.0, this value should be treated as float - if entry.unit_mod != 1.0: + for e in registry_map: + if e.variable_name.lower() == key.lower(): + entry = e + unit_mod_found = e.unit_mod + if e.unit_mod != 1.0: should_force_float = True + if getattr(e, 'has_enum_mapping', False): + is_enum = True + if getattr(e, 'data_type', None) == Data_Type.ASCII: + is_ascii = True break - if should_force_float: + if should_force_float or is_enum or is_ascii: break - - # Try to convert to numeric values for InfluxDB + if is_enum or is_ascii: + fields[key] = str(value) + continue try: - # Try to convert to float first float_val = float(value) - - # Always use float for InfluxDB to avoid type conflicts if self.force_float: fields[key] = float_val else: - # Only use integer if it's actually an integer and we're not forcing floats if float_val.is_integer(): fields[key] = int(float_val) else: fields[key] = float_val - except (ValueError, TypeError): - # If conversion fails, store as string fields[key] = str(value) + self._log.debug(f"Field {key}: {value} -> string (conversion failed)") # Create InfluxDB point point = { @@ -539,22 +551,78 @@ def _create_influxdb_point(self, data: dict[str, str], from_transport: transport def _flush_batch(self): """Flush the batch of points to InfluxDB""" - if not self.batch_points: - return + with self._batch_lock: + if not self.batch_points: + return + + # Get a copy of the batch points and clear the list + points_to_write = self.batch_points.copy() + self.batch_points = [] # Check connection before attempting to write if not self._check_connection(): self._log.warning("Not connected to InfluxDB, storing batch in backlog") # Store all points in backlog - for point in self.batch_points: + for point in points_to_write: self._add_to_backlog(point) - self.batch_points = [] return try: - self.client.write_points(self.batch_points) - self._log.info(f"Wrote {len(self.batch_points)} points to InfluxDB") - self.batch_points = [] + self.client.write_points(points_to_write) + + # Log serial numbers and sample values if debug level is enabled + if self._log.isEnabledFor(logging.DEBUG): + serial_numbers = [] + sample_values = [] + for point in points_to_write: + # Get serial number + if 'tags' in point and 'device_serial_number' in point['tags']: + serial_numbers.append(point['tags']['device_serial_number']) + else: + serial_numbers.append('None') + + # Get sample field values + if 'fields' in point: + fields = point['fields'] + sample_data = {} + for field_name in ['vacr', 'VacR', 'soc', 'SOC', 'fwcode', 'FWCode', 'vbat', 'Vbat', 'pinv', 'Pinv']: + if field_name in fields: + sample_data[field_name] = fields[field_name] + if sample_data: + sample_values.append(sample_data) + else: + # Debug: Log all available fields when sample fields are not found + if len(fields) > 0: + # Show first 10 field names to help identify the correct names + field_names = list(fields.keys())[:10] + sample_values.append(f'No sample fields found. Available fields: {field_names}') + else: + sample_values.append('No fields found') + else: + sample_values.append('No fields found') + + serial_list = ', '.join(serial_numbers) + self._log.info(f"Wrote {len(points_to_write)} points to InfluxDB (serial numbers: {serial_list})") + + # Log sample values for each point + for i, (serial, samples) in enumerate(zip(serial_numbers, sample_values)): + # Get transport name from point tags + transport_name = 'unknown' + if i < len(points_to_write) and 'tags' in points_to_write[i] and 'transport' in points_to_write[i]['tags']: + transport_name = points_to_write[i]['tags']['transport'] + + # Debug: Log the full tags for troubleshooting + if i < len(points_to_write) and 'tags' in points_to_write[i]: + self._log.debug(f" Point {i+1} tags: {points_to_write[i]['tags']}") + + if isinstance(samples, dict): + sample_str = ', '.join([f"{k}={v}" for k, v in samples.items()]) + self._log.debug(f" Point {i+1} ({serial}) from {transport_name}: {sample_str}") + else: + self._log.debug(f" Point {i+1} ({serial}) from {transport_name}: {samples}") + else: + self._log.info(f"Wrote {len(points_to_write)} points to InfluxDB") + self.last_batch_time = time.time() except Exception as e: self._log.error(f"Failed to write batch to InfluxDB: {e}") @@ -562,22 +630,68 @@ def _flush_batch(self): if self._attempt_reconnect(): # If reconnection successful, try to write again try: - self.client.write_points(self.batch_points) - self._log.info(f"Successfully wrote {len(self.batch_points)} points to InfluxDB after reconnection") - self.batch_points = [] + self.client.write_points(points_to_write) + + # Log serial numbers and sample values if debug level is enabled + if self._log.isEnabledFor(logging.DEBUG): + serial_numbers = [] + sample_values = [] + for point in points_to_write: + # Get serial number + if 'tags' in point and 'device_serial_number' in point['tags']: + serial_numbers.append(point['tags']['device_serial_number']) + else: + serial_numbers.append('None') + + # Get sample field values + if 'fields' in point: + fields = point['fields'] + sample_data = {} + for field_name in ['vacr', 'soc', 'fwcode', 'vbat', 'pinv']: + if field_name in fields: + sample_data[field_name] = fields[field_name] + if sample_data: + sample_values.append(sample_data) + else: + # Debug: Log all available fields when sample fields are not found + if len(fields) > 0: + # Show first 10 field names to help identify the correct names + field_names = list(fields.keys())[:10] + sample_values.append(f'No sample fields found. Available fields: {field_names}') + else: + sample_values.append('No fields found') + else: + sample_values.append('No fields found') + + serial_list = ', '.join(serial_numbers) + self._log.info(f"Successfully wrote {len(points_to_write)} points to InfluxDB after reconnection (serial numbers: {serial_list})") + + # Log sample values for each point + for i, (serial, samples) in enumerate(zip(serial_numbers, sample_values)): + # Get transport name from point tags + transport_name = 'unknown' + if i < len(points_to_write) and 'tags' in points_to_write[i] and 'transport' in points_to_write[i]['tags']: + transport_name = points_to_write[i]['tags']['transport'] + + if isinstance(samples, dict): + sample_str = ', '.join([f"{k}={v}" for k, v in samples.items()]) + self._log.debug(f" Point {i+1} ({serial}) from {transport_name}: {sample_str}") + else: + self._log.debug(f" Point {i+1} ({serial}) from {transport_name}: {samples}") + else: + self._log.info(f"Successfully wrote {len(points_to_write)} points to InfluxDB after reconnection") + self.last_batch_time = time.time() except Exception as retry_e: self._log.error(f"Failed to write batch after reconnection: {retry_e}") # Store failed points in backlog - for point in self.batch_points: + for point in points_to_write: self._add_to_backlog(point) - self.batch_points = [] self.connected = False else: # Store failed points in backlog - for point in self.batch_points: + for point in points_to_write: self._add_to_backlog(point) - self.batch_points = [] self.connected = False def init_bridge(self, from_transport: transport_base): diff --git a/classes/transports/modbus_base.py b/classes/transports/modbus_base.py index 5ac69dc..d220cb9 100644 --- a/classes/transports/modbus_base.py +++ b/classes/transports/modbus_base.py @@ -3,12 +3,87 @@ import os import re import time +import threading from typing import TYPE_CHECKING +from dataclasses import dataclass +from datetime import datetime, timedelta from pymodbus.exceptions import ModbusIOException +from pymodbus.pdu import ExceptionResponse # Import for exception code constants from defs.common import strtobool +# Modbus function codes for exception interpretation +MODBUS_FUNCTION_CODES = { + 0x01: "Read Coils", + 0x02: "Read Discrete Inputs", + 0x03: "Read Holding Registers", + 0x04: "Read Input Registers", + 0x05: "Write Single Coil", + 0x06: "Write Single Register", + 0x0F: "Write Multiple Coils", + 0x10: "Write Multiple Registers", + 0x14: "Read File Record", + 0x15: "Write File Record", + 0x16: "Mask Write Register", + 0x17: "Read/Write Multiple Registers", + 0x2B: "Read Device Identification" +} + +# Modbus exception codes for exception interpretation (from pymodbus.pdu.ExceptionResponse) +MODBUS_EXCEPTION_CODES = { + ExceptionResponse.ILLEGAL_FUNCTION: "ILLEGAL_FUNCTION", + ExceptionResponse.ILLEGAL_ADDRESS: "ILLEGAL_ADDRESS", + ExceptionResponse.ILLEGAL_VALUE: "ILLEGAL_VALUE", + ExceptionResponse.SLAVE_FAILURE: "SLAVE_FAILURE", + ExceptionResponse.ACKNOWLEDGE: "ACKNOWLEDGE", + ExceptionResponse.SLAVE_BUSY: "SLAVE_BUSY", + ExceptionResponse.NEGATIVE_ACKNOWLEDGE: "NEGATIVE_ACKNOWLEDGE", + ExceptionResponse.MEMORY_PARITY_ERROR: "MEMORY_PARITY_ERROR", + ExceptionResponse.GATEWAY_PATH_UNAVIABLE: "GATEWAY_PATH_UNAVAILABLE", + ExceptionResponse.GATEWAY_NO_RESPONSE: "GATEWAY_NO_RESPONSE" +} + +# Descriptions for Modbus exception codes (using ExceptionResponse constants as keys) +MODBUS_EXCEPTION_DESCRIPTIONS = { + ExceptionResponse.ILLEGAL_FUNCTION: "The function code received in the query is not an allowable action for the slave", + ExceptionResponse.ILLEGAL_ADDRESS: "The data address received in the query is not an allowable address for the slave", + ExceptionResponse.ILLEGAL_VALUE: "A value contained in the query data field is not an allowable value for the slave", + ExceptionResponse.SLAVE_FAILURE: "An unrecoverable error occurred while the slave was attempting to perform the requested action", + ExceptionResponse.ACKNOWLEDGE: "The slave has accepted the request and is processing it, but a long duration of time will be required", + ExceptionResponse.SLAVE_BUSY: "The slave is engaged in processing a long-duration program command", + ExceptionResponse.NEGATIVE_ACKNOWLEDGE: "The slave cannot perform the program function received in the query", + ExceptionResponse.MEMORY_PARITY_ERROR: "The slave attempted to read record file, but detected a parity error in the memory", + ExceptionResponse.GATEWAY_PATH_UNAVIABLE: "The gateway path is not available", + ExceptionResponse.GATEWAY_NO_RESPONSE: "The gateway target device failed to respond" +} + +def interpret_modbus_exception_code(code): + """ + Interpret a Modbus exception response code and return human-readable information. + + Args: + code (int): The exception response code (e.g., 132) + + Returns: + str: Human-readable description of the exception + """ + # Extract function code (lower 7 bits) + function_code = code & 0x7F + + # Check if this is an exception response (upper bit set) + if code & 0x80: + # This is an exception response + exception_code = code & 0x7F # The exception code is in the lower 7 bits + function_name = MODBUS_FUNCTION_CODES.get(function_code, f"Unknown Function ({function_code})") + exception_name = MODBUS_EXCEPTION_CODES.get(exception_code, f"Unknown Exception ({exception_code})") + description = MODBUS_EXCEPTION_DESCRIPTIONS.get(exception_code, "Unknown exception code") + return f"Modbus Exception: {function_name} failed with {exception_name} - {description}" + else: + # This is not an exception response + function_name = MODBUS_FUNCTION_CODES.get(function_code, f"Unknown Function ({function_code})") + return f"Modbus Function: {function_name} (not an exception response)" + from ..protocol_settings import ( Data_Type, Registry_Type, @@ -26,36 +101,113 @@ from pymodbus.client import BaseModbusClient +@dataclass +class RegisterFailureTracker: + """Tracks register read failures and manages soft disabling""" + register_range: tuple[int, int] # (start, end) register range + registry_type: Registry_Type + failure_count: int = 0 + last_failure_time: float = 0 + last_success_time: float = 0 + disabled_until: float = 0 # Unix timestamp when disabled until + _lock: threading.Lock = None + + def __post_init__(self): + if self._lock is None: + self._lock = threading.Lock() + + def record_failure(self, max_failures: int = 5, disable_duration_hours: int = 12): + """Record a failed read attempt""" + with self._lock: + current_time = time.time() + self.failure_count += 1 + self.last_failure_time = current_time + + # If we've had enough failures, disable for specified duration + if self.failure_count >= max_failures: + self.disabled_until = current_time + (disable_duration_hours * 3600) + return True # Indicates this range should be disabled + return False + + def record_success(self): + """Record a successful read attempt""" + with self._lock: + current_time = time.time() + self.last_success_time = current_time + # Reset failure count on success + self.failure_count = 0 + self.disabled_until = 0 + + def is_disabled(self) -> bool: + """Check if this register range is currently disabled""" + with self._lock: + if self.disabled_until == 0: + return False + return time.time() < self.disabled_until + + def get_remaining_disable_time(self) -> float: + """Get remaining time until re-enabled (0 if not disabled)""" + with self._lock: + if self.disabled_until == 0: + return 0 + remaining = self.disabled_until - time.time() + return max(0, remaining) + + class modbus_base(transport_base): #this is specifically static clients : dict[str, "BaseModbusClient"] = {} ''' str is identifier, dict of clients when multiple transports use the same ports ''' - - #non-static here for reference, type hinting, python bs ect... - modbus_delay_increament : float = 0.05 - ''' delay adjustment every error. todo: add a setting for this ''' - - modbus_delay_setting : float = 0.85 - '''time inbetween requests, unmodified''' - - modbus_delay : float = 0.85 - '''time inbetween requests''' - - analyze_protocol_enabled : bool = False - analyze_protocol_save_load : bool = False - first_connect : bool = True - - send_holding_register : bool = True - send_input_register : bool = True + + # Threading locks for concurrency control + _clients_lock : threading.Lock = threading.Lock() + ''' Lock for accessing the shared clients dictionary ''' + _client_locks : dict[str, threading.Lock] = {} + ''' Port-specific locks to allow concurrent access to different ports ''' def __init__(self, settings : "SectionProxy", protocolSettings : "protocol_settings" = None): super().__init__(settings) + # Initialize instance-specific variables (not class-level) + self.modbus_delay_increament : float = 0.05 + ''' delay adjustment every error. todo: add a setting for this ''' + + self.modbus_delay_setting : float = 0.85 + '''time inbetween requests, unmodified''' + + self.modbus_delay : float = 0.85 + '''time inbetween requests''' + + self.analyze_protocol_enabled : bool = False + self.analyze_protocol_save_load : bool = False + self.first_connect : bool = True + self._needs_reconnection : bool = False + + self.send_holding_register : bool = True + self.send_input_register : bool = True + + # Register failure tracking - make instance-specific + self.enable_register_failure_tracking: bool = True + self.max_failures_before_disable: int = 5 + self.disable_duration_hours: int = 12 + + # Initialize transport-specific lock + self._transport_lock = threading.Lock() + + # Initialize instance-specific register failure tracking + self.register_failure_trackers: dict[str, RegisterFailureTracker] = {} + self._failure_tracking_lock = threading.Lock() + self.analyze_protocol_enabled = settings.getboolean("analyze_protocol", fallback=self.analyze_protocol_enabled) self.analyze_protocol_save_load = settings.getboolean("analyze_protocol_save_load", fallback=self.analyze_protocol_save_load) + # Register failure tracking settings + self.enable_register_failure_tracking = settings.getboolean("enable_register_failure_tracking", fallback=self.enable_register_failure_tracking) + self.max_failures_before_disable = settings.getint("max_failures_before_disable", fallback=self.max_failures_before_disable) + self.disable_duration_hours = settings.getint("disable_duration_hours", fallback=self.disable_duration_hours) + # get defaults from protocol settings if "send_input_register" in self.protocolSettings.settings: self.send_input_register = strtobool(self.protocolSettings.settings["send_input_register"]) @@ -74,20 +226,240 @@ def __init__(self, settings : "SectionProxy", protocolSettings : "protocol_setti # Note: Connection and analyze_protocol will be called after subclass initialization is complete - def init_after_connect(self): - #from transport_base settings - if self.write_enabled: - self.enable_write() + def _get_port_identifier(self) -> str: + """Get a unique identifier for this transport's port""" + if hasattr(self, 'port'): + return f"{self.port}_{self.baudrate}" + elif hasattr(self, 'host') and hasattr(self, 'port'): + return f"{self.host}_{self.port}" + else: + return self.transport_name + + def _get_port_lock(self) -> threading.Lock: + """Get or create a lock for this transport's port""" + port_id = self._get_port_identifier() + + with self._clients_lock: + if port_id not in self._client_locks: + self._client_locks[port_id] = threading.Lock() + + return self._client_locks[port_id] + + def _get_register_range_key(self, register_range: tuple[int, int], registry_type: Registry_Type) -> str: + """Generate a unique key for a register range""" + return f"{registry_type.name}_{register_range[0]}_{register_range[1]}" + + def _get_or_create_failure_tracker(self, register_range: tuple[int, int], registry_type: Registry_Type) -> RegisterFailureTracker: + """Get or create a failure tracker for a register range""" + key = self._get_register_range_key(register_range, registry_type) + + with self._failure_tracking_lock: + if key not in self.register_failure_trackers: + self.register_failure_trackers[key] = RegisterFailureTracker( + register_range=register_range, + registry_type=registry_type + ) + + return self.register_failure_trackers[key] + + def _record_register_read_success(self, register_range: tuple[int, int], registry_type: Registry_Type): + """Record a successful register read""" + if not self.enable_register_failure_tracking: + return + + tracker = self._get_or_create_failure_tracker(register_range, registry_type) + # Only log if the last failure was after the last success (i.e., this is the first success after a failure) + should_log_recovery = tracker.last_failure_time > tracker.last_success_time + tracker.record_success() + + if should_log_recovery: + self._log.info(f"Register range {registry_type.name} {register_range[0]}-{register_range[1]} is working again after previous failures") + + def _record_register_read_failure(self, register_range: tuple[int, int], registry_type: Registry_Type) -> bool: + """Record a failed register read, returns True if range should be disabled""" + if not self.enable_register_failure_tracking: + return False + + tracker = self._get_or_create_failure_tracker(register_range, registry_type) + should_disable = tracker.record_failure(self.max_failures_before_disable, self.disable_duration_hours) + + if should_disable: + self._log.warning(f"Register range {registry_type.name} {register_range[0]}-{register_range[1]} disabled for {self.disable_duration_hours} hours after {tracker.failure_count} failures") + else: + self._log.warning(f"Register range {registry_type.name} {register_range[0]}-{register_range[1]} failed ({tracker.failure_count}/{self.max_failures_before_disable} attempts)") + + return should_disable + + def _is_register_range_disabled(self, register_range: tuple[int, int], registry_type: Registry_Type) -> bool: + """Check if a register range is currently disabled""" + if not self.enable_register_failure_tracking: + return False + + tracker = self._get_or_create_failure_tracker(register_range, registry_type) + return tracker.is_disabled() + + def _get_disabled_ranges_info(self) -> list[str]: + """Get information about currently disabled register ranges""" + disabled_info = [] + current_time = time.time() + + with self._failure_tracking_lock: + for tracker in self.register_failure_trackers.values(): + if tracker.is_disabled(): + remaining_hours = tracker.get_remaining_disable_time() / 3600 + disabled_info.append( + f"{tracker.registry_type.name} {tracker.register_range[0]}-{tracker.register_range[1]} " + f"(disabled for {remaining_hours:.1f}h, {tracker.failure_count} failures)" + ) + + return disabled_info + + def get_register_failure_status(self) -> dict: + """Get comprehensive status of register failure tracking""" + status = { + "enabled": self.enable_register_failure_tracking, + "max_failures_before_disable": self.max_failures_before_disable, + "disable_duration_hours": self.disable_duration_hours, + "total_tracked_ranges": 0, + "disabled_ranges": [], + "failed_ranges": [], + "successful_ranges": [] + } + + with self._failure_tracking_lock: + status["total_tracked_ranges"] = len(self.register_failure_trackers) + + for tracker in self.register_failure_trackers.values(): + range_info = { + "registry_type": tracker.registry_type.name, + "range": f"{tracker.register_range[0]}-{tracker.register_range[1]}", + "failure_count": tracker.failure_count, + "last_failure_time": tracker.last_failure_time, + "last_success_time": tracker.last_success_time + } + + if tracker.is_disabled(): + range_info["disabled_until"] = tracker.disabled_until + range_info["remaining_hours"] = tracker.get_remaining_disable_time() / 3600 + status["disabled_ranges"].append(range_info) + elif tracker.failure_count > 0: + status["failed_ranges"].append(range_info) + else: + status["successful_ranges"].append(range_info) + + return status + + def reset_register_failure_tracking(self, registry_type: Registry_Type = None, register_range: tuple[int, int] = None): + """Reset register failure tracking for specific ranges or all ranges""" + with self._failure_tracking_lock: + if registry_type is None and register_range is None: + # Reset all tracking + self.register_failure_trackers.clear() + self._log.info("Reset all register failure tracking") + return + + if register_range is not None: + # Reset specific range + key = self._get_register_range_key(register_range, registry_type or Registry_Type.INPUT) + if key in self.register_failure_trackers: + del self.register_failure_trackers[key] + self._log.info(f"Reset failure tracking for {registry_type.name if registry_type else 'INPUT'} range {register_range[0]}-{register_range[1]}") + else: + # Reset all ranges for specific registry type + keys_to_remove = [] + for key, tracker in self.register_failure_trackers.items(): + if tracker.registry_type == registry_type: + keys_to_remove.append(key) + + for key in keys_to_remove: + del self.register_failure_trackers[key] + + self._log.info(f"Reset failure tracking for all {registry_type.name} ranges ({len(keys_to_remove)} ranges)") + + def enable_register_range(self, register_range: tuple[int, int], registry_type: Registry_Type): + """Manually enable a disabled register range""" + tracker = self._get_or_create_failure_tracker(register_range, registry_type) + with self._failure_tracking_lock: + tracker.disabled_until = 0 + tracker.failure_count = 0 + self._log.info(f"Manually enabled register range {registry_type.name} {register_range[0]}-{register_range[1]}") - #if sn is empty, attempt to autoread it - if not self.device_serial_number: - self.device_serial_number = self.read_serial_number() - self.update_identifier() + def init_after_connect(self): + # Use transport lock to prevent concurrent access during initialization + with self._transport_lock: + #from transport_base settings + if self.write_enabled: + self.enable_write() + + #if sn is empty, attempt to autoread it + if not self.device_serial_number: + self._log.info(f"Reading serial number for transport {self.transport_name} on port {getattr(self, 'port', 'unknown')}") + self.device_serial_number = self.read_serial_number() + self._log.info(f"Transport {self.transport_name} serial number: {self.device_serial_number}") + self.update_identifier() + else: + self._log.debug(f"Transport {self.transport_name} already has serial number: {self.device_serial_number}") def connect(self): - if self.connected and self.first_connect: + """Connect to the Modbus device""" + # Add debugging information + port_info = getattr(self, 'port', 'unknown') + address_info = getattr(self, 'address', 'unknown') + self._log.info(f"Connecting to Modbus device: address={address_info}, port={port_info}") + + # Handle first connection or reconnection + if self.first_connect: self.first_connect = False self.init_after_connect() + elif not self.connected: + # Reconnection case - reinitialize after connection is established + self._log.info(f"Reconnecting transport {self.transport_name}") + # The actual connection is handled by subclasses (e.g., modbus_rtu) + # We just need to reinitialize after connection + self.init_after_connect() + + # Reset reconnection flag after successful connection + if self.connected: + self._needs_reconnection = False + + # Reset protocol settings timestamps to ensure fresh reading + if hasattr(self, 'protocolSettings') and self.protocolSettings: + for registry_type in [Registry_Type.INPUT, Registry_Type.HOLDING]: + if registry_type in self.protocolSettings.registry_map: + for entry in self.protocolSettings.registry_map[registry_type]: + entry.next_read_timestamp = 0 + + def cleanup(self): + """Clean up transport resources and close connections""" + with self._transport_lock: + self._log.info(f"Cleaning up transport {self.transport_name}") + + # Reset register timestamps to prevent sharing issues between transports + if hasattr(self, 'protocolSettings') and self.protocolSettings: + self.protocolSettings.reset_register_timestamps() + + # Close the modbus client connection + port_identifier = self._get_port_identifier() + if port_identifier in self.clients: + try: + client = self.clients[port_identifier] + if hasattr(client, 'close') and callable(client.close): + client.close() + self._log.info(f"Closed modbus client for {self.transport_name}") + except Exception as e: + self._log.warning(f"Error closing modbus client for {self.transport_name}: {e}") + + # Remove from shared clients dict + with self._clients_lock: + if port_identifier in self.clients: + del self.clients[port_identifier] + self._log.info(f"Removed client from shared dict for {self.transport_name}") + + # Mark as disconnected and reset first_connect for reconnection + self.connected = False + self.first_connect = False # Reset so reconnection works properly + self._needs_reconnection = True # Flag that this transport needs reconnection + self._log.info(f"Transport {self.transport_name} cleanup completed") def read_serial_number(self) -> str: # First try to read "Serial Number" from input registers (for protocols like EG4 v58) @@ -165,48 +537,93 @@ def enable_write(self): def write_data(self, data : dict[str, str], from_transport : transport_base) -> None: - if not self.write_enabled: - return + # Use transport lock to prevent concurrent access to this transport instance + with self._transport_lock: + if not self.write_enabled: + return + + registry_map = self.protocolSettings.get_registry_map(Registry_Type.HOLDING) - registry_map = self.protocolSettings.get_registry_map(Registry_Type.HOLDING) + for variable_name, value in data.items(): + entry = None + for e in registry_map: + if e.variable_name == variable_name: + entry = e + break - for key, value in data.items(): - for entry in registry_map: - if entry.variable_name == key: + if entry: self.write_variable(entry, value, Registry_Type.HOLDING) - break - time.sleep(self.modbus_delay) #sleep inbetween requests so modbus can rest + time.sleep(self.modbus_delay) #sleep inbetween requests so modbus can rest def read_data(self) -> dict[str, str]: - info = {} - #modbus - only read input/holding registries - for registry_type in (Registry_Type.INPUT, Registry_Type.HOLDING): - - #enable / disable input/holding register - if registry_type == Registry_Type.INPUT and not self.send_input_register: - continue + # Use transport lock to prevent concurrent access to this transport instance + with self._transport_lock: + # Add debugging information + port_info = getattr(self, 'port', 'unknown') + address_info = getattr(self, 'address', 'unknown') + self._log.debug(f"Reading data from {self.transport_name}: address={address_info}, port={port_info}") + + info = {} + #modbus - only read input/holding registries + for registry_type in (Registry_Type.INPUT, Registry_Type.HOLDING): + + #enable / disable input/holding register + if registry_type == Registry_Type.INPUT and not self.send_input_register: + continue - if registry_type == Registry_Type.HOLDING and not self.send_holding_register: - continue + if registry_type == Registry_Type.HOLDING and not self.send_holding_register: + continue - #calculate ranges dynamically -- for variable read timing - ranges = self.protocolSettings.calculate_registry_ranges(self.protocolSettings.registry_map[registry_type], - self.protocolSettings.registry_map_size[registry_type], - timestamp=self.last_read_time) + #calculate ranges dynamically -- for variable read timing + ranges = self.protocolSettings.calculate_registry_ranges(self.protocolSettings.registry_map[registry_type], + self.protocolSettings.registry_map_size[registry_type], + timestamp=self.last_read_time) + + self._log.info(f"Reading {registry_type.name} registers for {self.transport_name}: {len(ranges)} ranges") + if len(ranges) == 0: + self._log.warning(f"No register ranges calculated for {self.transport_name} {registry_type.name}") + # Debug: show protocol settings info + if hasattr(self, 'protocolSettings') and self.protocolSettings: + total_entries = len(self.protocolSettings.registry_map.get(registry_type, [])) + self._log.info(f"Protocol settings for {self.transport_name}: {total_entries} total entries for {registry_type.name}") + + # Count entries that would be read + readable_entries = 0 + for entry in self.protocolSettings.registry_map.get(registry_type, []): + if entry.write_mode != WriteMode.READDISABLED and entry.write_mode != WriteMode.WRITEONLY: + readable_entries += 1 + self._log.info(f"Readable entries for {self.transport_name} {registry_type.name}: {readable_entries}") + + registry = self.read_modbus_registers(ranges=ranges, registry_type=registry_type) + + if registry: + self._log.info(f"Got registry data for {self.transport_name} {registry_type.name}: {len(registry)} registers") + else: + self._log.warning(f"No registry data returned for {self.transport_name} {registry_type.name}") + + new_info = self.protocolSettings.process_registery(registry, self.protocolSettings.get_registry_map(registry_type)) - registry = self.read_modbus_registers(ranges=ranges, registry_type=registry_type) - new_info = self.protocolSettings.process_registery(registry, self.protocolSettings.get_registry_map(registry_type)) + if False: + new_info = {self.__input_register_prefix + key: value for key, value in new_info.items()} - if False: - new_info = {self.__input_register_prefix + key: value for key, value in new_info.items()} + info.update(new_info) - info.update(new_info) + if not info: + self._log.info("Register is Empty; transport busy?") - if not info: - self._log.info("Register is Empty; transport busy?") + # Log disabled ranges status periodically (every 10 minutes) + if self.enable_register_failure_tracking and hasattr(self, '_last_disabled_status_log') and time.time() - self._last_disabled_status_log > 600: + disabled_ranges = self._get_disabled_ranges_info() + if disabled_ranges: + self._log.info(f"Currently disabled register ranges: {len(disabled_ranges)}") + for range_info in disabled_ranges: + self._log.info(f" - {range_info}") + self._last_disabled_status_log = time.time() + elif not hasattr(self, '_last_disabled_status_log'): + self._last_disabled_status_log = time.time() - return info + return info def validate_protocol(self, protocolSettings : "protocol_settings") -> float: score_percent = self.validate_registry(Registry_Type.HOLDING) @@ -555,6 +972,12 @@ def read_modbus_registers(self, ranges : list[tuple] = None, start : int = 0, en index = -1 while (index := index + 1) < len(ranges) : range = ranges[index] + + # Check if this register range is currently disabled + if self._is_register_range_disabled(range, registry_type): + remaining_hours = self._get_or_create_failure_tracker(range, registry_type).get_remaining_disable_time() / 3600 + self._log.info(f"Skipping disabled register range {registry_type.name} {range[0]}-{range[0]+range[1]-1} (disabled for {remaining_hours:.1f}h)") + continue self._log.info("get registers ("+str(index)+"): " +str(registry_type)+ " - " + str(range[0]) + " to " + str(range[0]+range[1]-1) + " ("+str(range[1])+")") time.sleep(self.modbus_delay) #sleep for 1ms to give bus a rest #manual recommends 1s between commands @@ -565,7 +988,7 @@ def read_modbus_registers(self, ranges : list[tuple] = None, start : int = 0, en register = self.read_registers(range[0], range[1], registry_type=registry_type) except ModbusIOException as e: - self._log.error("ModbusIOException: " + str(e)) + self._log.error(f"ModbusIOException for {self.transport_name}: " + str(e)) # In pymodbus 3.7+, ModbusIOException doesn't have error_code attribute # Treat all ModbusIOException as retryable errors isError = True @@ -577,7 +1000,20 @@ def read_modbus_registers(self, ranges : list[tuple] = None, start : int = 0, en elif isinstance(register, bytes): self._log.error(register.decode("utf-8")) else: - self._log.error(str(register)) + # Enhanced error logging with Modbus exception interpretation + error_msg = str(register) + + # Check if this is an ExceptionResponse and extract the exception code + if hasattr(register, 'function_code') and hasattr(register, 'exception_code'): + exception_code = register.function_code | 0x80 # Convert to exception response code + interpreted_error = interpret_modbus_exception_code(exception_code) + self._log.debug(f"{error_msg} - {interpreted_error}") + else: + self._log.error(error_msg) + + # Record the failure for this register range + should_disable = self._record_register_read_failure(range, registry_type) + self.modbus_delay += self.modbus_delay_increament #increase delay, error is likely due to modbus being busy if self.modbus_delay > 60: #max delay. 60 seconds between requests should be way over kill if it happens @@ -597,6 +1033,8 @@ def read_modbus_registers(self, ranges : list[tuple] = None, start : int = 0, en if self.modbus_delay < self.modbus_delay_setting: self.modbus_delay = self.modbus_delay_setting + # Record successful read for this register range + self._record_register_read_success(range, registry_type) retry -= 1 if retry < 0: diff --git a/classes/transports/modbus_rtu.py b/classes/transports/modbus_rtu.py index c3d424a..f4b57c1 100644 --- a/classes/transports/modbus_rtu.py +++ b/classes/transports/modbus_rtu.py @@ -54,11 +54,13 @@ def __init__(self, settings : SectionProxy, protocolSettings : protocol_settings client_str = self.port+"("+str(self.baudrate)+")" - if client_str in modbus_base.clients: - self.client = modbus_base.clients[client_str] - return + # Thread-safe client access + with self._clients_lock: + if client_str in modbus_base.clients: + self.client = modbus_base.clients[client_str] + return - self._log.debug(f"Creating new client with baud rate: {self.baudrate}") + self._log.debug(f"Creating new client with baud rate: {self.baudrate}") if "method" in init_signature.parameters: self.client = ModbusSerialClient(method="rtu", port=self.port, @@ -72,8 +74,9 @@ def __init__(self, settings : SectionProxy, protocolSettings : protocol_settings stopbits=1, parity="N", bytesize=8, timeout=2 ) - #add to clients - modbus_base.clients[client_str] = self.client + #add to clients (thread-safe) + with self._clients_lock: + modbus_base.clients[client_str] = self.client def read_registers(self, start, count=1, registry_type : Registry_Type = Registry_Type.INPUT, **kwargs): @@ -84,10 +87,13 @@ def read_registers(self, start, count=1, registry_type : Registry_Type = Registr if self.pymodbus_slave_arg != "unit": kwargs["slave"] = kwargs.pop("unit") - if registry_type == Registry_Type.INPUT: - return self.client.read_input_registers(address=start, count=count, **kwargs) - elif registry_type == Registry_Type.HOLDING: - return self.client.read_holding_registers(address=start, count=count, **kwargs) + # Use port-specific lock for thread-safe access + port_lock = self._get_port_lock() + with port_lock: + if registry_type == Registry_Type.INPUT: + return self.client.read_input_registers(address=start, count=count, **kwargs) + elif registry_type == Registry_Type.HOLDING: + return self.client.read_holding_registers(address=start, count=count, **kwargs) def write_register(self, register : int, value : int, **kwargs): if not self.write_enabled: @@ -100,9 +106,14 @@ def write_register(self, register : int, value : int, **kwargs): if self.pymodbus_slave_arg != "unit": kwargs["slave"] = kwargs.pop("unit") - self.client.write_register(register, value, **kwargs) #function code 0x06 writes to holding register + # Use port-specific lock for thread-safe access + port_lock = self._get_port_lock() + with port_lock: + self.client.write_register(register, value, **kwargs) #function code 0x06 writes to holding register def connect(self): self.connected = self.client.connect() - self._log.debug(f"Modbus rtu connected: {self.connected}") + self._log.info(f"Modbus rtu connected: {self.connected} for {self.transport_name} on port {self.port}") + if not self.connected: + self._log.error(f"Failed to connect to {self.transport_name} on port {self.port}") super().connect() diff --git a/classes/transports/modbus_tcp.py b/classes/transports/modbus_tcp.py index 26dc9a8..a2026e0 100644 --- a/classes/transports/modbus_tcp.py +++ b/classes/transports/modbus_tcp.py @@ -32,14 +32,17 @@ def __init__(self, settings : SectionProxy, protocolSettings : protocol_settings client_str = self.host+"("+str(self.port)+")" #check if client is already initialied - if client_str in modbus_base.clients: - self.client = modbus_base.clients[client_str] - return + with self._clients_lock: + if client_str in modbus_base.clients: + self.client = modbus_base.clients[client_str] + super().__init__(settings, protocolSettings=protocolSettings) + return self.client = ModbusTcpClient(host=self.host, port=self.port, timeout=7, retries=3) - #add to clients - modbus_base.clients[client_str] = self.client + #add to clients (thread-safe) + with self._clients_lock: + modbus_base.clients[client_str] = self.client super().__init__(settings, protocolSettings=protocolSettings) @@ -54,7 +57,10 @@ def write_register(self, register : int, value : int, **kwargs): if self.pymodbus_slave_arg != "unit": kwargs["slave"] = kwargs.pop("unit") - self.client.write_register(register, value, **kwargs) #function code 0x06 writes to holding register + # Use port-specific lock for thread-safe access + port_lock = self._get_port_lock() + with port_lock: + self.client.write_register(register, value, **kwargs) #function code 0x06 writes to holding register def read_registers(self, start, count=1, registry_type : Registry_Type = Registry_Type.INPUT, **kwargs): @@ -65,10 +71,13 @@ def read_registers(self, start, count=1, registry_type : Registry_Type = Registr if self.pymodbus_slave_arg != "unit": kwargs["slave"] = kwargs.pop("unit") - if registry_type == Registry_Type.INPUT: - return self.client.read_input_registers(start,count=count, **kwargs ) - elif registry_type == Registry_Type.HOLDING: - return self.client.read_holding_registers(start,count=count, **kwargs) + # Use port-specific lock for thread-safe access + port_lock = self._get_port_lock() + with port_lock: + if registry_type == Registry_Type.INPUT: + return self.client.read_input_registers(start,count=count, **kwargs ) + elif registry_type == Registry_Type.HOLDING: + return self.client.read_holding_registers(start,count=count, **kwargs) def connect(self): self.connected = self.client.connect() diff --git a/classes/transports/transport_base.py b/classes/transports/transport_base.py index aa9d35f..54229e3 100644 --- a/classes/transports/transport_base.py +++ b/classes/transports/transport_base.py @@ -1,4 +1,5 @@ import logging +import copy from enum import Enum from typing import TYPE_CHECKING, Callable @@ -73,6 +74,7 @@ class transport_base: last_read_time : float = 0 connected : bool = False + _needs_reconnection : bool = False on_message : Callable[["transport_base", registry_map_entry, str], None] = None ''' callback, on message recieved ''' @@ -112,7 +114,12 @@ def __init__(self, settings : "SectionProxy") -> None: #must load after settings self.protocol_version = settings.get("protocol_version") if self.protocol_version: - self.protocolSettings = protocol_settings(self.protocol_version, transport_settings=settings) + # Create a deep copy of protocol settings to avoid shared state between transports + original_protocol_settings = protocol_settings(self.protocol_version, transport_settings=settings) + self.protocolSettings = copy.deepcopy(original_protocol_settings) + + # Update the transport settings reference in the copy + self.protocolSettings.transport_settings = settings if self.protocolSettings: self.protocol_version = self.protocolSettings.protocol @@ -138,6 +145,14 @@ def _get_top_class_name(cls, cls_obj): def connect(self): pass + def cleanup(self): + """Clean up transport resources and close connections""" + self._log.debug(f"Cleaning up transport {self.transport_name}") + # Base implementation - subclasses should override if needed + # Mark that this transport needs reconnection + self._needs_reconnection = True + pass + def write_data(self, data : dict[str, registry_map_entry], from_transport : "transport_base"): ''' general purpose write function for between transports''' pass diff --git a/config.cfg.example b/config.cfg.example index d498bdf..53e166e 100644 --- a/config.cfg.example +++ b/config.cfg.example @@ -2,6 +2,10 @@ # Global logging level (DEBUG, INFO, WARNING, ERROR) log_level = DEBUG +# Disable concurrent transport reads (true = sequential, false = concurrent) +# When true, transports will read sequentially instead of concurrently +disable_concurrency = true + [transport.0] #name must be unique, ie: transport.modbus # Logging level specific to this transport log_level = DEBUG diff --git a/documentation/register_failure_tracking.md b/documentation/register_failure_tracking.md new file mode 100644 index 0000000..9888212 --- /dev/null +++ b/documentation/register_failure_tracking.md @@ -0,0 +1,161 @@ +# Register Failure Tracking + +## Overview + +The register failure tracking system automatically detects and soft-disables problematic register ranges that consistently fail to read. This helps improve system reliability by avoiding repeated attempts to read from registers that are known to be problematic. + +## How It Works + +1. **Failure Detection**: The system tracks failed read attempts for each register range +2. **Automatic Disabling**: After 5 failed attempts, a register range is automatically disabled for 12 hours +3. **Recovery**: Successful reads reset the failure count and re-enable the range +4. **Periodic Logging**: Disabled ranges are logged every 10 minutes for monitoring + +## Configuration + +Add these settings to your transport configuration: + +```ini +[transport.0] +# Enable/disable register failure tracking (default: true) +enable_register_failure_tracking = true + +# Number of failures before disabling a range (default: 5) +max_failures_before_disable = 5 + +# Duration to disable ranges in hours (default: 12) +disable_duration_hours = 12 +``` + +## Example Configuration + +```ini +[transport.0] +type = modbus_rtu +port = /dev/ttyUSB0 +baudrate = 9600 +protocol_version = eg4_v58 +enable_register_failure_tracking = true +max_failures_before_disable = 3 +disable_duration_hours = 6 +``` + +## Log Messages + +### Failure Tracking +``` +WARNING: Register range INPUT 994-999 failed (1/5 attempts) +WARNING: Register range INPUT 994-999 failed (2/5 attempts) +WARNING: Register range INPUT 994-999 failed (3/5 attempts) +WARNING: Register range INPUT 994-999 failed (4/5 attempts) +WARNING: Register range INPUT 994-999 disabled for 12 hours after 5 failures +``` + +### Skipping Disabled Ranges +``` +INFO: Skipping disabled register range INPUT 994-999 (disabled for 11.5h) +``` + +### Recovery +``` +INFO: Register range INPUT 994-999 is working again after previous failures +``` + +### Periodic Status +``` +INFO: Currently disabled register ranges: 2 +INFO: - INPUT 994-999 (disabled for 8.2h, 5 failures) +INFO: - HOLDING 1000-1011 (disabled for 3.1h, 5 failures) +``` + +## API Methods + +### Get Status +```python +status = transport.get_register_failure_status() +print(f"Disabled ranges: {len(status['disabled_ranges'])}") +``` + +### Manual Control +```python +# Enable a specific range +transport.enable_register_range((994, 6), Registry_Type.INPUT) + +# Reset all tracking +transport.reset_register_failure_tracking() + +# Reset specific registry type +transport.reset_register_failure_tracking(Registry_Type.INPUT) + +# Reset specific range +transport.reset_register_failure_tracking(Registry_Type.INPUT, (994, 6)) +``` + +## Status Information + +The `get_register_failure_status()` method returns a dictionary with: + +- `enabled`: Whether failure tracking is enabled +- `max_failures_before_disable`: Configured failure threshold +- `disable_duration_hours`: Configured disable duration +- `total_tracked_ranges`: Total number of ranges being tracked +- `disabled_ranges`: List of currently disabled ranges +- `failed_ranges`: List of ranges with failures but not yet disabled +- `successful_ranges`: List of ranges with no failures + +Each range entry contains: +- `registry_type`: INPUT or HOLDING +- `range`: Register range (e.g., "994-999") +- `failure_count`: Number of failures +- `last_failure_time`: Timestamp of last failure +- `last_success_time`: Timestamp of last success +- `disabled_until`: Timestamp when disabled until (for disabled ranges) +- `remaining_hours`: Hours remaining until re-enabled (for disabled ranges) + +## Use Cases + +### Problematic Hardware +When certain register ranges consistently fail due to hardware issues, the system will automatically stop trying to read them, reducing error logs and improving performance. + +### Protocol Mismatches +If a device doesn't support certain register ranges, the system will learn to avoid them rather than repeatedly attempting to read them. + +### Network Issues +For network-based Modbus (TCP), temporary network issues can cause register read failures. The system will temporarily disable affected ranges until the network stabilizes. + +## Best Practices + +1. **Monitor Logs**: Check for disabled ranges in your logs to identify hardware or configuration issues +2. **Adjust Thresholds**: Consider lowering `max_failures_before_disable` for more aggressive disabling +3. **Review Disabled Ranges**: Use the status API to periodically review which ranges are disabled +4. **Manual Intervention**: Use `enable_register_range()` to manually re-enable ranges if you know the issue is resolved + +## Troubleshooting + +### Range Stuck Disabled +If a range remains disabled longer than expected: +```python +# Check the status +status = transport.get_register_failure_status() +for disabled in status['disabled_ranges']: + print(f"{disabled['range']}: {disabled['remaining_hours']:.1f}h remaining") + +# Manually enable if needed +transport.enable_register_range((994, 6), Registry_Type.INPUT) +``` + +### Too Many Failures +If ranges are being disabled too quickly: +```ini +# Increase the failure threshold +max_failures_before_disable = 10 + +# Increase the disable duration +disable_duration_hours = 24 +``` + +### Disable Tracking +To completely disable the feature: +```ini +enable_register_failure_tracking = false +``` \ No newline at end of file diff --git a/protocol_gateway.py b/protocol_gateway.py index c2f6a3a..0e56f94 100644 --- a/protocol_gateway.py +++ b/protocol_gateway.py @@ -7,6 +7,7 @@ import importlib import sys import time +import threading # Check if Python version is greater than 3.9 if sys.version_info < (3, 9): @@ -79,7 +80,13 @@ def get(self, section, option, *args, **kwargs): if isinstance(value, float): return value - return value.strip() if value is not None else value + if value is not None: + # Strip leading/trailing whitespace and inline comments + value = value.strip() + # Remove inline comments (everything after #) + if '#' in value: + value = value.split('#')[0].strip() + return value def getint(self, section, option, *args, **kwargs): #bypass fallback bug value = self.get(section, option, *args, **kwargs) @@ -89,6 +96,22 @@ def getfloat(self, section, option, *args, **kwargs): #bypass fallback bug value = self.get(section, option, *args, **kwargs) return float(value) if value is not None else None + def getboolean(self, section, option, *args, **kwargs): #bypass fallback bug and handle case-insensitive boolean values + value = self.get(section, option, *args, **kwargs) + if value is None: + return None + + # Convert to string and handle case-insensitive boolean values + value_str = str(value).lower().strip() + + # Handle various boolean representations + if value_str in ('true', 'yes', 'on', '1', 'enable', 'enabled'): + return True + elif value_str in ('false', 'no', 'off', '0', 'disable', 'disabled'): + return False + else: + raise ValueError(f'Not a boolean: {value}') + class Protocol_Gateway: """ @@ -105,6 +128,25 @@ class Protocol_Gateway: ''' transport_base is for type hinting. this can be any transport''' config_file : str + + # Simple read completion tracking + __read_completion_tracker : dict[str, bool] = {} + ''' Track which transports have completed their current read cycle ''' + __read_tracker_lock : threading.Lock = None + + # Concurrency control + __disable_concurrency : bool = False + ''' When true, transports read sequentially instead of concurrently. + Concurrent mode (false) is recommended for multiple devices with same address + as it prevents timing interference between rapid sequential reads. ''' + + # Transport timing control + __transport_delay_offset : float = 0.5 + ''' Additional delay between different transports to prevent conflicts ''' + + # Sequential transport delay + __sequential_delay : float = 1.0 + ''' Delay between sequential transport reads to prevent device confusion ''' def __init__(self, config_file : str): self.__log = logging.getLogger("invertermodbustomqqt_log") @@ -131,6 +173,15 @@ def __init__(self, config_file : str): ##[general] self.__log_level = self.__settings.get("general","log_level", fallback="INFO") + + # Read concurrency setting - default to sequential (disabled) for better stability + self.__disable_concurrency = self.__settings.getboolean("general", "disable_concurrency", fallback=True) + self.__log.info(f"Concurrency mode: {'Sequential' if self.__disable_concurrency else 'Concurrent'}") + + # Read sequential delay setting + self.__sequential_delay = self.__settings.getfloat("general", "sequential_delay", fallback=1.0) + if self.__disable_concurrency: + self.__log.info(f"Sequential delay between transports: {self.__sequential_delay} seconds") log_level = getattr(logging, self.__log_level, logging.INFO) self.__log.setLevel(log_level) @@ -178,7 +229,12 @@ def __init__(self, config_file : str): if to_transport.bridge == from_transport.transport_name: to_transport.init_bridge(from_transport) from_transport.init_bridge(to_transport) - + + # Initialize read completion tracking + self.__read_tracker_lock = threading.Lock() + for transport in self.__transports: + if transport.read_interval > 0: + self.__read_completion_tracker[transport.transport_name] = False def on_message(self, transport : transport_base, entry : registry_map_entry, data : str): ''' message recieved from a transport! ''' @@ -188,6 +244,55 @@ def on_message(self, transport : transport_base, entry : registry_map_entry, dat to_transport.write_data({entry.variable_name : data}, transport) break + def _process_transport_read(self, transport): + """Process a single transport read operation""" + try: + # Always ensure transport is connected before reading + if not transport.connected: + self.__log.info(f"Transport {transport.transport_name} not connected, connecting...") + transport.connect() + + self.__log.debug(f"Starting read cycle for {transport.transport_name}") + info = transport.read_data() + + if not info: + self.__log.warning(f"Transport {transport.transport_name} completed read cycle with NO DATA - this may indicate a device issue") + self._mark_read_complete(transport) + return + + self.__log.debug(f"Transport {transport.transport_name} completed read cycle with {len(info)} fields") + + # Write to output transports immediately (as before) + if transport.bridge: + for to_transport in self.__transports: + if to_transport.transport_name == transport.bridge: + to_transport.write_data(info, transport) + break + + self._mark_read_complete(transport) + + except Exception as err: + self.__log.error(f"Error processing transport {transport.transport_name}: {err}") + traceback.print_exc() + self._mark_read_complete(transport) + + def _mark_read_complete(self, transport): + """Mark a transport as having completed its read cycle""" + with self.__read_tracker_lock: + self.__read_completion_tracker[transport.transport_name] = True + self.__log.debug(f"Marked {transport.transport_name} read cycle as complete") + + def _reset_read_completion_tracker(self): + """Reset the read completion tracker for the next cycle""" + with self.__read_tracker_lock: + for transport_name in self.__read_completion_tracker: + self.__read_completion_tracker[transport_name] = False + + def _get_read_completion_status(self): + """Get the current read completion status for debugging""" + with self.__read_tracker_lock: + return self.__read_completion_tracker.copy() + def run(self): """ run method, starts ModBus connection and mqtt connection @@ -201,25 +306,60 @@ def run(self): while self.__running: try: now = time.time() + ready_transports = [] + + # Find all transports that are ready to read for transport in self.__transports: - if transport.read_interval > 0 and now - transport.last_read_time > transport.read_interval: + if transport.read_interval > 0 and now - transport.last_read_time > transport.read_interval: transport.last_read_time = now - #preform read - if not transport.connected: - transport.connect() #reconnect - else: #transport is connected - - info = transport.read_data() - - if not info: - continue - - #todo. broadcast option - if transport.bridge: - for to_transport in self.__transports: - if to_transport.transport_name == transport.bridge: - to_transport.write_data(info, transport) - break + ready_transports.append(transport) + + # Reset read completion tracker for this cycle + if ready_transports: + self._reset_read_completion_tracker() + self.__log.debug(f"Starting read cycle for {len(ready_transports)} transports: {[t.transport_name for t in ready_transports]}") + + # Process transports based on concurrency setting + if self.__disable_concurrency: + # Sequential processing - process transports one by one + for i, transport in enumerate(ready_transports): + self.__log.debug(f"Processing {transport.transport_name} sequentially ({i+1}/{len(ready_transports)})") + + # Process current transport + self._process_transport_read(transport) + + # Add delay between transports to prevent device confusion + if i < len(ready_transports) - 1: # Don't delay after the last transport + self.__log.debug(f"Waiting {self.__sequential_delay} seconds before next transport...") + time.sleep(self.__sequential_delay) + + # Log completion status for sequential mode + completion_status = self._get_read_completion_status() + completed = [name for name, status in completion_status.items() if status] + self.__log.debug(f"Sequential read cycle completed. Completed transports: {completed}") + + else: + # Concurrent processing - process transports in parallel + if len(ready_transports) > 1: + threads = [] + for transport in ready_transports: + thread = threading.Thread(target=self._process_transport_read, args=(transport,)) + thread.daemon = True + thread.start() + threads.append(thread) + + # Wait for all threads to complete + for thread in threads: + thread.join() + + # Log completion status + completion_status = self._get_read_completion_status() + completed = [name for name, status in completion_status.items() if status] + self.__log.debug(f"Concurrent read cycle completed. Completed transports: {completed}") + + elif len(ready_transports) == 1: + # Single transport - process directly + self._process_transport_read(ready_transports[0]) except Exception as err: traceback.print_exc()