From b8266ee5efcb039c1bd029d06ae1e197e2090262 Mon Sep 17 00:00:00 2001 From: puddly <32534428+puddly@users.noreply.github.com> Date: Sat, 11 Jan 2025 12:03:20 -0500 Subject: [PATCH 1/7] Expose flow control asynchronously --- universal_silabs_flasher/common.py | 30 +++++++++++++++++++++++++++++ universal_silabs_flasher/flasher.py | 15 +++++++-------- 2 files changed, 37 insertions(+), 8 deletions(-) diff --git a/universal_silabs_flasher/common.py b/universal_silabs_flasher/common.py index d46d3cb..2852175 100644 --- a/universal_silabs_flasher/common.py +++ b/universal_silabs_flasher/common.py @@ -236,3 +236,33 @@ def __repr__(self) -> str: return f"{concatenated!r}" return f"{concatenated!r} ({comparable})" + + +class FlowControlSerialProtocol(zigpy.serial.SerialProtocol): + def _set_flow_control( + self, + *, + rts: bool | None = None, + cts: bool | None = None, + dtr: bool | None = None, + ) -> None: + if rts is not None: + self.transport.serial.rts = rts + + if cts is not None: + self.transport.serial.cts = cts + + if dtr is not None: + self.transport.serial.dtr = dtr + + async def set_flow_control( + self, + *, + rts: bool | None = None, + cts: bool | None = None, + dtr: bool | None = None, + ) -> None: + loop = asyncio.get_running_loop() + await loop.run_in_executor( + None, lambda: self._set_flow_control(rts=rts, cts=cts, dtr=dtr) + ) diff --git a/universal_silabs_flasher/flasher.py b/universal_silabs_flasher/flasher.py index b3e7e2d..19839d2 100644 --- a/universal_silabs_flasher/flasher.py +++ b/universal_silabs_flasher/flasher.py @@ -8,7 +8,7 @@ import bellows.config import bellows.ezsp import bellows.types -from zigpy.serial import SerialProtocol +from zigpy.serial import FlowControlSerialProtocol import zigpy.types from .common import ( @@ -86,15 +86,14 @@ async def enter_bootloader_reset(self, target): async def enter_serial_bootloader(self): baudrate = self._baudrates[ApplicationType.GECKO_BOOTLOADER][0] - async with connect_protocol(self._device, baudrate, SerialProtocol) as sonoff: - serial = sonoff._transport.serial - serial.dtr = False - serial.rts = True + async with connect_protocol( + self._device, baudrate, FlowControlSerialProtocol + ) as sonoff: + await sonoff.set_flow_control(dtr=False, rts=True) await asyncio.sleep(0.1) - serial.dtr = True - serial.rts = False + await sonoff.set_flow_control(dtr=True, rts=False) await asyncio.sleep(0.5) - serial.dtr = False + await sonoff.set_flow_control(dtr=False, rts=False) def _connect_gecko_bootloader(self, baudrate: int): return connect_protocol(self._device, baudrate, GeckoBootloaderProtocol) From f9c00327ea28036460ebfdb8f65bbc785dec8da2 Mon Sep 17 00:00:00 2001 From: puddly <32534428+puddly@users.noreply.github.com> Date: Sat, 11 Jan 2025 12:04:24 -0500 Subject: [PATCH 2/7] Use the correct import --- universal_silabs_flasher/flasher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/universal_silabs_flasher/flasher.py b/universal_silabs_flasher/flasher.py index 19839d2..3d8b7e3 100644 --- a/universal_silabs_flasher/flasher.py +++ b/universal_silabs_flasher/flasher.py @@ -8,11 +8,11 @@ import bellows.config import bellows.ezsp import bellows.types -from zigpy.serial import FlowControlSerialProtocol import zigpy.types from .common import ( PROBE_TIMEOUT, + FlowControlSerialProtocol, Version, asyncio_timeout, connect_protocol, From 740eb522d0bc87a5de158d8f450398ae55694e29 Mon Sep 17 00:00:00 2001 From: puddly <32534428+puddly@users.noreply.github.com> Date: Mon, 13 Jan 2025 12:09:27 -0500 Subject: [PATCH 3/7] Use a proper method on the `transport` object --- universal_silabs_flasher/common.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/universal_silabs_flasher/common.py b/universal_silabs_flasher/common.py index 2852175..7714d5e 100644 --- a/universal_silabs_flasher/common.py +++ b/universal_silabs_flasher/common.py @@ -262,6 +262,10 @@ async def set_flow_control( cts: bool | None = None, dtr: bool | None = None, ) -> None: + if hasattr(self.transport, "set_flow_control"): + await self.transport.set_flow_control(rts=rts, cts=cts, dtr=dtr) + return + loop = asyncio.get_running_loop() await loop.run_in_executor( None, lambda: self._set_flow_control(rts=rts, cts=cts, dtr=dtr) From 06d5548d0e06e7f60d992b7a49d47969eaffef54 Mon Sep 17 00:00:00 2001 From: puddly <32534428+puddly@users.noreply.github.com> Date: Mon, 13 Jan 2025 12:44:57 -0500 Subject: [PATCH 4/7] Merge GPIO and UART reset patterns --- universal_silabs_flasher/common.py | 10 ++-- universal_silabs_flasher/const.py | 84 ++++++++++++++++++++--------- universal_silabs_flasher/flash.py | 4 +- universal_silabs_flasher/flasher.py | 47 ++++++++-------- universal_silabs_flasher/gpio.py | 51 ++++++++---------- 5 files changed, 113 insertions(+), 83 deletions(-) diff --git a/universal_silabs_flasher/common.py b/universal_silabs_flasher/common.py index 7714d5e..703a824 100644 --- a/universal_silabs_flasher/common.py +++ b/universal_silabs_flasher/common.py @@ -239,7 +239,7 @@ def __repr__(self) -> str: class FlowControlSerialProtocol(zigpy.serial.SerialProtocol): - def _set_flow_control( + def _set_signals( self, *, rts: bool | None = None, @@ -255,18 +255,18 @@ def _set_flow_control( if dtr is not None: self.transport.serial.dtr = dtr - async def set_flow_control( + async def set_signals( self, *, rts: bool | None = None, cts: bool | None = None, dtr: bool | None = None, ) -> None: - if hasattr(self.transport, "set_flow_control"): - await self.transport.set_flow_control(rts=rts, cts=cts, dtr=dtr) + if hasattr(self.transport, "set_signals"): + await self.transport.set_signals(rts=rts, cts=cts, dtr=dtr) return loop = asyncio.get_running_loop() await loop.run_in_executor( - None, lambda: self._set_flow_control(rts=rts, cts=cts, dtr=dtr) + None, lambda: self._set_signals(rts=rts, cts=cts, dtr=dtr) ) diff --git a/universal_silabs_flasher/const.py b/universal_silabs_flasher/const.py index 28263a1..6fa4df3 100644 --- a/universal_silabs_flasher/const.py +++ b/universal_silabs_flasher/const.py @@ -1,3 +1,4 @@ +import dataclasses import enum @@ -52,32 +53,67 @@ class ResetTarget(enum.Enum): YELLOW = "yellow" IHOST = "ihost" SLZB07 = "slzb07" + RTS_DTR = "rts_dtr" + + # Deprecated alias for `RTS_DTR` SONOFF = "sonoff" +@dataclasses.dataclass +class GpioPattern: + pins: dict[str | int, bool] + delay_after: float + + +@dataclasses.dataclass +class GpioResetConfig: + chip: str | None + chip_type: str | None + pattern: list[GpioPattern] + + +# fmt: off GPIO_CONFIGS = { - ResetTarget.YELLOW: { - "chip": "/dev/gpiochip0", - "pin_states": { - 24: [True, False, False, True], - 25: [True, False, True, True], - }, - "toggle_delay": 0.1, - }, - ResetTarget.IHOST: { - "chip": "/dev/gpiochip1", - "pin_states": { - 27: [True, False, False, True], - 26: [True, False, True, True], - }, - "toggle_delay": 0.1, - }, - ResetTarget.SLZB07: { - "chip_name": "cp210x", - "pin_states": { - 5: [True, False, False, True], - 4: [True, False, True, True], - }, - "toggle_delay": 0.1, - }, + ResetTarget.YELLOW: GpioResetConfig( + chip="/dev/gpiochip0", + chip_type=None, + pattern=[ + GpioPattern(pins={24: True, 25: True}, delay_after=0.1), + GpioPattern(pins={24: False, 25: False}, delay_after=0.1), + GpioPattern(pins={24: False, 25: True}, delay_after=0.1), + GpioPattern(pins={24: True, 25: True}, delay_after=0.0), + ], + ), + ResetTarget.IHOST: GpioResetConfig( + chip="/dev/gpiochip1", + chip_type=None, + pattern=[ + GpioPattern(pins={26: True, 27: True}, delay_after=0.1), + GpioPattern(pins={26: False, 27: False}, delay_after=0.1), + GpioPattern(pins={26: True, 27: False}, delay_after=0.1), + GpioPattern(pins={26: True, 27: True}, delay_after=0.0), + ] + ), + ResetTarget.SLZB07: GpioResetConfig( + chip=None, + chip_type="cp210x", + pattern=[ + GpioPattern(pins={4: True, 5: True}, delay_after=0.1), + GpioPattern(pins={4: False, 5: False}, delay_after=0.1), + GpioPattern(pins={4: True, 5: False}, delay_after=0.1), + GpioPattern(pins={4: True, 5: True}, delay_after=0.0), + ] + ), + ResetTarget.RTS_DTR: GpioResetConfig( + chip=None, + chip_type="uart", + pattern=[ + GpioPattern(pins={"dtr": False, "rts": True}, delay_after=0.1), + GpioPattern(pins={"dtr": True, "rts": False}, delay_after=0.5), + GpioPattern(pins={"dtr": False, "rts": False}, delay_after=0.0), + ] + ), } +# fmt: on + +GPIO_CONFIGS[ResetTarget.SONOFF] = GPIO_CONFIGS[ResetTarget.RTS_DTR] diff --git a/universal_silabs_flasher/flash.py b/universal_silabs_flasher/flash.py index 290ae26..d9c5395 100644 --- a/universal_silabs_flasher/flash.py +++ b/universal_silabs_flasher/flash.py @@ -144,7 +144,7 @@ def convert(self, value: tuple | str, param: click.Parameter, ctx: click.Context ) @click.option( "--bootloader-reset", - type=click.Choice([t.value for t in ResetTarget]), + type=click.Choice([t.value for t in ResetTarget if t != ResetTarget.SONOFF]), ) @click.pass_context def main( @@ -339,7 +339,7 @@ async def flash( flasher._reset_target = ResetTarget.YELLOW _LOGGER.info(reset_msg, "--yellow-gpio-reset") elif sonoff_reset: - flasher._reset_target = ResetTarget.SONOFF + flasher._reset_target = ResetTarget.RTS_DTR _LOGGER.info(reset_msg, "--sonoff-reset") try: diff --git a/universal_silabs_flasher/flasher.py b/universal_silabs_flasher/flasher.py index 3d8b7e3..013892d 100644 --- a/universal_silabs_flasher/flasher.py +++ b/universal_silabs_flasher/flasher.py @@ -68,32 +68,33 @@ def __init__( ResetTarget(bootloader_reset) if bootloader_reset else None ) - async def enter_bootloader_reset(self, target): + async def enter_bootloader_reset(self, target: ResetTarget) -> None: _LOGGER.info(f"Triggering {target.value} bootloader") - if target in GPIO_CONFIGS.keys(): - config = GPIO_CONFIGS[target] - if "chip" not in config.keys(): - _LOGGER.warning( - f"When using {target.value} bootloader reset " - + "ensure no other CP2102 USB serial devices are connected." - ) - config["chip"] = await find_gpiochip_by_label(config["chip_name"]) - await send_gpio_pattern( - config["chip"], config["pin_states"], config["toggle_delay"] + + config = GPIO_CONFIGS[target] + chip = config.chip + + if config.chip_type == "cp210x": + _LOGGER.warning( + "When using %s bootloader reset ensure no other CP2102 USB serial" + " devices are connected.", + target.value, ) + + chip = await find_gpiochip_by_label(config.chip_type) + + if config.chip_type == "uart": + # The baudrate isn't really necessary, since we're using flow control pins + baudrate = self._baudrates[ApplicationType.GECKO_BOOTLOADER][0] + + async with connect_protocol( + self._device, baudrate, FlowControlSerialProtocol + ) as uart: + for pattern in config.pattern: + await uart.set_signals(**pattern.pins) + await asyncio.sleep(pattern.delay_after) else: - await self.enter_serial_bootloader() - - async def enter_serial_bootloader(self): - baudrate = self._baudrates[ApplicationType.GECKO_BOOTLOADER][0] - async with connect_protocol( - self._device, baudrate, FlowControlSerialProtocol - ) as sonoff: - await sonoff.set_flow_control(dtr=False, rts=True) - await asyncio.sleep(0.1) - await sonoff.set_flow_control(dtr=True, rts=False) - await asyncio.sleep(0.5) - await sonoff.set_flow_control(dtr=False, rts=False) + await send_gpio_pattern(chip, config.pattern) def _connect_gecko_bootloader(self, baudrate: int): return connect_protocol(self._device, baudrate, GeckoBootloaderProtocol) diff --git a/universal_silabs_flasher/gpio.py b/universal_silabs_flasher/gpio.py index f603e92..5f6be11 100644 --- a/universal_silabs_flasher/gpio.py +++ b/universal_silabs_flasher/gpio.py @@ -5,6 +5,8 @@ import time import typing +from .const import GpioPattern + try: import gpiod @@ -14,20 +16,14 @@ if gpiod is None: # No gpiod library - def _send_gpio_pattern( - chip: str, pin_states: dict[int, list[bool]], toggle_delay: float - ) -> None: + def _send_gpio_pattern(chip: str, pattern: list[GpioPattern]) -> None: raise NotImplementedError("GPIO not supported on this platform") elif is_gpiod_v1: # gpiod <= 1.5.4 - def _send_gpio_pattern( - chip: str, pin_states: dict[int, list[bool]], toggle_delay: float - ) -> None: - num_states = len(next(iter(pin_states.values()))) - + def _send_gpio_pattern(chip: str, pattern: list[GpioPattern]) -> None: chip = gpiod.chip(chip, gpiod.chip.OPEN_BY_PATH) - lines = chip.get_lines(pin_states.keys()) + lines = chip.get_lines(pattern[0].pins.keys()) config = gpiod.line_request() config.consumer = "universal-silabs-flasher" @@ -35,12 +31,13 @@ def _send_gpio_pattern( try: # Open the pins and set their initial states - lines.request(config, [int(states[0]) for states in pin_states.values()]) + lines.request(config, [int(v) for v in pattern[0].pins.values()]) + time.sleep(pattern[0].delay_after) # Send all subsequent states - for i in range(1, num_states): - time.sleep(toggle_delay) - lines.set_values([int(states[i]) for states in pin_states.values()]) + for p in pattern[1:]: + lines.set_values([int(v) for v in p.pins.values()]) + time.sleep(p.delay_after) finally: # Clean up and ensure the GPIO pins are reset to inputs lines.set_direction_input() @@ -48,12 +45,8 @@ def _send_gpio_pattern( else: # gpiod >= 2.0.2 - def _send_gpio_pattern( - chip: str, pin_states: dict[int, list[bool]], toggle_delay: float - ) -> None: + def _send_gpio_pattern(chip: str, pattern: list[GpioPattern]) -> None: # `gpiod` isn't available on Windows - num_states = len(next(iter(pin_states.values()))) - with gpiod.request_lines( path=chip, consumer="universal-silabs-flasher", @@ -61,27 +54,29 @@ def _send_gpio_pattern( # Set initial states pin: gpiod.LineSettings( direction=gpiod.line.Direction.OUTPUT, - output_value=gpiod.line.Value(states[0]), + output_value=gpiod.line.Value(state), ) - for pin, states in pin_states.items() + for pin, state in pattern[0].pins.items() }, ) as request: + time.sleep(pattern[0].delay_after) + try: # Send all subsequent states - for i in range(1, num_states): - time.sleep(toggle_delay) + for p in pattern[1:]: request.set_values( { - pin: gpiod.line.Value(int(pin_states[pin][i])) - for pin, states in pin_states.items() + pin: gpiod.line.Value(int(state)) + for pin, state in p.pins.items() } ) + time.sleep(p.delay_after) finally: # Clean up and ensure the GPIO pins are reset to inputs request.reconfigure_lines( { pin: gpiod.LineSettings(direction=gpiod.line.Direction.INPUT) - for pin, states in pin_states.items() + for pin in pattern[0].pins.keys() } ) @@ -119,9 +114,7 @@ async def find_gpiochip_by_label(label: str) -> str: return result -async def send_gpio_pattern( - chip: str, pin_states: dict[int, list[bool]], toggle_delay: float -) -> None: +async def send_gpio_pattern(chip: str, pattern: list[GpioPattern]) -> None: await asyncio.get_running_loop().run_in_executor( - None, _send_gpio_pattern, chip, pin_states, toggle_delay + None, _send_gpio_pattern, chip, pattern ) From dac95086032e0fe025badf378cb2cb5de0e56b12 Mon Sep 17 00:00:00 2001 From: puddly <32534428+puddly@users.noreply.github.com> Date: Mon, 13 Jan 2025 12:51:24 -0500 Subject: [PATCH 5/7] Deprecate `SONOFF` --- universal_silabs_flasher/common.py | 17 ++++++++++++----- universal_silabs_flasher/const.py | 5 ----- universal_silabs_flasher/flash.py | 9 ++++++++- 3 files changed, 20 insertions(+), 11 deletions(-) diff --git a/universal_silabs_flasher/common.py b/universal_silabs_flasher/common.py index 703a824..c35cfce 100644 --- a/universal_silabs_flasher/common.py +++ b/universal_silabs_flasher/common.py @@ -247,13 +247,13 @@ def _set_signals( dtr: bool | None = None, ) -> None: if rts is not None: - self.transport.serial.rts = rts + self._transport.serial.rts = rts if cts is not None: - self.transport.serial.cts = cts + self._transport.serial.cts = cts if dtr is not None: - self.transport.serial.dtr = dtr + self._transport.serial.dtr = dtr async def set_signals( self, @@ -262,8 +262,15 @@ async def set_signals( cts: bool | None = None, dtr: bool | None = None, ) -> None: - if hasattr(self.transport, "set_signals"): - await self.transport.set_signals(rts=rts, cts=cts, dtr=dtr) + _LOGGER.debug( + "Setting UART signals: rts=%s, cts=%s, dtr=%s", + int(rts) if rts is not None else "-", + int(cts) if cts is not None else "-", + int(dtr) if dtr is not None else "-", + ) + + if hasattr(self._transport, "set_signals"): + await self._transport.set_signals(rts=rts, cts=cts, dtr=dtr) return loop = asyncio.get_running_loop() diff --git a/universal_silabs_flasher/const.py b/universal_silabs_flasher/const.py index 6fa4df3..5806421 100644 --- a/universal_silabs_flasher/const.py +++ b/universal_silabs_flasher/const.py @@ -55,9 +55,6 @@ class ResetTarget(enum.Enum): SLZB07 = "slzb07" RTS_DTR = "rts_dtr" - # Deprecated alias for `RTS_DTR` - SONOFF = "sonoff" - @dataclasses.dataclass class GpioPattern: @@ -115,5 +112,3 @@ class GpioResetConfig: ), } # fmt: on - -GPIO_CONFIGS[ResetTarget.SONOFF] = GPIO_CONFIGS[ResetTarget.RTS_DTR] diff --git a/universal_silabs_flasher/flash.py b/universal_silabs_flasher/flash.py index d9c5395..d9e88e7 100644 --- a/universal_silabs_flasher/flash.py +++ b/universal_silabs_flasher/flash.py @@ -144,7 +144,7 @@ def convert(self, value: tuple | str, param: click.Parameter, ctx: click.Context ) @click.option( "--bootloader-reset", - type=click.Choice([t.value for t in ResetTarget if t != ResetTarget.SONOFF]), + type=click.Choice([t.value for t in ResetTarget] + ["sonoff"]), ) @click.pass_context def main( @@ -189,6 +189,13 @@ def main( param = next(p for p in ctx.command.params if p.name == "device") raise click.MissingParameter(ctx=ctx, param=param) + if bootloader_reset == "sonoff": + _LOGGER.warning( + "The 'sonoff' reset target is deprecated." + " Use '--bootloader-reset rts_dtr' instead." + ) + bootloader_reset = ResetTarget.RTS_DTR.value + ctx.obj = { "verbosity": verbose, "flasher": Flasher( From f74c901b853582e3b126f15bc8896a623d2b1c21 Mon Sep 17 00:00:00 2001 From: puddly <32534428+puddly@users.noreply.github.com> Date: Mon, 13 Jan 2025 12:57:49 -0500 Subject: [PATCH 6/7] Fix type annotations --- universal_silabs_flasher/const.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/universal_silabs_flasher/const.py b/universal_silabs_flasher/const.py index 5806421..32e8e0e 100644 --- a/universal_silabs_flasher/const.py +++ b/universal_silabs_flasher/const.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import dataclasses import enum From ad6d6f542010f29fc90c108d56b3eab8341db790 Mon Sep 17 00:00:00 2001 From: puddly <32534428+puddly@users.noreply.github.com> Date: Mon, 13 Jan 2025 14:53:27 -0500 Subject: [PATCH 7/7] Improve logging --- universal_silabs_flasher/gpio.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/universal_silabs_flasher/gpio.py b/universal_silabs_flasher/gpio.py index 5f6be11..e95c890 100644 --- a/universal_silabs_flasher/gpio.py +++ b/universal_silabs_flasher/gpio.py @@ -1,12 +1,15 @@ from __future__ import annotations import asyncio +import logging from os import scandir import time import typing from .const import GpioPattern +_LOGGER = logging.getLogger(__name__) + try: import gpiod @@ -31,11 +34,13 @@ def _send_gpio_pattern(chip: str, pattern: list[GpioPattern]) -> None: try: # Open the pins and set their initial states + _LOGGER.debug("Sending GPIO pattern %r", pattern[0]) lines.request(config, [int(v) for v in pattern[0].pins.values()]) time.sleep(pattern[0].delay_after) # Send all subsequent states for p in pattern[1:]: + _LOGGER.debug("Sending GPIO pattern %r", p) lines.set_values([int(v) for v in p.pins.values()]) time.sleep(p.delay_after) finally: @@ -46,7 +51,8 @@ def _send_gpio_pattern(chip: str, pattern: list[GpioPattern]) -> None: else: # gpiod >= 2.0.2 def _send_gpio_pattern(chip: str, pattern: list[GpioPattern]) -> None: - # `gpiod` isn't available on Windows + _LOGGER.debug("Sending GPIO pattern %r", pattern[0]) + with gpiod.request_lines( path=chip, consumer="universal-silabs-flasher", @@ -64,6 +70,7 @@ def _send_gpio_pattern(chip: str, pattern: list[GpioPattern]) -> None: try: # Send all subsequent states for p in pattern[1:]: + _LOGGER.debug("Sending GPIO pattern %r", p) request.set_values( { pin: gpiod.line.Value(int(state))