Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix command deserialization for getTokenData #619

Merged
merged 4 commits into from
Apr 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions bellows/ezsp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -379,14 +379,14 @@ async def _get_nv3_restored_eui64_key(self) -> t.NV3KeyId | None:
t.NV3KeyId.NVM3KEY_STACK_RESTORED_EUI64, # RCP firmware
):
try:
status, data = await self.getTokenData(key, 0)
rsp = await self.getTokenData(key, 0)
except (InvalidCommandError, AttributeError):
# Either the command doesn't exist in the EZSP version, or the command
# is not implemented in the firmware
return None

if status == t.EmberStatus.SUCCESS:
nv3_restored_eui64, _ = t.EUI64.deserialize(data)
if rsp.status == t.EmberStatus.SUCCESS:
nv3_restored_eui64, _ = t.EUI64.deserialize(rsp.value)
LOGGER.debug("NV3 restored EUI64: %s=%s", key, nv3_restored_eui64)

return key
Expand Down
13 changes: 8 additions & 5 deletions bellows/ezsp/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@ async def command(self, name, *args) -> Any:
LOGGER.debug("Send command %s: %s", name, args)
data = self._ezsp_frame(name, *args)
self._gw.data(data)
c = self.COMMANDS[name]
future = asyncio.Future()
self._awaiting[self._seq] = (c[0], c[2], future)
cmd_id, _, rx_schema = self.COMMANDS[name]
future = asyncio.get_running_loop().create_future()
self._awaiting[self._seq] = (cmd_id, rx_schema, future)
self._seq = (self._seq + 1) % 256

async with asyncio_timeout(EZSP_CMD_TIMEOUT):
Expand All @@ -89,7 +89,7 @@ def __call__(self, data: bytes) -> None:
sequence, frame_id, data = self._ezsp_frame_rx(data)

