Skip to content

Commit

Permalink
Rename custom commands to XNCP and support board string overrides
Browse files Browse the repository at this point in the history
  • Loading branch information
puddly committed May 29, 2024
1 parent 5aa260d commit ef77644
Show file tree
Hide file tree
Showing 5 changed files with 223 additions and 104 deletions.
111 changes: 75 additions & 36 deletions bellows/ezsp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@

import bellows.config as conf
from bellows.exception import EzspError, InvalidCommandError
from bellows.ezsp import custom_commands
from bellows.ezsp import xncp
from bellows.ezsp.config import DEFAULT_CONFIG, RuntimeConfig, ValueConfig
from bellows.ezsp.xncp import FirmwareFeatures
import bellows.types as t
import bellows.uart

Expand Down Expand Up @@ -64,6 +65,7 @@ def __init__(self, device_config: dict):
self._callbacks = {}
self._ezsp_event = asyncio.Event()
self._ezsp_version = v4.EZSPv4.VERSION
self._xncp_features = FirmwareFeatures.NONE
self._gw = None
self._protocol = None
self._send_sem = PriorityDynamicBoundedSemaphore(value=MAX_COMMAND_CONCURRENCY)
Expand Down Expand Up @@ -177,11 +179,23 @@ async def version(self):
if ver != self.ezsp_version:
self._switch_protocol_version(ver)
await self._command("version", ver)

try:
self._xncp_features = await self.xncp_get_supported_firmware_features()
except InvalidCommandError:
self._xncp_features = xncp.FirmwareFeatures.NONE

LOGGER.debug(
"EZSP Stack Type: %s, Stack Version: %04x, Protocol version: %s",
(
"EZSP Stack Type: %s"
", Stack Version: %04x"
", Protocol version: %s"
", XNCP features: %s"
),
stack_type,
stack_version,
ver,
self._xncp_features,
)

def close(self):
Expand Down Expand Up @@ -338,24 +352,44 @@ async def get_board_info(
) -> tuple[str, str, str | None] | tuple[None, None, str | None]:
"""Return board info."""

tokens = {}
raw_tokens: dict[t.EzspMfgTokenId, list[bytes]] = {
t.EzspMfgTokenId.MFG_STRING: [],
t.EzspMfgTokenId.MFG_BOARD_NAME: [],
}

# Prefer XNCP overrides if they exist
try:
override_board, override_manuf = await self.xncp_get_board_info_overrides()
except InvalidCommandError:
pass
else:
raw_tokens[t.EzspMfgTokenId.MFG_STRING].append(override_manuf)
raw_tokens[t.EzspMfgTokenId.MFG_BOARD_NAME].append(override_board)

# If not, read manufacturing tokens
for token in (t.EzspMfgTokenId.MFG_STRING, t.EzspMfgTokenId.MFG_BOARD_NAME):
(value,) = await self.getMfgToken(token)
LOGGER.debug("Read %s token: %s", token.name, value)
raw_tokens[token].append(value)

# Tokens are fixed-length and initially filled with \xFF
result = value.rstrip(b"\xFF").split(b"\x00", 1)[0]
# Try to parse them
tokens: dict[t.EzspMfgTokenId, str] = {}

try:
result = result.decode("utf-8")
except UnicodeDecodeError:
result = "0x" + result.hex().upper()
for token_id, values in raw_tokens.items():
for value in values:
# Tokens are fixed-length and initially filled with \xFF
result = value.rstrip(b"\xFF").split(b"\x00", 1)[0]

if not result:
result = None
try:
result = result.decode("utf-8")
except UnicodeDecodeError:
result = "0x" + result.hex().upper()

tokens[token] = result
if result:
tokens[token_id] = result
break
else:
tokens[token_id] = None

(status, ver_info_bytes) = await self.getValue(
self.types.EzspValueId.VALUE_VERSION_INFO
Expand Down Expand Up @@ -638,39 +672,44 @@ async def write_config(self, config: dict) -> None:
)
continue

async def xncp_get_supported_firmware_features(
self,
) -> custom_commands.FirmwareFeatures:
"""Get supported firmware extensions."""
req = custom_commands.CustomCommand(
command_id=custom_commands.CustomCommandId.CMD_GET_SUPPORTED_FEATURES,
payload=custom_commands.GetSupportedFeaturesReq().serialize(),
)
async def send_xncp_frame(
self, payload: xncp.XncpCommandPayload
) -> xncp.XncpCommandPayload:
"""Send an XNCP frame."""
req_frame = xncp.XncpCommand.from_payload(payload)
LOGGER.debug("Sending XNCP frame: %s", req_frame)
status, data = await self.customFrame(req_frame.serialize())

try:
status, data = await self.customFrame(req.serialize())
except InvalidCommandError:
return custom_commands.FirmwareFeatures.NONE
if status != t.EmberStatus.SUCCESS:
raise InvalidCommandError("XNCP is not supported")

if not data:
return custom_commands.FirmwareFeatures.NONE
rsp_frame = xncp.XncpCommand.from_bytes(data)
LOGGER.debug("Received XNCP frame: %s", rsp_frame)

rsp, _ = custom_commands.GetSupportedFeaturesRsp.deserialize(data)
if rsp_frame.status != t.EmberStatus.SUCCESS:
raise InvalidCommandError(f"XNCP response error: {rsp_frame.status}")

return rsp_frame.payload

async def xncp_get_supported_firmware_features(self) -> xncp.FirmwareFeatures:
"""Get supported firmware extensions."""
rsp = await self.send_xncp_frame(xncp.GetSupportedFeaturesReq())
return rsp.features

