Skip to content

Commit

Permalink
feat: Migrate to use Datacordinator to attempt to resolve Bluetooth
Browse files Browse the repository at this point in the history
async issues
  • Loading branch information
sopelj committed Jan 15, 2022
1 parent 4c008c8 commit 6aa78af
Show file tree
Hide file tree
Showing 6 changed files with 300 additions and 203 deletions.
116 changes: 116 additions & 0 deletions custom_components/ember_mug/__init__.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,127 @@
"""Ember Mug Custom Integration."""
import asyncio
import logging
from typing import cast

from homeassistant.config_entries import ConfigEntry
from homeassistant.const import (
CONF_MAC,
CONF_NAME,
CONF_TEMPERATURE_UNIT,
TEMP_CELSIUS,
TEMP_FAHRENHEIT,
Platform,
)
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity import DeviceInfo
from homeassistant.helpers.typing import ConfigType, HomeAssistantType
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
import homeassistant.util.dt as dt_util

from .const import DOMAIN
from .mug import EmberMug

PLATFORMS = [Platform.SENSOR]
_LOGGER = logging.getLogger(__name__)


class MugDataUpdateCoordinator(DataUpdateCoordinator):
"""Shared Data Coordinator for polling mug updates."""

def __init__(self, hass: HomeAssistant, config: ConfigEntry) -> None:
"""Init data coordinator and start mug running."""
super().__init__(
hass, _LOGGER, name=f"ember-mug-{config.entry_id}", update_interval=None
)
self.mac_address = config.data[CONF_MAC]
self.name = config.data.get.get(CONF_NAME, f"Ember Mug {self.mac_address}")
self.unit_of_measurement = config.data.get(CONF_TEMPERATURE_UNIT, TEMP_CELSIUS)

self.mug = EmberMug(
self.mac_address, self.unit_of_measurement != TEMP_FAHRENHEIT
)
_LOGGER.info(f"Ember Mug {self.name} Setup")
# Start loop
_LOGGER.info(f"Start running {self.name}")
self.hass.async_create_task(self._run())
# Default Data
self.data = {
"serial_number": None,
"mug_id": None,
"last_read_time": None,
"firmware_info": None,
"model": "Ember Mug",
"mug_name": self.name,
}

async def _run(self):
"""Start the task loop."""
try:
self._loop = True
_LOGGER.info(f"Starting mug loop {self.mac_address}")
while self._loop:
await self.mug.ensure_connected()
await self.mug.update_all()
self.mug.updates_queued.clear()
await self.async_refresh()

# Maintain connection for 5min seconds until next update
# We will be notified of most changes during this time
for _ in range(150):
await self.mug.ensure_connected()
await self.mug.update_queued_attributes()
await asyncio.sleep(2)

except Exception as e:
_LOGGER.error(f"An unexpected error occurred during loop {e}. Restarting.")
if self.mug.is_connected:
await self.mug.disconnect()
self.hass.async_create_task(self._run())

async def _async_update_data(self):
"""Update the data of the coordinator."""
self.data = {
"mug_id": self.mug.mug_id,
"serial_number": self.mug.serial_number,
"last_read_time": dt_util.utcnow(),
"firmware_info": self.mug.firmware_info,
"mug_name": self.name,
"model": self.mug.model,
}
_LOGGER.debug(f"{self.data}")

@property
def device_info(self) -> DeviceInfo:
"""Return information about the mug."""
unique_id = cast(str, self.config_entry.unique_id)
return DeviceInfo(
identifiers={(DOMAIN, unique_id)},
name=self.data["name"],
model=self.data["model"],
sw_version=self.data["firmware_info"],
manufacturer="Ember",
)


async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Set up Mug Platform."""
if DOMAIN not in hass.data:
hass.data[DOMAIN] = {}
coordinator = MugDataUpdateCoordinator(hass, entry)
await coordinator.async_config_entry_first_refresh()
hass.data[DOMAIN][entry.entry_id] = {"coordinator": coordinator}
hass.config_entries.async_setup_platforms(entry, PLATFORMS)
return True


async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Unload a config entry."""
unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS)
if unload_ok:
hass.data[DOMAIN].pop(entry.entry_id)
return unload_ok


async def async_setup(hass: HomeAssistantType, config: ConfigType) -> bool:
"""Register service calls."""
return True
2 changes: 1 addition & 1 deletion custom_components/ember_mug/manifest.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@
"documentation": "https://github.com/sopelj/hass_ember_mug",
"domain": "ember_mug",
"name": "Ember Mug",
"version": "0.1.2",
"version": "0.2.1",
"requirements": ["bleak==0.14.1"]
}
67 changes: 20 additions & 47 deletions custom_components/ember_mug/mug.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,11 @@
import base64
import contextlib
import re
from typing import Callable, Tuple, Union
from typing import Tuple, Union

