Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge GPIO and flow control bootloader reset #94

Merged
merged 7 commits into from
Jan 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions universal_silabs_flasher/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,3 +236,44 @@ def __repr__(self) -> str:
return f"{concatenated!r}"

return f"{concatenated!r} ({comparable})"


class FlowControlSerialProtocol(zigpy.serial.SerialProtocol):
def _set_signals(
self,
*,
rts: bool | None = None,
cts: bool | None = None,
dtr: bool | None = None,
) -> None:
if rts is not None:
self._transport.serial.rts = rts

if cts is not None:
self._transport.serial.cts = cts

if dtr is not None:
self._transport.serial.dtr = dtr

async def set_signals(
self,
*,
rts: bool | None = None,
cts: bool | None = None,
dtr: bool | None = None,
) -> None:
_LOGGER.debug(
"Setting UART signals: rts=%s, cts=%s, dtr=%s",
int(rts) if rts is not None else "-",
int(cts) if cts is not None else "-",
int(dtr) if dtr is not None else "-",
)

if hasattr(self._transport, "set_signals"):
await self._transport.set_signals(rts=rts, cts=cts, dtr=dtr)
return

loop = asyncio.get_running_loop()
await loop.run_in_executor(
None, lambda: self._set_signals(rts=rts, cts=cts, dtr=dtr)
)
83 changes: 58 additions & 25 deletions universal_silabs_flasher/const.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
from __future__ import annotations

import dataclasses
import enum


Expand Down Expand Up @@ -52,32 +55,62 @@ class ResetTarget(enum.Enum):
YELLOW = "yellow"
IHOST = "ihost"
SLZB07 = "slzb07"
SONOFF = "sonoff"
RTS_DTR = "rts_dtr"


@dataclasses.dataclass
class GpioPattern:
pins: dict[str | int, bool]
delay_after: float


@dataclasses.dataclass
class GpioResetConfig:
chip: str | None
chip_type: str | None
pattern: list[GpioPattern]