async def xncp_set_manual_source_route(
self, destination: t.NWK, route: list[t.NWK]
) -> None:
"""Set a manual source route."""
req = custom_commands.CustomCommand(
command_id=custom_commands.CustomCommandId.CMD_SET_SOURCE_ROUTE,
payload=custom_commands.SetSourceRouteReq(
destination=destination, source_route=route
).serialize(),
await self.send_xncp_frame(
xncp.SetSourceRouteReq(
destination=destination,
source_route=route,
)
)

status, data = await self.customFrame(req.serialize())
if status != self.types.EmberStatus.SUCCESS:
raise EzspError(f"Failed to set source route: {status}")
async def xncp_get_board_info_overrides(self) -> tuple[str | None, str | None]:
"""Get board information overrides."""
name_rsp = await self.send_xncp_frame(xncp.GetBoardNameReq())
manuf_rsp = await self.send_xncp_frame(xncp.GetManufNameReq())

return None
return (name_rsp.board_name or None, manuf_rsp.manuf_name or None)
59 changes: 0 additions & 59 deletions bellows/ezsp/custom_commands.py

This file was deleted.

141 changes: 141 additions & 0 deletions bellows/ezsp/xncp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
"""Custom EZSP commands."""
from __future__ import annotations

import dataclasses
from typing import Callable

import zigpy.types as t

from bellows.types import EmberStatus

COMMANDS: dict[XncpCommandId, type[XncpCommandPayload]] = {}
REV_COMMANDS: dict[type[XncpCommandPayload], XncpCommandId] = {}


def register_command(command_id: XncpCommandId) -> Callable[[type], type]:
def decorator(cls: type) -> type:
COMMANDS[command_id] = cls
REV_COMMANDS[cls] = command_id
return cls

return decorator


class Bytes(bytes):
def serialize(self) -> Bytes:
return self

@classmethod
def deserialize(cls, data: bytes) -> tuple[Bytes, bytes]:
return cls(data), b""

def __repr__(self) -> str:
# Reading byte sequences like \x200\x21 is extremely annoying
# compared to \x20\x30\x21
escaped = "".join(f"\\x{b:02X}" for b in self)

return f"b'{escaped}'"

__str__ = __repr__


class XncpCommandId(t.enum16):
GET_SUPPORTED_FEATURES_REQ = 0x0000
SET_SOURCE_ROUTE_REQ = 0x0001
GET_BOARD_NAME_REQ = 0x0002
GET_MANUF_NAME_REQ = 0x0003

GET_SUPPORTED_FEATURES_RSP = GET_SUPPORTED_FEATURES_REQ | 0x8000
SET_SOURCE_ROUTE_RSP = SET_SOURCE_ROUTE_REQ | 0x8000
GET_BOARD_NAME_RSP = GET_BOARD_NAME_REQ | 0x8000
GET_MANUF_NAME_RSP = GET_MANUF_NAME_REQ | 0x8000

UNKNOWN = 0xFFFF


@dataclasses.dataclass
class XncpCommand:
command_id: XncpCommandId
status: EmberStatus
payload: XncpCommandPayload

@classmethod
def from_payload(cls, payload: XncpCommandPayload) -> XncpCommand:
return cls(
command_id=REV_COMMANDS[type(payload)],
status=EmberStatus.SUCCESS,
payload=payload,
)

@classmethod
def from_bytes(cls, data: bytes) -> XncpCommand:
command_id, data = XncpCommandId.deserialize(data)
status, data = EmberStatus.deserialize(data)
payload = COMMANDS[command_id].deserialize(data)

return cls(command_id=command_id, status=status, payload=payload)

def serialize(self) -> Bytes:
return (
self.command_id.serialize()
+ self.status.serialize()
+ self.payload.serialize()
)


class FirmwareFeatures(t.bitmap32):
NONE = 0

# The firmware passes through all group traffic, regardless of group membership
MEMBER_OF_ALL_GROUPS = 1 << 0

# Source routes can be overridden by the application
MANUAL_SOURCE_ROUTE = 1 << 1

# The firmware supports overriding the board name
BOARD_MANUF = 1 << 2


class XncpCommandPayload(t.Struct):
pass


@register_command(XncpCommandId.GET_SUPPORTED_FEATURES_REQ)
class GetSupportedFeaturesReq(XncpCommandPayload):
pass


@register_command(XncpCommandId.GET_SUPPORTED_FEATURES_RSP)
class GetSupportedFeaturesRsp(XncpCommandPayload):
features: FirmwareFeatures


@register_command(XncpCommandId.SET_SOURCE_ROUTE_REQ)
class SetSourceRouteReq(XncpCommandPayload):
destination: t.NWK
source_route: t.List[t.NWK]


@register_command(XncpCommandId.SET_SOURCE_ROUTE_RSP)
class SetSourceRouteRsp(XncpCommandPayload):
pass


@register_command(XncpCommandId.GET_BOARD_NAME_REQ)
class GetBoardNameReq(XncpCommandPayload):
pass


@register_command(XncpCommandId.GET_BOARD_NAME_RSP)
class GetBoardNameRsp(XncpCommandPayload):
board_name: Bytes


@register_command(XncpCommandId.GET_MANUF_NAME_REQ)
class GetManufNameReq(XncpCommandPayload):
pass


@register_command(XncpCommandId.GET_MANUF_NAME_RSP)
class GetManufNameRsp(XncpCommandPayload):
manuf_name: Bytes
Loading

0 comments on commit ef77644

Please sign in to comment.