from bleak import BleakClient, BleakScanner
from bleak.exc import BleakError
from homeassistant.const import TEMP_CELSIUS, TEMP_FAHRENHEIT
from homeassistant.helpers.typing import HomeAssistantType

from . import _LOGGER
from .const import (
Expand Down Expand Up @@ -76,24 +75,15 @@ async def find_mug() -> dict[str, str]:
class EmberMug:
"""Class to connect and communicate with the mug via Bluetooth."""

def __init__(
self,
mac_address: str,
use_metric: bool,
async_update_callback: Callable,
hass: HomeAssistantType,
) -> None:
def __init__(self, mac_address: str, use_metric: bool) -> None:
"""Set default values in for mug attributes."""
self._loop = False
self._first_run = True
self.hass = hass
self.async_update_callback = async_update_callback
self.mac_address = mac_address
self.client = BleakClient(mac_address)
self.client = BleakClient(mac_address, address_type="random")
self.available = True
self.updates_queued = set()
self.use_metric = use_metric

self.model = "Ember Ceramic Mug"
self.led_colour_rgba = [255, 255, 255, 255]
self.latest_event_id: int = None
self.liquid_level: float = None
Expand All @@ -115,6 +105,11 @@ def __init__(
# if len(2) -> Charge Time
self.battery_voltage = None

@property
def is_connected(self) -> bool:
"""Pass is connected to mug class."""
return self.client.is_connected

@property
def colour(self) -> str:
"""Return colour as hex value."""
Expand All @@ -126,32 +121,6 @@ def liquid_state_label(self) -> str:
"""Return human-readable liquid state."""
return LIQUID_STATE_LABELS[self.liquid_state or 0]

async def async_run(self) -> None:
"""Start the task loop."""
try:
self._loop = True
_LOGGER.info(f"Starting mug loop {self.mac_address}")
await self.connect()

while self._loop:
if not self.client.is_connected:
await self.connect()

await self.update_all()
self.updates_queued.clear()
self.async_update_callback()

# Maintain connection for 5min seconds until next update
# We will be notified of most changes during this time
for _ in range(150):
self.client.is_connected
await self.update_queued_attributes()
await asyncio.sleep(2)

except Exception as e:
_LOGGER.error(f"An unexpected error occurred during loop {e}. Restarting.")
self.hass.async_create_task(self.async_run())

async def _temp_from_bytes(self, temp_bytes: bytearray) -> float:
"""Get temperature from bytearray and convert to fahrenheit if needed."""
temp = float(bytes_to_little_int(temp_bytes)) * 0.01
Expand Down Expand Up @@ -262,7 +231,7 @@ async def update_firmware_info(self) -> None:
# string getIntValue(18, 4) -> Bootloader
self.firmware_info = str(await self.client.read_gatt_char(UUID_OTA))

async def connect(self) -> None:
async def connect(self) -> bool:
"""Try 10 times to connect and if we fail wait five minutes and try again. If connected also subscribe to state notifications."""
connected = False
for i in range(1, 10 + 1):
Expand All @@ -274,11 +243,10 @@ async def connect(self) -> None:
break
except BleakError as e:
_LOGGER.error(f"Init: {e} on attempt {i}. waiting 30sec")
asyncio.sleep(30)
await asyncio.sleep(30)

if connected is False:
self.available = False
self.async_update_callback()
_LOGGER.warning(
f"Failed to connect to {self.mac_address} after 10 tries. Will try again in 2min"
)
Expand All @@ -300,17 +268,18 @@ async def connect(self) -> None:
await self.client.start_notify(UUID_PUSH_EVENT, self.push_notify)
except Exception as e:
_LOGGER.warning(f"Failed to subscribe to state attr {e}")
return connected

async def update_queued_attributes(self) -> None:
async def update_queued_attributes(self) -> bool:
"""Update all attributes in queue."""
if not self.updates_queued:
return
return False
_LOGGER.debug(f"Queued updates {self.updates_queued}")
queued_attributes = set(self.updates_queued)
self.updates_queued.clear()
for attr in queued_attributes:
await getattr(self, f"update_{attr}")()
self.async_update_callback()
return True

def push_notify(self, sender: int, data: bytearray):
"""Push events from the mug to indicate changes."""
Expand Down Expand Up @@ -372,12 +341,16 @@ async def update_all(self) -> bool:
success = False
return success

async def ensure_connected(self):
"""Ensure connected."""
if not self.is_connected:
await self.connect()

async def disconnect(self) -> None:
"""Stop Loop and disconnect."""
with contextlib.suppress(BleakError):
await self.client.stop_notify(UUID_PUSH_EVENT)

self._loop = False
with contextlib.suppress(BleakError):
if self.client and self.client.is_connected:
await self.client.disconnect()
Loading

0 comments on commit 6aa78af

Please sign in to comment.