diff --git a/universal_silabs_flasher/common.py b/universal_silabs_flasher/common.py index d46d3cb..c35cfce 100644 --- a/universal_silabs_flasher/common.py +++ b/universal_silabs_flasher/common.py @@ -236,3 +236,44 @@ def __repr__(self) -> str: return f"{concatenated!r}" return f"{concatenated!r} ({comparable})" + + +class FlowControlSerialProtocol(zigpy.serial.SerialProtocol): + def _set_signals( + self, + *, + rts: bool | None = None, + cts: bool | None = None, + dtr: bool | None = None, + ) -> None: + if rts is not None: + self._transport.serial.rts = rts + + if cts is not None: + self._transport.serial.cts = cts + + if dtr is not None: + self._transport.serial.dtr = dtr + + async def set_signals( + self, + *, + rts: bool | None = None, + cts: bool | None = None, + dtr: bool | None = None, + ) -> None: + _LOGGER.debug( + "Setting UART signals: rts=%s, cts=%s, dtr=%s", + int(rts) if rts is not None else "-", + int(cts) if cts is not None else "-", + int(dtr) if dtr is not None else "-", + ) + + if hasattr(self._transport, "set_signals"): + await self._transport.set_signals(rts=rts, cts=cts, dtr=dtr) + return + + loop = asyncio.get_running_loop() + await loop.run_in_executor( + None, lambda: self._set_signals(rts=rts, cts=cts, dtr=dtr) + ) diff --git a/universal_silabs_flasher/const.py b/universal_silabs_flasher/const.py index 28263a1..32e8e0e 100644 --- a/universal_silabs_flasher/const.py +++ b/universal_silabs_flasher/const.py @@ -1,3 +1,6 @@ +from __future__ import annotations + +import dataclasses import enum @@ -52,32 +55,62 @@ class ResetTarget(enum.Enum): YELLOW = "yellow" IHOST = "ihost" SLZB07 = "slzb07" - SONOFF = "sonoff" + RTS_DTR = "rts_dtr" + + +@dataclasses.dataclass +class GpioPattern: + pins: dict[str | int, bool] + delay_after: float + + +@dataclasses.dataclass +class GpioResetConfig: + chip: str | None + chip_type: str | None + pattern: list[GpioPattern] +# fmt: off GPIO_CONFIGS = { - ResetTarget.YELLOW: { - "chip": "/dev/gpiochip0", - "pin_states": { - 24: [True, False, False, True], - 25: [True, False, True, True], - }, - "toggle_delay": 0.1, - }, - ResetTarget.IHOST: { - "chip": "/dev/gpiochip1", - "pin_states": { - 27: [True, False, False, True], - 26: [True, False, True, True], - }, - "toggle_delay": 0.1, - }, - ResetTarget.SLZB07: { - "chip_name": "cp210x", - "pin_states": { - 5: [True, False, False, True], - 4: [True, False, True, True], - }, - "toggle_delay": 0.1, - }, + ResetTarget.YELLOW: GpioResetConfig( + chip="/dev/gpiochip0", + chip_type=None, + pattern=[ + GpioPattern(pins={24: True, 25: True}, delay_after=0.1), + GpioPattern(pins={24: False, 25: False}, delay_after=0.1), + GpioPattern(pins={24: False, 25: True}, delay_after=0.1), + GpioPattern(pins={24: True, 25: True}, delay_after=0.0), + ], + ), + ResetTarget.IHOST: GpioResetConfig( + chip="/dev/gpiochip1", + chip_type=None, + pattern=[ + GpioPattern(pins={26: True, 27: True}, delay_after=0.1), + GpioPattern(pins={26: False, 27: False}, delay_after=0.1), + GpioPattern(pins={26: True, 27: False}, delay_after=0.1), + GpioPattern(pins={26: True, 27: True}, delay_after=0.0), + ] + ), + ResetTarget.SLZB07: GpioResetConfig( + chip=None, + chip_type="cp210x", + pattern=[ + GpioPattern(pins={4: True, 5: True}, delay_after=0.1), + GpioPattern(pins={4: False, 5: False}, delay_after=0.1), + GpioPattern(pins={4: True, 5: False}, delay_after=0.1), + GpioPattern(pins={4: True, 5: True}, delay_after=0.0), + ] + ), + ResetTarget.RTS_DTR: GpioResetConfig( + chip=None, + chip_type="uart", + pattern=[ + GpioPattern(pins={"dtr": False, "rts": True}, delay_after=0.1), + GpioPattern(pins={"dtr": True, "rts": False}, delay_after=0.5), + GpioPattern(pins={"dtr": False, "rts": False}, delay_after=0.0), + ] + ), } +# fmt: on diff --git a/universal_silabs_flasher/flash.py b/universal_silabs_flasher/flash.py index 290ae26..d9e88e7 100644 --- a/universal_silabs_flasher/flash.py +++ b/universal_silabs_flasher/flash.py @@ -144,7 +144,7 @@ def convert(self, value: tuple | str, param: click.Parameter, ctx: click.Context ) @click.option( "--bootloader-reset", - type=click.Choice([t.value for t in ResetTarget]), + type=click.Choice([t.value for t in ResetTarget] + ["sonoff"]), ) @click.pass_context def main( @@ -189,6 +189,13 @@ def main( param = next(p for p in ctx.command.params if p.name == "device") raise click.MissingParameter(ctx=ctx, param=param) + if bootloader_reset == "sonoff": + _LOGGER.warning( + "The 'sonoff' reset target is deprecated." + " Use '--bootloader-reset rts_dtr' instead." + ) + bootloader_reset = ResetTarget.RTS_DTR.value + ctx.obj = { "verbosity": verbose, "flasher": Flasher( @@ -339,7 +346,7 @@ async def flash( flasher._reset_target = ResetTarget.YELLOW _LOGGER.info(reset_msg, "--yellow-gpio-reset") elif sonoff_reset: - flasher._reset_target = ResetTarget.SONOFF + flasher._reset_target = ResetTarget.RTS_DTR _LOGGER.info(reset_msg, "--sonoff-reset") try: diff --git a/universal_silabs_flasher/flasher.py b/universal_silabs_flasher/flasher.py index b3e7e2d..013892d 100644 --- a/universal_silabs_flasher/flasher.py +++ b/universal_silabs_flasher/flasher.py @@ -8,11 +8,11 @@ import bellows.config import bellows.ezsp import bellows.types -from zigpy.serial import SerialProtocol import zigpy.types from .common import ( PROBE_TIMEOUT, + FlowControlSerialProtocol, Version, asyncio_timeout, connect_protocol, @@ -68,33 +68,33 @@ def __init__( ResetTarget(bootloader_reset) if bootloader_reset else None ) - async def enter_bootloader_reset(self, target): + async def enter_bootloader_reset(self, target: ResetTarget) -> None: _LOGGER.info(f"Triggering {target.value} bootloader") - if target in GPIO_CONFIGS.keys(): - config = GPIO_CONFIGS[target] - if "chip" not in config.keys(): - _LOGGER.warning( - f"When using {target.value} bootloader reset " - + "ensure no other CP2102 USB serial devices are connected." - ) - config["chip"] = await find_gpiochip_by_label(config["chip_name"]) - await send_gpio_pattern( - config["chip"], config["pin_states"], config["toggle_delay"] + + config = GPIO_CONFIGS[target] + chip = config.chip + + if config.chip_type == "cp210x": + _LOGGER.warning( + "When using %s bootloader reset ensure no other CP2102 USB serial" + " devices are connected.", + target.value, ) + + chip = await find_gpiochip_by_label(config.chip_type) + + if config.chip_type == "uart": + # The baudrate isn't really necessary, since we're using flow control pins + baudrate = self._baudrates[ApplicationType.GECKO_BOOTLOADER][0] + + async with connect_protocol( + self._device, baudrate, FlowControlSerialProtocol + ) as uart: + for pattern in config.pattern: + await uart.set_signals(**pattern.pins) + await asyncio.sleep(pattern.delay_after) else: - await self.enter_serial_bootloader() - - async def enter_serial_bootloader(self): - baudrate = self._baudrates[ApplicationType.GECKO_BOOTLOADER][0] - async with connect_protocol(self._device, baudrate, SerialProtocol) as sonoff: - serial = sonoff._transport.serial - serial.dtr = False - serial.rts = True - await asyncio.sleep(0.1) - serial.dtr = True - serial.rts = False - await asyncio.sleep(0.5) - serial.dtr = False + await send_gpio_pattern(chip, config.pattern) def _connect_gecko_bootloader(self, baudrate: int): return connect_protocol(self._device, baudrate, GeckoBootloaderProtocol) diff --git a/universal_silabs_flasher/gpio.py b/universal_silabs_flasher/gpio.py index f603e92..e95c890 100644 --- a/universal_silabs_flasher/gpio.py +++ b/universal_silabs_flasher/gpio.py @@ -1,10 +1,15 @@ from __future__ import annotations import asyncio +import logging from os import scandir import time import typing +from .const import GpioPattern + +_LOGGER = logging.getLogger(__name__) + try: import gpiod @@ -14,20 +19,14 @@ if gpiod is None: # No gpiod library - def _send_gpio_pattern( - chip: str, pin_states: dict[int, list[bool]], toggle_delay: float - ) -> None: + def _send_gpio_pattern(chip: str, pattern: list[GpioPattern]) -> None: raise NotImplementedError("GPIO not supported on this platform") elif is_gpiod_v1: # gpiod <= 1.5.4 - def _send_gpio_pattern( - chip: str, pin_states: dict[int, list[bool]], toggle_delay: float - ) -> None: - num_states = len(next(iter(pin_states.values()))) - + def _send_gpio_pattern(chip: str, pattern: list[GpioPattern]) -> None: chip = gpiod.chip(chip, gpiod.chip.OPEN_BY_PATH) - lines = chip.get_lines(pin_states.keys()) + lines = chip.get_lines(pattern[0].pins.keys()) config = gpiod.line_request() config.consumer = "universal-silabs-flasher" @@ -35,12 +34,15 @@ def _send_gpio_pattern( try: # Open the pins and set their initial states - lines.request(config, [int(states[0]) for states in pin_states.values()]) + _LOGGER.debug("Sending GPIO pattern %r", pattern[0]) + lines.request(config, [int(v) for v in pattern[0].pins.values()]) + time.sleep(pattern[0].delay_after) # Send all subsequent states - for i in range(1, num_states): - time.sleep(toggle_delay) - lines.set_values([int(states[i]) for states in pin_states.values()]) + for p in pattern[1:]: + _LOGGER.debug("Sending GPIO pattern %r", p) + lines.set_values([int(v) for v in p.pins.values()]) + time.sleep(p.delay_after) finally: # Clean up and ensure the GPIO pins are reset to inputs lines.set_direction_input() @@ -48,11 +50,8 @@ def _send_gpio_pattern( else: # gpiod >= 2.0.2 - def _send_gpio_pattern( - chip: str, pin_states: dict[int, list[bool]], toggle_delay: float - ) -> None: - # `gpiod` isn't available on Windows - num_states = len(next(iter(pin_states.values()))) + def _send_gpio_pattern(chip: str, pattern: list[GpioPattern]) -> None: + _LOGGER.debug("Sending GPIO pattern %r", pattern[0]) with gpiod.request_lines( path=chip, @@ -61,27 +60,30 @@ def _send_gpio_pattern( # Set initial states pin: gpiod.LineSettings( direction=gpiod.line.Direction.OUTPUT, - output_value=gpiod.line.Value(states[0]), + output_value=gpiod.line.Value(state), ) - for pin, states in pin_states.items() + for pin, state in pattern[0].pins.items() }, ) as request: + time.sleep(pattern[0].delay_after) + try: # Send all subsequent states - for i in range(1, num_states): - time.sleep(toggle_delay) + for p in pattern[1:]: + _LOGGER.debug("Sending GPIO pattern %r", p) request.set_values( { - pin: gpiod.line.Value(int(pin_states[pin][i])) - for pin, states in pin_states.items() + pin: gpiod.line.Value(int(state)) + for pin, state in p.pins.items() } ) + time.sleep(p.delay_after) finally: # Clean up and ensure the GPIO pins are reset to inputs request.reconfigure_lines( { pin: gpiod.LineSettings(direction=gpiod.line.Direction.INPUT) - for pin, states in pin_states.items() + for pin in pattern[0].pins.keys() } ) @@ -119,9 +121,7 @@ async def find_gpiochip_by_label(label: str) -> str: return result -async def send_gpio_pattern( - chip: str, pin_states: dict[int, list[bool]], toggle_delay: float -) -> None: +async def send_gpio_pattern(chip: str, pattern: list[GpioPattern]) -> None: await asyncio.get_running_loop().run_in_executor( - None, _send_gpio_pattern, chip, pin_states, toggle_delay + None, _send_gpio_pattern, chip, pattern )