diff --git a/custom_components/resol/config_flow.py b/custom_components/resol/config_flow.py index 27e064d..8d994cd 100644 --- a/custom_components/resol/config_flow.py +++ b/custom_components/resol/config_flow.py @@ -1,4 +1,5 @@ """config_flow.py: Config flow for Resol integration.""" + from __future__ import annotations import logging @@ -15,23 +16,15 @@ from homeassistant.exceptions import HomeAssistantError from homeassistant.exceptions import IntegrationError -from .const import ( - _LOGGER, - DOMAIN, - ISSUE_URL_ERROR_MESSAGE -) +from .const import _LOGGER, DOMAIN, ISSUE_URL_ERROR_MESSAGE -from .resolapi import ( - ResolAPI, - AuthenticationFailed -) +from .resolapi import ResolAPI, AuthenticationFailed # This is the first step's schema when setting up the integration, or its devices # The second schema is defined inside the ConfigFlow class as it has dynamice default values set via API call STEP_USER_DATA_SCHEMA = vol.Schema( { - vol.Required("host", default=""): str, vol.Required("port", default="80"): str, vol.Required("username", default="admin"): str, @@ -40,8 +33,9 @@ ) - -async def validate_input_for_device(hass: HomeAssistant, data: dict[str, Any]) -> dict[str, Any]: +async def validate_input_for_device( + hass: HomeAssistant, data: dict[str, Any] +) -> dict[str, Any]: """Validate the user input allows us to connect.""" resol_api = ResolAPI(data["host"], data["port"], data["username"], data["password"]) @@ -51,7 +45,14 @@ async def validate_input_for_device(hass: HomeAssistant, data: dict[str, Any]) - device = await hass.async_add_executor_job(resol_api.detect_device) # Additionally, check for authentication by calling fetch_data_km2 - auth_check = await hass.async_add_executor_job(resol_api.fetch_data_km2) + # As requested here: https://github.com/evercape/hass-resol-KM2/issues/3 + if device["product"] == "KM2": + auth_check = await hass.async_add_executor_job(resol_api.fetch_data_km2) + elif device["product"] == "KM1": + auth_check = await hass.async_add_executor_job(resol_api.fetch_data_km1) + elif device["product"] == "DL2" or device["product"] == "DL3": + auth_check = await hass.async_add_executor_job(resol_api.fetch_data_dlx) + if not auth_check: # If authentication check returns False, raise an authentication failure exception raise AuthenticationFailed("Invalid authentication") @@ -61,16 +62,17 @@ async def validate_input_for_device(hass: HomeAssistant, data: dict[str, Any]) - # Exception if device cannot be found except IntegrationError as e: - _LOGGER.error(f"Failed to connect to Resol device: {e}"+ISSUE_URL_ERROR_MESSAGE) + _LOGGER.error( + f"Failed to connect to Resol device: {e}" + ISSUE_URL_ERROR_MESSAGE + ) raise CannotConnect from e # Exception if authentication fails except AuthenticationFailed as e: - _LOGGER.error(f"Authentication failed: {e}"+ISSUE_URL_ERROR_MESSAGE) + _LOGGER.error(f"Authentication failed: {e}" + ISSUE_URL_ERROR_MESSAGE) raise InvalidAuth from e - class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN): """Handle a config flow for Resol.""" @@ -108,7 +110,9 @@ async def async_step_user( # Checks that the device is actually unique, otherwise abort await self.async_set_unique_id(unique_id) - self._abort_if_unique_id_configured(updates={"host": user_input["host"]}) + self._abort_if_unique_id_configured( + updates={"host": user_input["host"]} + ) # Before creating the entry in the config_entry registry, go to step 2 for the options # However, make sure the steps from the user input are passed on to the next step @@ -125,7 +129,8 @@ async def async_step_user( # This is step 2 for the options such as custom name, group and disable sensors async def async_step_device_options( - self, user_input: dict[str, Any] | None = None, + self, + user_input: dict[str, Any] | None = None, ) -> FlowResult: """Handle the device options step.""" @@ -134,7 +139,9 @@ async def async_step_device_options( if user_input is not None: try: # Sanitize the user provided custom device name, which is used for entry and device registry name - user_input["custom_device_name"] = sanitize_device_name(user_input["custom_device_name"], self.device_info["name"]) + user_input["custom_device_name"] = sanitize_device_name( + user_input["custom_device_name"], self.device_info["name"] + ) # Since we have already set the unique ID and updated host if necessary create the entry with the additional options. # The title of the integration is the custom friendly device name given by the user in step 2 @@ -142,24 +149,34 @@ async def async_step_device_options( return self.async_create_entry( title=title, data={ - "user_input": self.user_input_from_step_user, # from previous step - "device_info": self.device_info, # from device detection - "options": user_input # new options from this step + "user_input": self.user_input_from_step_user, # from previous step + "device_info": self.device_info, # from device detection + "options": user_input, # new options from this step }, ) except Exception as e: - _LOGGER.error(f"Failed to handle device options: {e}"+ISSUE_URL_ERROR_MESSAGE) + _LOGGER.error( + f"Failed to handle device options: {e}" + ISSUE_URL_ERROR_MESSAGE + ) errors["base"] = "option_error" # Prepare the second form's schema as it has dynamic values based on the API call # Use the name from the detected device as default device name - default_device_name = self.device_info["name"] if self.device_info and "name" in self.device_info else "New Device" - step_device_options_schema = vol.Schema({ - vol.Required("custom_device_name", default=default_device_name): str, - vol.Required("polling_time", default=60): vol.All(vol.Coerce(int), vol.Clamp(min=60)), - vol.Required("group_sensors", default=True): bool, - vol.Required("disable_sensors", default=True): bool, - }) + default_device_name = ( + self.device_info["name"] + if self.device_info and "name" in self.device_info + else "New Device" + ) + step_device_options_schema = vol.Schema( + { + vol.Required("custom_device_name", default=default_device_name): str, + vol.Required("polling_time", default=60): vol.All( + vol.Coerce(int), vol.Clamp(min=60) + ), + vol.Required("group_sensors", default=True): bool, + vol.Required("disable_sensors", default=True): bool, + } + ) # Show the form for step 2 with the device name and other options as defined in STEP_DEVICE_OPTIONS_SCHEMA return self.async_show_form( @@ -169,8 +186,6 @@ async def async_step_device_options( ) - - class CannotConnect(HomeAssistantError): """Error to indicate we cannot connect.""" @@ -179,23 +194,25 @@ class InvalidAuth(HomeAssistantError): """Error to indicate there is invalid auth.""" -#Helper function to sanitize +# Helper function to sanitize def sanitize_device_name(device_name: str, fall_back: str, max_length=255) -> str: # Trim whitespace name = device_name.strip() # Remove special characters but keep spaces - name = re.sub(r'[^\w\s-]', '', name) + name = re.sub(r"[^\w\s-]", "", name) # Replace multiple spaces with a single space - name = re.sub(r'\s+', ' ', name) + name = re.sub(r"\s+", " ", name) # Length check if len(name) > max_length: - name = name[:max_length].rsplit(' ', 1)[0] # Split at the last space to avoid cutting off in the middle of a word + name = name[:max_length].rsplit(" ", 1)[ + 0 + ] # Split at the last space to avoid cutting off in the middle of a word # Fallback name if not name: name = fall_back - return name \ No newline at end of file + return name diff --git a/custom_components/resol/resolapi.py b/custom_components/resol/resolapi.py index bbb5149..fedc0d8 100644 --- a/custom_components/resol/resolapi.py +++ b/custom_components/resol/resolapi.py @@ -8,14 +8,15 @@ from homeassistant.exceptions import IntegrationError from requests.exceptions import RequestException, Timeout -from .const import ( - _LOGGER, - ISSUE_URL_ERROR_MESSAGE -) +from .const import _LOGGER, ISSUE_URL_ERROR_MESSAGE # Better storage of Resol endpoint -ResolEndPoint = namedtuple('ResolEndPoint', 'internal_unique_id, serial, name, friendly_name, value, unit, description, destination, source') +ResolEndPoint = namedtuple( + "ResolEndPoint", + "internal_unique_id, serial, name, friendly_name, value, unit, description, destination, source", +) + # ResolAPI to detect device and get device info, fetch the actual data from the Resol device, and parse it class ResolAPI: @@ -28,111 +29,157 @@ def __init__(self, host, port, username, password): self.session = requests.Session() def detect_device(self): - try: url = f"http://{self.host}:{self.port}/cgi-bin/get_resol_device_information" response = requests.request("GET", url, timeout=5) _LOGGER.debug(f"Attempting to discover Resol device via get: {url}") - if(response.status_code == 200): + if response.status_code == 200: matches = re.search(r'product\s=\s["](.*?)["]', response.text) if matches: self.device = { - 'product': matches.group(1), - 'vendor': re.search(r'vendor\s=\s["](.*?)["]', response.text).group(1), - 'serial': re.search(r'serial\s=\s["](.*?)["]', response.text).group(1), - 'version': re.search(r'version\s=\s["](.*?)["]', response.text).group(1), - 'build': re.search(r'build\s=\s["](.*?)["]', response.text).group(1), - 'name': re.search(r'name\s=\s["](.*?)["]', response.text).group(1), - 'features': re.search(r'features\s=\s["](.*?)["]', response.text).group(1), - 'host': self.host, - 'port': self.port, - 'mac': self.format_serial_to_mac(re.search(r'serial\s=\s["](.*?)["]', response.text).group(1)) #convert serial to MAC address + "product": matches.group(1), + "vendor": re.search( + r'vendor\s=\s["](.*?)["]', response.text + ).group(1), + "serial": re.search( + r'serial\s=\s["](.*?)["]', response.text + ).group(1), + "version": re.search( + r'version\s=\s["](.*?)["]', response.text + ).group(1), + "build": re.search( + r'build\s=\s["](.*?)["]', response.text + ).group(1), + "name": re.search(r'name\s=\s["](.*?)["]', response.text).group( + 1 + ), + "features": re.search( + r'features\s=\s["](.*?)["]', response.text + ).group(1), + "host": self.host, + "port": self.port, + "mac": self.format_serial_to_mac( + re.search(r'serial\s=\s["](.*?)["]', response.text).group(1) + ), # convert serial to MAC address } - _LOGGER.debug(f"{self.device['serial']}: Resol device data received: {self.device}") + _LOGGER.debug( + f"{self.device['serial']}: Resol device data received: {self.device}" + ) else: error = f"{self.device['serial']}: Your device was reachable at {url } but could not be successfully detected." - _LOGGER.warning(error+ISSUE_URL_ERROR_MESSAGE) + _LOGGER.warning(error + ISSUE_URL_ERROR_MESSAGE) raise IntegrationError(error) else: error = f"Are you sure you entered the correct address {url} of the Resol KM2 device?" - _LOGGER.warning(error+ISSUE_URL_ERROR_MESSAGE) + _LOGGER.warning(error + ISSUE_URL_ERROR_MESSAGE) raise IntegrationError(error) except ConnectionError: - _LOGGER.warning(f"Unable to connect to {self.host}. Device might be offline."+ISSUE_URL_ERROR_MESSAGE) + _LOGGER.warning( + f"Unable to connect to {self.host}. Device might be offline." + + ISSUE_URL_ERROR_MESSAGE + ) raise IntegrationError(error) return None except RequestException as e: error = f"Error detecting Resol device - {e}" - _LOGGER.error(f"Error detecting Resol device - {e}"+ISSUE_URL_ERROR_MESSAGE) + _LOGGER.error( + f"Error detecting Resol device - {e}" + ISSUE_URL_ERROR_MESSAGE + ) raise IntegrationError(error) return None return self.device - # Fetch the data from the Resol KM2 device, which then constitues the Sensors def fetch_data_km2(self): response = {} url = f"http://{self.host}:{self.port}/cgi-bin/resol-webservice" - _LOGGER.debug(f"{self.device['serial']}: KM2 requesting sensor data from url {url}") + _LOGGER.debug( + f"{self.device['serial']}: KM2 requesting sensor data from url {url}" + ) try: - headers = { - 'Content-Type': 'application/json' - } - - payload = "[{'id': '1','jsonrpc': '2.0','method': 'login','params': {'username': '" + self.username + "','password': '" + self.password + "'}}]" - response = requests.request("POST", url, headers=headers, data = payload) + headers = {"Content-Type": "application/json"} + + payload = ( + "[{'id': '1','jsonrpc': '2.0','method': 'login','params': {'username': '" + + self.username + + "','password': '" + + self.password + + "'}}]" + ) + response = requests.request("POST", url, headers=headers, data=payload) response_auth = response.json()[0] # Check if correct login credentials # BAD RESPONSE: [{'jsonrpc': '2.0', 'id': '1', 'error': {'category': 'App', 'message': 'Invalid credentials'}}] - if 'error' in response_auth: - error_message = response_auth['error'].get('message') - _LOGGER.warning(f"{self.device['serial']}: Authentication failed: {error_message}") + if "error" in response_auth: + error_message = response_auth["error"].get("message") + _LOGGER.warning( + f"{self.device['serial']}: Authentication failed: {error_message}" + ) raise AuthenticationFailed(error_message) # GOOD RESPONSE: [{'jsonrpc': '2.0', 'id': '1', 'result': {'authId': 'a463cc3ab307c7ef7d85f22daf15f0'}}] - elif 'result' in response_auth and 'authId' in response_auth['result']: - authId = response_auth['result']['authId'] - _LOGGER.debug(f"{self.device['serial']}: Successfully authenticated. Auth ID: {authId}") + elif "result" in response_auth and "authId" in response_auth["result"]: + authId = response_auth["result"]["authId"] + _LOGGER.debug( + f"{self.device['serial']}: Successfully authenticated. Auth ID: {authId}" + ) # Authenticate and get the actual sensors response data - payload = "[{'id': '1','jsonrpc': '2.0','method': 'dataGetCurrentData','params': {'authId': '" + authId + "'}}]" - response = requests.request("POST", url, headers=headers, data = payload) + payload = ( + "[{'id': '1','jsonrpc': '2.0','method': 'dataGetCurrentData','params': {'authId': '" + + authId + + "'}}]" + ) + response = requests.request("POST", url, headers=headers, data=payload) response_data = response.json()[0]["result"] _LOGGER.debug(f"{self.device['serial']}: KM2 response: {response_data}") # Proceed to parsing return self.__parse_data(response_data) - except KeyError: error = f"{self.device['serial']}: Please re-check your username and password in your configuration!" - _LOGGER.warning(error+ISSUE_URL_ERROR_MESSAGE) + _LOGGER.warning(error + ISSUE_URL_ERROR_MESSAGE) raise IntegrationError(error) return None except ConnectionError: error = f"ConnectionError in fetch_data_km2(): Unable to connect to {self.host}. Device might be offline." - _LOGGER.warning(error+ISSUE_URL_ERROR_MESSAGE) + _LOGGER.warning(error + ISSUE_URL_ERROR_MESSAGE) raise IntegrationError(error) return None except RequestException as e: error = f"RequestException in fetch_data_km2(): Error while fetching data from {self.host}: {e}" - _LOGGER.warning(error+ISSUE_URL_ERROR_MESSAGE) + _LOGGER.warning(error + ISSUE_URL_ERROR_MESSAGE) raise IntegrationError(error) return None + # Fetch the data from the Resol DL2 device, which then constitues the Sensors + # As requested here: https://github.com/evercape/hass-resol-KM2/issues/3 + def fetch_data_dlx(self): + response = {} + + url = f"http://{self.host}:{self.port}/dlx/download/live" + _LOGGER.debug(f"{self.device['serial']}: DLX requesting sensor data from url {url}") + + response = requests.request("GET", url) + if(response.status_code == 200): + response_data = response.json() + _LOGGER.debug(f"DLX response: {response}") + # Proceed to parsing + return self.__parse_data(response_data) def __parse_data(self, response): # Implement the logic to parse the response from the Resol device @@ -141,10 +188,14 @@ def __parse_data(self, response): iHeader = 0 for header in response["headers"]: - _LOGGER.debug(f"{self.device['serial']}: Found header[{iHeader}] now parsing it ...") + _LOGGER.debug( + f"{self.device['serial']}: Found header[{iHeader}] now parsing it ..." + ) iField = 0 for field in response["headers"][iHeader]["fields"]: - value = response["headersets"][0]["packets"][iHeader]["field_values"][iField]["raw_value"] + value = response["headersets"][0]["packets"][iHeader]["field_values"][ + iField + ]["raw_value"] if isinstance(value, float): value = round(value, 2) if "date" in field["name"]: @@ -152,30 +203,34 @@ def __parse_data(self, response): value = epochStart + datetime.timedelta(0, value) # Sensor's unique ID combination of device serial and each header/field unique name as internal sensor hash (not the entity_id) - unique_id = self.device['serial'] + "_" + header["id"] + "__" + field["id"] + unique_id = ( + self.device["serial"] + "_" + header["id"] + "__" + field["id"] + ) data[unique_id] = ResolEndPoint( internal_unique_id=unique_id, - serial=self.device['serial'], - name=self.device['serial'].lower() + "_" + field["name"].replace(" ", "_").lower(), + serial=self.device["serial"], + name=self.device["serial"].lower() + + "_" + + field["name"].replace(" ", "_").lower(), friendly_name=field["name"].replace(" ", "_").lower(), value=value, unit=field["unit"].strip(), description=header["description"], destination=header["destination_name"], - source=header["source_name"] - ) + source=header["source_name"], + ) iField += 1 - iHeader +=1 + iHeader += 1 return data - def format_serial_to_mac(self, serial: str) -> str: # Split the serial into chunks of two characters - mac_chunks = [serial[i:i+2] for i in range(0, len(serial), 2)] + mac_chunks = [serial[i : i + 2] for i in range(0, len(serial), 2)] # Join the chunks with colons - mac_address = ':'.join(mac_chunks) + mac_address = ":".join(mac_chunks) return mac_address + class AuthenticationFailed(Exception): - """Exception to indicate authentication failure.""" \ No newline at end of file + """Exception to indicate authentication failure.""" diff --git a/custom_components/resol/sensor.py b/custom_components/resol/sensor.py index 48555d6..e94f13a 100644 --- a/custom_components/resol/sensor.py +++ b/custom_components/resol/sensor.py @@ -36,19 +36,14 @@ ATTR_PRODUCT_BUILD, ATTR_PRODUCT_VERSION, ATTR_PRODUCT_FEATURES, - ISSUE_URL_ERROR_MESSAGE -) - -from .resolapi import ( - ResolAPI, - AuthenticationFailed + ISSUE_URL_ERROR_MESSAGE, ) +from .resolapi import ResolAPI, AuthenticationFailed # Setting up the adding and updating of sensor entities async def async_setup_entry(hass, config_entry, async_add_entities): - # Retrieve the API instance from the config_entry data resol_api = hass.data[DOMAIN][config_entry.entry_id] @@ -59,36 +54,48 @@ async def async_setup_entry(hass, config_entry, async_add_entities): if not device_check: # If device returns False or is empty, log an error and return - _LOGGER.warning(f"{resol_api.device['serial']}: It appears the Resol device is offline or has changed host. {data}"+ISSUE_URL_ERROR_MESSAGE) + _LOGGER.warning( + f"{resol_api.device['serial']}: It appears the Resol device is offline or has changed host. {data}" + + ISSUE_URL_ERROR_MESSAGE + ) return # Call the ResolAPI to get the device API data - data = await hass.async_add_executor_job(resol_api.fetch_data_km2) + # As requested here: https://github.com/evercape/hass-resol-KM2/issues/3 + if device_check["product"] == "KM2": + data = await hass.async_add_executor_job(resol_api.fetch_data_km2) + elif device_check["product"] == "KM1": + data = await hass.async_add_executor_job(resol_api.fetch_data_km1) + elif device_check["product"] == "DL2" or device_check["product"] == "DL3": + data = await hass.async_add_executor_job(resol_api.fetch_data_dlx) if not data: # If data returns False or is empty, log an error and return - _LOGGER.warning(f"{resol_api.device['serial']}: Failed to fetch sensor data - authentication failed or no data. {data}"+ISSUE_URL_ERROR_MESSAGE) + _LOGGER.warning( + f"{resol_api.device['serial']}: Failed to fetch sensor data - authentication failed or no data. {data}" + + ISSUE_URL_ERROR_MESSAGE + ) return # Exception if device cannot be found except IntegrationError as error: - _LOGGER.warning(f"{resol_api.device['serial']}: Failed to fetch sensor data: {error}"+ISSUE_URL_ERROR_MESSAGE) + _LOGGER.warning( + f"{resol_api.device['serial']}: Failed to fetch sensor data: {error}" + + ISSUE_URL_ERROR_MESSAGE + ) return except AuthenticationFailed as error: _LOGGER.warning(f"{resol_api.device['serial']}: Authentication failed: {error}") return - - # Get device id and then reset the device specific list of sensors for updates to ensure it's empty before adding new entries - device_id = resol_api.device['serial'] + device_id = resol_api.device["serial"] _LOGGER.debug(f"{resol_api.device['serial']}: Device ID: {device_id}") # Initialize or clear the sensor list for this device hass.data[DOMAIN]["device_specific_sensors"][device_id] = [] - - #Registering entities to registry, and adding them to list for schedule updates on each device which is stored within hass.data + # Registering entities to registry, and adding them to list for schedule updates on each device which is stored within hass.data for unique_id, endpoint in data.items(): # Get individul sensor entry from API sensor = ResolSensor(resol_api, endpoint) @@ -99,40 +106,46 @@ async def async_setup_entry(hass, config_entry, async_add_entities): # Register sensor async_add_entities([sensor], False) - device_specific_sensors = hass.data[DOMAIN]["device_specific_sensors"] - _LOGGER.debug(f"{resol_api.device['serial']}: List of device_specific_sensors[device_id]: {device_specific_sensors[device_id]}") + _LOGGER.debug( + f"{resol_api.device['serial']}: List of device_specific_sensors[device_id]: {device_specific_sensors[device_id]}" + ) # Log the number of sensors registered (and added to the update list) - _LOGGER.info(f"{resol_api.device['serial']}: All '{len(device_specific_sensors[device_id])}' sensors have registered.") - - - + _LOGGER.info( + f"{resol_api.device['serial']}: All '{len(device_specific_sensors[device_id])}' sensors have registered." + ) # Schedule updates async def async_update_data(now): # If device deleted but HASS not restarted, then don't bother continuing - if device_id not in hass.data.get(DOMAIN, {}).get("device_specific_sensors", {}): + if device_id not in hass.data.get(DOMAIN, {}).get( + "device_specific_sensors", {} + ): return False - _LOGGER.debug(f"{resol_api.device['serial']}: Preparing to update sensors at {now}") + _LOGGER.debug( + f"{resol_api.device['serial']}: Preparing to update sensors at {now}" + ) # Fetch the full dataset once from the API try: full_data = await hass.async_add_executor_job(resol_api.fetch_data_km2) except Exception as e: - _LOGGER.error(f"{resol_api.device['serial']}: Error fetching data from the device: {e}"+ISSUE_URL_ERROR_MESSAGE) + _LOGGER.error( + f"{resol_api.device['serial']}: Error fetching data from the device: {e}" + + ISSUE_URL_ERROR_MESSAGE + ) return # Fetch the registry and check if sensors are enabled registry = entity_registry.async_get(hass) - # Set counters to zero - counter_updated = 0 # Successfully updated sensors - counter_disabled = 0 # Disabled sensors, not to be updated - counter_unchanged = 0 # Skipped sensors since value has not changed - counter_error = 0 # Skipped sensors due to some error, such as registry not found or no data from API + counter_updated = 0 # Successfully updated sensors + counter_disabled = 0 # Disabled sensors, not to be updated + counter_unchanged = 0 # Skipped sensors since value has not changed + counter_error = 0 # Skipped sensors due to some error, such as registry not found or no data from API # Get the list of defice specific sensors from hass.data if device_id in hass.data.get(DOMAIN, {}).get("device_specific_sensors", {}): @@ -140,58 +153,80 @@ async def async_update_data(now): # Now loop through the sensors to be updated for sensor in device_specific_sensors[device_id]: - entity_id = registry.async_get_entity_id('sensor', DOMAIN, sensor.unique_id) + entity_id = registry.async_get_entity_id( + "sensor", DOMAIN, sensor.unique_id + ) if entity_id: entity = registry.entities.get(entity_id) if entity and not entity.disabled_by: sensor_data = full_data.get(sensor.unique_id) - _LOGGER.debug(f"{resol_api.device['serial']}: Sensor '{sensor.name}' is not disabled.") + _LOGGER.debug( + f"{resol_api.device['serial']}: Sensor '{sensor.name}' is not disabled." + ) if sensor_data: - _LOGGER.debug(f"{resol_api.device['serial']}: Sensor '{sensor.name}' has API data eligible for update {sensor_data}") + _LOGGER.debug( + f"{resol_api.device['serial']}: Sensor '{sensor.name}' has API data eligible for update {sensor_data}" + ) # Check if current state value differs from new API value, or current state has not initialized - if str(sensor._state).strip() != str(sensor_data.value).strip(): - _LOGGER.debug(f"{resol_api.device['serial']}: Sensor '{sensor.name}' marked for update as current value '{sensor._state}' is not the same as new value '{sensor_data.value}'") + if ( + str(sensor._state).strip() + != str(sensor_data.value).strip() + ): + _LOGGER.debug( + f"{resol_api.device['serial']}: Sensor '{sensor.name}' marked for update as current value '{sensor._state}' is not the same as new value '{sensor_data.value}'" + ) # Now update the sensor with new values - update_status = await sensor.async_update(sensor_data) #update_status returns 1 for upated, 0 for skipped or error + update_status = await sensor.async_update( + sensor_data + ) # update_status returns 1 for upated, 0 for skipped or error counter_updated = counter_updated + update_status else: - _LOGGER.debug(f"{resol_api.device['serial']}: Sensor '{sensor.name}' skipped as current value '{sensor._state}' same as new value '{sensor_data.value}'") + _LOGGER.debug( + f"{resol_api.device['serial']}: Sensor '{sensor.name}' skipped as current value '{sensor._state}' same as new value '{sensor_data.value}'" + ) counter_unchanged = counter_unchanged + 1 else: - _LOGGER.warning(f"{resol_api.device['serial']}: No update data found for sensor '{sensor.name}'"+ISSUE_URL_ERROR_MESSAGE) + _LOGGER.warning( + f"{resol_api.device['serial']}: No update data found for sensor '{sensor.name}'" + + ISSUE_URL_ERROR_MESSAGE + ) counter_error = counter_error + 1 else: - _LOGGER.debug(f"{resol_api.device['serial']}: Sensor '{sensor.name}' is disabled, skipping update") + _LOGGER.debug( + f"{resol_api.device['serial']}: Sensor '{sensor.name}' is disabled, skipping update" + ) counter_disabled = counter_disabled + 1 else: - _LOGGER.warning(f"{resol_api.device['serial']}: Sensor '{sensor.name}' not found in the registry, skipping update"+ISSUE_URL_ERROR_MESSAGE) + _LOGGER.warning( + f"{resol_api.device['serial']}: Sensor '{sensor.name}' not found in the registry, skipping update" + + ISSUE_URL_ERROR_MESSAGE + ) counter_error = counter_error + 1 - # Log summary of updates - _LOGGER.info(f"{resol_api.device['serial']}: A total of '{counter_updated}' sensors have updated, '{counter_disabled}' are disabled and skipped update, '{counter_unchanged}' sensors value remained constant and '{counter_error}' sensors occured any errors.") + _LOGGER.info( + f"{resol_api.device['serial']}: A total of '{counter_updated}' sensors have updated, '{counter_disabled}' are disabled and skipped update, '{counter_unchanged}' sensors value remained constant and '{counter_error}' sensors occured any errors." + ) # Device not in list: must have been deleted, will resolve post re-start else: - _LOGGER.warning(f"{resol_api.device['serial']}: Sensor must have been deleted, re-start of HA recommended.") - + _LOGGER.warning( + f"{resol_api.device['serial']}: Sensor must have been deleted, re-start of HA recommended." + ) # Get the polling interval from the options, defaulting to 60 seconds if not set - polling_interval = timedelta(seconds=config_entry.options.get('polling_interval', 60)) + polling_interval = timedelta( + seconds=config_entry.options.get("polling_interval", 60) + ) async_track_time_interval(hass, async_update_data, polling_interval) - - - - -#This is the actual instance of SensorEntity class +# This is the actual instance of SensorEntity class class ResolSensor(SensorEntity): """Representation of a RESOL Temperature Sensor.""" def __init__(self, resol_api: ResolAPI, endpoint): - """Initialize the sensor.""" # Make the ResolAPI and the endpoint parameters from the Sensor API available self.resol_api = resol_api @@ -208,7 +243,9 @@ def __init__(self, resol_api: ResolAPI, endpoint): self._unique_id = endpoint.internal_unique_id # Set the icon for the sensor based on its unit, ensure the icon_mapper is defined - self._icon = ResolSensor.icon_mapper.get(endpoint.unit) # Default handled in function + self._icon = ResolSensor.icon_mapper.get( + endpoint.unit + ) # Default handled in function # The initial state/value of the sensor self._state = endpoint.value @@ -224,10 +261,9 @@ def __init__(self, resol_api: ResolAPI, endpoint): if resol_api.options.get("disable_sensors") and not endpoint.unit: self._attr_entity_registry_enabled_default = False - @property def should_poll(self): - """ async_track_time_intervals handles updates. """ + """async_track_time_intervals handles updates.""" return False @property @@ -240,7 +276,6 @@ def name(self): """Return the name of the sensor.""" return self._name - @property def state(self): """Return the state of the sensor.""" @@ -253,43 +288,43 @@ def unit_of_measurement(self): @property def device_class(self): - """ Return the device class of this entity, if any. """ - if self._unit == '°C': + """Return the device class of this entity, if any.""" + if self._unit == "°C": return SensorDeviceClass.TEMPERATURE - elif self._unit == '%': + elif self._unit == "%": return SensorDeviceClass.POWER_FACTOR - elif self._unit == 'Wh': + elif self._unit == "Wh": return SensorDeviceClass.ENERGY else: return None @property def state_class(self): - """ Return the state class of this entity, if any. """ - if self._unit == '°C': + """Return the state class of this entity, if any.""" + if self._unit == "°C": return SensorStateClass.MEASUREMENT - elif self._unit == 'h': + elif self._unit == "h": return SensorStateClass.MEASUREMENT - elif self._unit == 'Wh': + elif self._unit == "Wh": return SensorStateClass.TOTAL_INCREASING else: return None @property def extra_state_attributes(self): - """ Return the state attributes of this device. """ + """Return the state attributes of this device.""" attr = {} attr[ATTR_PRODUCT_DESCRIPTION] = self.endpoint.description attr[ATTR_DESTINATION_NAME] = self.endpoint.destination attr[ATTR_SOURCE_NAME] = self.endpoint.source attr[ATTR_UNIQUE_ID] = self.endpoint.internal_unique_id - attr[ATTR_PRODUCT_VENDOR] = self.resol_api.device['vendor'] - attr[ATTR_PRODUCT_NAME] = self.resol_api.device['name'] + attr[ATTR_PRODUCT_VENDOR] = self.resol_api.device["vendor"] + attr[ATTR_PRODUCT_NAME] = self.resol_api.device["name"] attr[ATTR_PRODUCT_SERIAL] = self.endpoint.serial - attr[ATTR_PRODUCT_VERSION] = self.resol_api.device['version'] - attr[ATTR_PRODUCT_BUILD] = self.resol_api.device['build'] - attr[ATTR_PRODUCT_FEATURES] = self.resol_api.device['features'] + attr[ATTR_PRODUCT_VERSION] = self.resol_api.device["version"] + attr[ATTR_PRODUCT_BUILD] = self.resol_api.device["build"] + attr[ATTR_PRODUCT_FEATURES] = self.resol_api.device["features"] return attr @@ -298,35 +333,38 @@ def device_info(self): """Return device specific attributes.""" # Device unique identifier is the serial return { - 'identifiers': {(DOMAIN, self.resol_api.device['serial'])}, - 'name': self.resol_api.device['name'], - 'manufacturer': 'RESOL', - + "identifiers": {(DOMAIN, self.resol_api.device["serial"])}, + "name": self.resol_api.device["name"], + "manufacturer": "RESOL", } - icon_mapper = defaultdict(lambda: "mdi:alert-circle", { - '°C': 'mdi:thermometer', - '%': 'mdi:flash', - 'l/h': 'mdi:hydro-power', - 'bar': 'mdi:car-brake-low-pressure', - '%RH': 'mdi:water-percent', - 's': 'mdi:timer', - 'Wh': 'mdi:solar-power-variant-outline', - 'h': 'mdi:timer-sand' - }) - + icon_mapper = defaultdict( + lambda: "mdi:alert-circle", + { + "°C": "mdi:thermometer", + "%": "mdi:flash", + "l/h": "mdi:hydro-power", + "bar": "mdi:car-brake-low-pressure", + "%RH": "mdi:water-percent", + "s": "mdi:timer", + "Wh": "mdi:solar-power-variant-outline", + "h": "mdi:timer-sand", + }, + ) # This is to register the icon settings async def async_added_to_hass(self): """Call when the sensor is added to Home Assistant.""" self.async_write_ha_state() - # Update of Sensor values async def async_update(self, sensor_data=None): """Update the sensor with the provided data.""" if sensor_data is None: - _LOGGER.warning(f"{self.resol_api.device['serial']}: No new data provided for sensor '{self.name}' update"+ISSUE_URL_ERROR_MESSAGE) + _LOGGER.warning( + f"{self.resol_api.device['serial']}: No new data provided for sensor '{self.name}' update" + + ISSUE_URL_ERROR_MESSAGE + ) update_status = 0 return @@ -336,9 +374,10 @@ async def async_update(self, sensor_data=None): self.async_write_ha_state() except Exception as error: - _LOGGER.error(f"{self.resol_api.device['serial']}: Error updating sensor {self.name}: {error}"+ISSUE_URL_ERROR_MESSAGE) + _LOGGER.error( + f"{self.resol_api.device['serial']}: Error updating sensor {self.name}: {error}" + + ISSUE_URL_ERROR_MESSAGE + ) update_status = 0 return update_status - -