try:
frame_name, _, schema = self.COMMANDS_BY_ID[frame_id]
frame_name, _, rx_schema = self.COMMANDS_BY_ID[frame_id]
except KeyError:
LOGGER.warning(
"Unknown application frame 0x%04X received: %s (%s). This is a bug!",
Expand All @@ -100,7 +100,10 @@ def __call__(self, data: bytes) -> None:
return

try:
result, data = self.types.deserialize(data, schema)
if isinstance(rx_schema, tuple):
result, data = self.types.deserialize(data, rx_schema)
else:
result, data = rx_schema.deserialize(data)
except Exception:
LOGGER.warning(
"Failed to parse frame %s: %s", frame_name, binascii.hexlify(data)
Expand Down
4 changes: 3 additions & 1 deletion bellows/ezsp/v10/commands.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from bellows.ezsp.v9.commands import GetTokenDataRsp

from . import types as t

COMMANDS = {
Expand Down Expand Up @@ -691,7 +693,7 @@
"getTokenData": (
0x0102,
(t.uint32_t, t.uint32_t),
(t.EmberStatus, t.LVBytes32),
GetTokenDataRsp,
),
"setTokenData": (
0x0103,
Expand Down
12 changes: 11 additions & 1 deletion bellows/ezsp/v9/commands.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
from zigpy.types import Struct, StructField

from . import types as t


class GetTokenDataRsp(Struct):
status: t.EmberStatus
value: t.LVBytes32 = StructField(
requires=lambda rsp: rsp.status == t.EmberStatus.SUCCESS
)


COMMANDS = {
# 4. Configuration frames
"version": (0x0000, (t.uint8_t,), (t.uint8_t, t.uint8_t, t.uint16_t)),
Expand Down Expand Up @@ -687,7 +697,7 @@
"getTokenData": (
0x0102,
(t.uint32_t, t.uint32_t),
(t.EmberStatus, t.LVBytes32),
GetTokenDataRsp,
),
"setTokenData": (
0x0103,
Expand Down
8 changes: 3 additions & 5 deletions bellows/zigbee/repairs.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,13 @@ async def fix_invalid_tclk_partner_ieee(ezsp: EZSP) -> bool:
)

try:
(status, value) = await ezsp.getTokenData(
t.NV3KeyId.NVM3KEY_STACK_TRUST_CENTER, 0
)
assert status == t.EmberStatus.SUCCESS
rsp = await ezsp.getTokenData(t.NV3KeyId.NVM3KEY_STACK_TRUST_CENTER, 0)
assert rsp.status == t.EmberStatus.SUCCESS
except (InvalidCommandError, AttributeError, AssertionError):
LOGGER.warning("NV3 interface not available in this firmware, please upgrade!")
return False

token, remaining = t.NV3StackTrustCenterToken.deserialize(value)
token, remaining = t.NV3StackTrustCenterToken.deserialize(rsp.value)
assert not remaining
assert token.eui64 == state.trustCenterLongAddress

Expand Down
5 changes: 4 additions & 1 deletion tests/test_application.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import bellows.ezsp.v6.types as ezsp_t6
import bellows.ezsp.v7.types as ezsp_t7
import bellows.ezsp.v8.types as ezsp_t8
from bellows.ezsp.v9.commands import GetTokenDataRsp
import bellows.types.struct
import bellows.uart as uart
import bellows.zigbee.application
Expand Down Expand Up @@ -154,7 +155,9 @@ async def nop_mock():
ezsp_mock.readAndClearCounters = AsyncMock(side_effect=nop_mock)
ezsp_mock._protocol = AsyncMock()
ezsp_mock.setConcentrator = AsyncMock()
ezsp_mock.getTokenData = AsyncMock(return_value=[t.EmberStatus.ERR_FATAL, b""])
ezsp_mock.getTokenData = AsyncMock(
return_value=GetTokenDataRsp(status=t.EmberStatus.ERR_FATAL)
)
ezsp_mock._command = AsyncMock(return_value=t.EmberStatus.SUCCESS)
ezsp_mock.addEndpoint = AsyncMock(return_value=[t.EmberStatus.SUCCESS])
ezsp_mock.setConfigurationValue = AsyncMock(return_value=[t.EmberStatus.SUCCESS])
Expand Down
5 changes: 4 additions & 1 deletion tests/test_application_network_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from bellows.exception import EzspError
from bellows.ezsp import EZSP
from bellows.ezsp.v9.commands import GetTokenDataRsp
import bellows.types as t

from tests.async_mock import AsyncMock, PropertyMock
Expand Down Expand Up @@ -518,7 +519,9 @@ def form_network(params):

ezsp.setValue = AsyncMock(return_value=[t.EmberStatus.SUCCESS])
ezsp.setMfgToken = AsyncMock(return_value=[t.EmberStatus.SUCCESS])
ezsp.getTokenData = AsyncMock(return_value=[t.EmberStatus.LIBRARY_NOT_PRESENT, b""])
ezsp.getTokenData = AsyncMock(
return_value=GetTokenDataRsp(status=t.EmberStatus.LIBRARY_NOT_PRESENT)
)


@pytest.mark.parametrize("ezsp_ver", [4, 7, 13])
Expand Down
14 changes: 10 additions & 4 deletions tests/test_ezsp.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

from unittest.mock import ANY, AsyncMock, MagicMock, call, patch, sentinel

from bellows.ezsp.v9.commands import GetTokenDataRsp

DEVICE_CONFIG = {
zigpy.config.CONF_DEVICE_PATH: "/dev/null",
zigpy.config.CONF_DEVICE_BAUDRATE: 115200,
Expand Down Expand Up @@ -524,9 +526,9 @@ async def test_can_rewrite_custom_eui64(ezsp_f, tokens, expected_key, expected_r

def get_token_data(key, index):
if key not in tokens or index != 0:
return [t.EmberStatus.ERR_FATAL, b""]
return GetTokenDataRsp(status=t.EmberStatus.ERR_FATAL)

return [t.EmberStatus.SUCCESS, tokens[key]]
return GetTokenDataRsp(status=t.EmberStatus.SUCCESS, value=tokens[key])

ezsp_f.getTokenData = AsyncMock(side_effect=get_token_data)

Expand Down Expand Up @@ -623,7 +625,9 @@ async def test_write_custom_eui64_rcp(ezsp_f):

# RCP firmware does not support manufacturing tokens
ezsp_f.getMfgToken = AsyncMock(return_value=[b""])
ezsp_f.getTokenData = AsyncMock(return_value=[t.EmberStatus.SUCCESS, b"\xFF" * 8])
ezsp_f.getTokenData = AsyncMock(
return_value=GetTokenDataRsp(status=t.EmberStatus.SUCCESS, value=b"\xFF" * 8)
)

await ezsp_f.write_custom_eui64(new_eui64)

Expand Down Expand Up @@ -858,7 +862,9 @@ async def test_reset_custom_eui64(ezsp_f):
assert len(ezsp_f.setTokenData.mock_calls) == 0

# With NV3 interface
ezsp_f.getTokenData = AsyncMock(return_value=[t.EmberStatus.SUCCESS, b"\xAB" * 8])
ezsp_f.getTokenData = AsyncMock(
return_value=GetTokenDataRsp(status=t.EmberStatus.SUCCESS, value=b"\xAB" * 8)
)
await ezsp_f.reset_custom_eui64()
assert ezsp_f.setTokenData.mock_calls == [
call(t.NV3KeyId.CREATOR_STACK_RESTORED_EUI64, 0, t.LVBytes32(b"\xFF" * 8))
Expand Down
27 changes: 27 additions & 0 deletions tests/test_ezsp_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
from bellows.ezsp import EZSP
import bellows.ezsp.v4
import bellows.ezsp.v4.types as t
import bellows.ezsp.v9
from bellows.ezsp.v9.commands import GetTokenDataRsp
from bellows.types import NV3KeyId

from .async_mock import ANY, AsyncMock, MagicMock, call, patch

Expand All @@ -16,6 +19,12 @@ def prot_hndl():
return bellows.ezsp.v4.EZSPv4(MagicMock(), MagicMock())


@pytest.fixture
def prot_hndl_v9():
"""Protocol handler mock."""
return bellows.ezsp.v9.EZSPv9(MagicMock(), MagicMock())


async def test_command(prot_hndl):
coro = prot_hndl.command("nop")
asyncio.get_running_loop().call_soon(
Expand Down Expand Up @@ -94,3 +103,21 @@ async def test_logging_frame_parsing_failure(prot_hndl, caplog) -> None:
prot_hndl(b"\xAA\xAA\x71\x22")

assert "Failed to parse frame getKeyTableEntry: b'22'" in caplog.text


async def test_parsing_schema_response(prot_hndl_v9):
"""Test parsing data with a struct schema."""

coro = prot_hndl_v9.command(
"getTokenData", NV3KeyId.CREATOR_STACK_RESTORED_EUI64, 0
)
asyncio.get_running_loop().call_soon(
lambda: prot_hndl_v9(
bytes([prot_hndl_v9._seq - 1, 0x00, 0x00])
+ t.uint16_t(prot_hndl_v9.COMMANDS["getTokenData"][0]).serialize()
+ bytes([0xB5])
)
)

rsp = await coro
assert rsp == GetTokenDataRsp(status=t.EmberStatus.LIBRARY_NOT_PRESENT)
21 changes: 12 additions & 9 deletions tests/test_zigbee_repairs.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from bellows.exception import InvalidCommandError
from bellows.ezsp import EZSP
from bellows.ezsp.v9.commands import GetTokenDataRsp
import bellows.types as t
from bellows.zigbee import repairs

Expand Down Expand Up @@ -71,16 +72,16 @@ async def test_fix_invalid_tclk(ezsp_tclk_f: EZSP, caplog) -> None:

ezsp_tclk_f.setTokenData = AsyncMock(return_value=[t.EmberStatus.SUCCESS])
ezsp_tclk_f.getTokenData = AsyncMock(
return_value=[
t.EmberStatus.SUCCESS,
t.NV3StackTrustCenterToken(
return_value=GetTokenDataRsp(
status=t.EmberStatus.SUCCESS,
value=t.NV3StackTrustCenterToken(
mode=228,
eui64=t.EUI64.convert("BB:BB:BB:BB:BB:BB:BB:BB"),
key=t.KeyData.convert(
"21:8e:df:b8:50:a0:4a:b6:8b:c6:10:25:bc:4e:93:6a"
),
).serialize(),
]
)
)
ezsp_tclk_f.getEui64.return_value[0] = t.EUI64.convert("AA:AA:AA:AA:AA:AA:AA:AA")
ezsp_tclk_f.getCurrentSecurityState.return_value[
Expand Down Expand Up @@ -121,21 +122,23 @@ async def test_fix_invalid_tclk_all_versions(
if fw_has_token_interface:
ezsp.setTokenData = AsyncMock(return_value=[t.EmberStatus.SUCCESS])
ezsp.getTokenData = AsyncMock(
return_value=[
t.EmberStatus.SUCCESS,
t.NV3StackTrustCenterToken(
return_value=GetTokenDataRsp(
status=t.EmberStatus.SUCCESS,
value=t.NV3StackTrustCenterToken(
mode=228,
eui64=t.EUI64.convert("BB:BB:BB:BB:BB:BB:BB:BB"),
key=t.KeyData.convert(
"21:8e:df:b8:50:a0:4a:b6:8b:c6:10:25:bc:4e:93:6a"
),
).serialize(),
]
)
)

if not has_library:
ezsp.setTokenData = AsyncMock(return_value=[t.EmberStatus.LIBRARY_NOT_LOADED])
ezsp.getTokenData = AsyncMock(return_value=[t.EmberStatus.LIBRARY_NOT_LOADED])
ezsp.getTokenData = AsyncMock(
return_value=GetTokenDataRsp(status=t.EmberStatus.LIBRARY_NOT_LOADED)
)

ezsp.getEui64 = ezsp_tclk_f.getEui64
ezsp.getCurrentSecurityState = ezsp_tclk_f.getCurrentSecurityState
Expand Down
Loading