# fmt: off
GPIO_CONFIGS = {
ResetTarget.YELLOW: {
"chip": "/dev/gpiochip0",
"pin_states": {
24: [True, False, False, True],
25: [True, False, True, True],
},
"toggle_delay": 0.1,
},
ResetTarget.IHOST: {
"chip": "/dev/gpiochip1",
"pin_states": {
27: [True, False, False, True],
26: [True, False, True, True],
},
"toggle_delay": 0.1,
},
ResetTarget.SLZB07: {
"chip_name": "cp210x",
"pin_states": {
5: [True, False, False, True],
4: [True, False, True, True],
},
"toggle_delay": 0.1,
},
ResetTarget.YELLOW: GpioResetConfig(
chip="/dev/gpiochip0",
chip_type=None,
pattern=[
GpioPattern(pins={24: True, 25: True}, delay_after=0.1),
GpioPattern(pins={24: False, 25: False}, delay_after=0.1),
GpioPattern(pins={24: False, 25: True}, delay_after=0.1),
GpioPattern(pins={24: True, 25: True}, delay_after=0.0),
],
),
ResetTarget.IHOST: GpioResetConfig(
chip="/dev/gpiochip1",
chip_type=None,
pattern=[
GpioPattern(pins={26: True, 27: True}, delay_after=0.1),
GpioPattern(pins={26: False, 27: False}, delay_after=0.1),
GpioPattern(pins={26: True, 27: False}, delay_after=0.1),
GpioPattern(pins={26: True, 27: True}, delay_after=0.0),
]
),
ResetTarget.SLZB07: GpioResetConfig(
chip=None,
chip_type="cp210x",
pattern=[
GpioPattern(pins={4: True, 5: True}, delay_after=0.1),
GpioPattern(pins={4: False, 5: False}, delay_after=0.1),
GpioPattern(pins={4: True, 5: False}, delay_after=0.1),
GpioPattern(pins={4: True, 5: True}, delay_after=0.0),
]
),
ResetTarget.RTS_DTR: GpioResetConfig(
chip=None,
chip_type="uart",
pattern=[
GpioPattern(pins={"dtr": False, "rts": True}, delay_after=0.1),
GpioPattern(pins={"dtr": True, "rts": False}, delay_after=0.5),
GpioPattern(pins={"dtr": False, "rts": False}, delay_after=0.0),
]
),
}
# fmt: on
11 changes: 9 additions & 2 deletions universal_silabs_flasher/flash.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ def convert(self, value: tuple | str, param: click.Parameter, ctx: click.Context
)
@click.option(
"--bootloader-reset",
type=click.Choice([t.value for t in ResetTarget]),
type=click.Choice([t.value for t in ResetTarget] + ["sonoff"]),
)
@click.pass_context
def main(
Expand Down Expand Up @@ -189,6 +189,13 @@ def main(
param = next(p for p in ctx.command.params if p.name == "device")
raise click.MissingParameter(ctx=ctx, param=param)

if bootloader_reset == "sonoff":
_LOGGER.warning(
"The 'sonoff' reset target is deprecated."
" Use '--bootloader-reset rts_dtr' instead."
)
bootloader_reset = ResetTarget.RTS_DTR.value

ctx.obj = {
"verbosity": verbose,
"flasher": Flasher(
Expand Down Expand Up @@ -339,7 +346,7 @@ async def flash(
flasher._reset_target = ResetTarget.YELLOW
_LOGGER.info(reset_msg, "--yellow-gpio-reset")
elif sonoff_reset:
flasher._reset_target = ResetTarget.SONOFF
flasher._reset_target = ResetTarget.RTS_DTR
_LOGGER.info(reset_msg, "--sonoff-reset")

try:
Expand Down
50 changes: 25 additions & 25 deletions universal_silabs_flasher/flasher.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@
import bellows.config
import bellows.ezsp
import bellows.types
from zigpy.serial import SerialProtocol
import zigpy.types

from .common import (
PROBE_TIMEOUT,
FlowControlSerialProtocol,
Version,
asyncio_timeout,
connect_protocol,
Expand Down Expand Up @@ -68,33 +68,33 @@ def __init__(
ResetTarget(bootloader_reset) if bootloader_reset else None
)

async def enter_bootloader_reset(self, target):
async def enter_bootloader_reset(self, target: ResetTarget) -> None:
_LOGGER.info(f"Triggering {target.value} bootloader")
if target in GPIO_CONFIGS.keys():
config = GPIO_CONFIGS[target]
if "chip" not in config.keys():
_LOGGER.warning(
f"When using {target.value} bootloader reset "
+ "ensure no other CP2102 USB serial devices are connected."
)
config["chip"] = await find_gpiochip_by_label(config["chip_name"])
await send_gpio_pattern(
config["chip"], config["pin_states"], config["toggle_delay"]

config = GPIO_CONFIGS[target]
chip = config.chip

if config.chip_type == "cp210x":
_LOGGER.warning(
"When using %s bootloader reset ensure no other CP2102 USB serial"
" devices are connected.",
target.value,
)

chip = await find_gpiochip_by_label(config.chip_type)

if config.chip_type == "uart":
# The baudrate isn't really necessary, since we're using flow control pins
baudrate = self._baudrates[ApplicationType.GECKO_BOOTLOADER][0]

async with connect_protocol(
self._device, baudrate, FlowControlSerialProtocol
) as uart:
for pattern in config.pattern:
await uart.set_signals(**pattern.pins)
await asyncio.sleep(pattern.delay_after)
else:
await self.enter_serial_bootloader()

async def enter_serial_bootloader(self):
baudrate = self._baudrates[ApplicationType.GECKO_BOOTLOADER][0]
async with connect_protocol(self._device, baudrate, SerialProtocol) as sonoff:
serial = sonoff._transport.serial
serial.dtr = False
serial.rts = True
await asyncio.sleep(0.1)
serial.dtr = True
serial.rts = False
await asyncio.sleep(0.5)
serial.dtr = False
await send_gpio_pattern(chip, config.pattern)

def _connect_gecko_bootloader(self, baudrate: int):
return connect_protocol(self._device, baudrate, GeckoBootloaderProtocol)
Expand Down
58 changes: 29 additions & 29 deletions universal_silabs_flasher/gpio.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
from __future__ import annotations

import asyncio
import logging
from os import scandir
import time
import typing

from .const import GpioPattern

_LOGGER = logging.getLogger(__name__)

try:
import gpiod

Expand All @@ -14,45 +19,39 @@

if gpiod is None:
# No gpiod library
def _send_gpio_pattern(
chip: str, pin_states: dict[int, list[bool]], toggle_delay: float
) -> None:
def _send_gpio_pattern(chip: str, pattern: list[GpioPattern]) -> None:
raise NotImplementedError("GPIO not supported on this platform")

elif is_gpiod_v1:
# gpiod <= 1.5.4
def _send_gpio_pattern(
chip: str, pin_states: dict[int, list[bool]], toggle_delay: float
) -> None:
num_states = len(next(iter(pin_states.values())))

def _send_gpio_pattern(chip: str, pattern: list[GpioPattern]) -> None:
chip = gpiod.chip(chip, gpiod.chip.OPEN_BY_PATH)
lines = chip.get_lines(pin_states.keys())
lines = chip.get_lines(pattern[0].pins.keys())

config = gpiod.line_request()
config.consumer = "universal-silabs-flasher"
config.request_type = gpiod.line_request.DIRECTION_OUTPUT

try:
# Open the pins and set their initial states
lines.request(config, [int(states[0]) for states in pin_states.values()])
_LOGGER.debug("Sending GPIO pattern %r", pattern[0])
lines.request(config, [int(v) for v in pattern[0].pins.values()])
time.sleep(pattern[0].delay_after)

# Send all subsequent states
for i in range(1, num_states):
time.sleep(toggle_delay)
lines.set_values([int(states[i]) for states in pin_states.values()])
for p in pattern[1:]:
_LOGGER.debug("Sending GPIO pattern %r", p)
lines.set_values([int(v) for v in p.pins.values()])
time.sleep(p.delay_after)
finally:
# Clean up and ensure the GPIO pins are reset to inputs
lines.set_direction_input()
lines.release()

else:
# gpiod >= 2.0.2
def _send_gpio_pattern(
chip: str, pin_states: dict[int, list[bool]], toggle_delay: float
) -> None:
# `gpiod` isn't available on Windows
num_states = len(next(iter(pin_states.values())))
def _send_gpio_pattern(chip: str, pattern: list[GpioPattern]) -> None:
_LOGGER.debug("Sending GPIO pattern %r", pattern[0])

with gpiod.request_lines(
path=chip,
Expand All @@ -61,27 +60,30 @@ def _send_gpio_pattern(
# Set initial states
pin: gpiod.LineSettings(
direction=gpiod.line.Direction.OUTPUT,
output_value=gpiod.line.Value(states[0]),
output_value=gpiod.line.Value(state),
)
for pin, states in pin_states.items()
for pin, state in pattern[0].pins.items()
},
) as request:
time.sleep(pattern[0].delay_after)

try:
# Send all subsequent states
for i in range(1, num_states):
time.sleep(toggle_delay)
for p in pattern[1:]:
_LOGGER.debug("Sending GPIO pattern %r", p)
request.set_values(
{
pin: gpiod.line.Value(int(pin_states[pin][i]))
for pin, states in pin_states.items()
pin: gpiod.line.Value(int(state))
for pin, state in p.pins.items()
}
)
time.sleep(p.delay_after)
finally:
# Clean up and ensure the GPIO pins are reset to inputs
request.reconfigure_lines(
{
pin: gpiod.LineSettings(direction=gpiod.line.Direction.INPUT)
for pin, states in pin_states.items()
for pin in pattern[0].pins.keys()
}
)

Expand Down Expand Up @@ -119,9 +121,7 @@ async def find_gpiochip_by_label(label: str) -> str:
return result


async def send_gpio_pattern(
chip: str, pin_states: dict[int, list[bool]], toggle_delay: float
) -> None:
async def send_gpio_pattern(chip: str, pattern: list[GpioPattern]) -> None:
await asyncio.get_running_loop().run_in_executor(
None, _send_gpio_pattern, chip, pin_states, toggle_delay
None, _send_gpio_pattern, chip, pattern
)
Loading