Skip to content

Commit

Permalink
Remove unnecessary unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
puddly committed Nov 15, 2023
1 parent 7a6a56d commit 7ba056e
Show file tree
Hide file tree
Showing 7 changed files with 18 additions and 204 deletions.
138 changes: 0 additions & 138 deletions tests/application/test_connect.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import asyncio
from unittest.mock import patch

import pytest
Expand Down Expand Up @@ -118,48 +117,6 @@ async def test_probe_multiple(device, make_znp_server):
assert not any([t._is_connected for t in znp_server._transports])


@pytest.mark.parametrize("device", FORMED_DEVICES)
async def test_reconnect(device, make_application):
app, znp_server = make_application(
server_cls=device,
client_config={
# Make auto-reconnection happen really fast
conf.CONF_ZNP_CONFIG: {
conf.CONF_AUTO_RECONNECT_RETRY_DELAY: 0.01,
conf.CONF_SREQ_TIMEOUT: 0.1,
}
},
shorten_delays=False,
)

# Start up the server
await app.startup(auto_form=False)
assert app._znp is not None

# Don't reply to anything for a bit
with patch.object(znp_server, "frame_received", lambda _: None):
# Now that we're connected, have the server close the connection
znp_server._uart._transport.close()

# ZNP should be closed
assert app._znp is None

# Wait for more than the SREQ_TIMEOUT to pass, we should still fail to reconnect
await asyncio.sleep(0.3)

assert not app._reconnect_task.done()
assert app._znp is None

# Our reconnect task should complete a moment after we send the ping reply
while app._znp is None:
await asyncio.sleep(0.1)

assert app._znp is not None
assert app._znp._uart is not None

await app.shutdown()


@pytest.mark.parametrize("device", FORMED_DEVICES)
async def test_shutdown_from_app(device, mocker, make_application):
app, znp_server = make_application(server_cls=device)
Expand All @@ -185,7 +142,6 @@ async def test_clean_shutdown(make_application):
await app.shutdown()

assert app._znp is None
assert app._reconnect_task.cancelled()


async def test_multiple_shutdown(make_application):
Expand All @@ -197,100 +153,6 @@ async def test_multiple_shutdown(make_application):
await app.shutdown()


@pytest.mark.parametrize("device", FORMED_DEVICES)
async def test_reconnect_lockup(device, make_application, mocker):
mocker.patch("zigpy_znp.zigbee.application.WATCHDOG_PERIOD", 0.1)

app, znp_server = make_application(
server_cls=device,
client_config={
# Make auto-reconnection happen really fast
conf.CONF_ZNP_CONFIG: {
conf.CONF_AUTO_RECONNECT_RETRY_DELAY: 0.01,
conf.CONF_SREQ_TIMEOUT: 0.1,
}
},
)

# Start up the server
await app.startup(auto_form=False)

# Stop responding
with patch.object(znp_server, "frame_received", lambda _: None):
assert app._znp is not None
assert app._reconnect_task.done()

# Wait for more than the SREQ_TIMEOUT to pass, the watchdog will notice
await asyncio.sleep(0.3)

# We will treat this as a disconnect
assert app._znp is None
assert app._watchdog_task.done()
assert not app._reconnect_task.done()

# Our reconnect task should complete after that
while app._znp is None:
await asyncio.sleep(0.1)

assert app._znp is not None
assert app._znp._uart is not None

await app.shutdown()


@pytest.mark.parametrize("device", [FormedLaunchpadCC26X2R1])
async def test_reconnect_lockup_pyserial(device, make_application, mocker):
mocker.patch("zigpy_znp.zigbee.application.WATCHDOG_PERIOD", 0.1)

app, znp_server = make_application(
server_cls=device,
client_config={
conf.CONF_ZNP_CONFIG: {
conf.CONF_AUTO_RECONNECT_RETRY_DELAY: 0.01,
conf.CONF_SREQ_TIMEOUT: 0.1,
}
},
)

# Start up the server
await app.startup(auto_form=False)

# On Linux, a connection error during read with queued writes will cause PySerial to
# swallow the exception. This makes it appear like we intentionally closed the
# connection.

# We are connected
assert app._znp is not None

did_start_network = asyncio.get_running_loop().create_future()

async def patched_start_network(old_start_network=app.start_network, **kwargs):
try:
return await old_start_network(**kwargs)
finally:
did_start_network.set_result(True)

with patch.object(app, "start_network", patched_start_network):
# "Drop" the connection like PySerial
app._znp._uart.connection_lost(exc=None)

# Wait until we are reconnecting
await did_start_network

# "Drop" the connection like PySerial again, but during connect
app._znp._uart.connection_lost(exc=None)

# We should reconnect soon
mocker.spy(app, "_watchdog_loop")

while app._watchdog_loop.call_count == 0:
await asyncio.sleep(0.1)

assert app._znp and app._znp._uart

await app.shutdown()


@pytest.mark.parametrize("device", [FormedLaunchpadCC26X2R1])
async def test_disconnect(device, make_application):
app, znp_server = make_application(
Expand Down
19 changes: 3 additions & 16 deletions tests/application/test_joining.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,12 +140,13 @@ async def test_permit_join_with_key(device, permit_result, make_application, moc
# Consciot bulb
ieee = t.EUI64.convert("EC:1B:BD:FF:FE:54:4F:40")
code = bytes.fromhex("17D1856872570CEB7ACB53030C5D6DA368B1")
link_key = t.KeyData(zigpy.util.convert_install_code(code))

bdb_add_install_code = znp_server.reply_once_to(
c.AppConfig.BDBAddInstallCode.Req(
InstallCodeFormat=c.app_config.InstallCodeFormat.KeyDerivedFromInstallCode,
IEEE=ieee,
InstallCode=t.Bytes(zigpy.util.convert_install_code(code)),
InstallCode=t.Bytes(link_key),
),
responses=[c.AppConfig.BDBAddInstallCode.Rsp(Status=t.Status.SUCCESS)],
)
Expand All @@ -171,7 +172,7 @@ async def test_permit_join_with_key(device, permit_result, make_application, moc
with contextlib.nullcontext() if permit_result is None else pytest.raises(
asyncio.TimeoutError
):
await app.permit_with_key(node=ieee, code=code, time_s=1)
await app.permit_with_link_key(node=ieee, link_key=link_key, time_s=1)

await bdb_add_install_code
await join_enable_install_code
Expand All @@ -183,20 +184,6 @@ async def test_permit_join_with_key(device, permit_result, make_application, moc
await app.shutdown()


@pytest.mark.parametrize("device", FORMED_ZSTACK3_DEVICES)
async def test_permit_join_with_invalid_key(device, make_application):
app, znp_server = make_application(server_cls=device)

# Consciot bulb
ieee = t.EUI64.convert("EC:1B:BD:FF:FE:54:4F:40")
code = bytes.fromhex("17D1856872570CEB7ACB53030C5D6DA368B1")[:-1] # truncate it

with pytest.raises(ValueError):
await app.permit_with_key(node=ieee, code=code, time_s=1)

await app.shutdown()


@pytest.mark.parametrize("device", FORMED_DEVICES)
async def test_on_zdo_device_join(device, make_application, mocker):
app, znp_server = make_application(server_cls=device)
Expand Down
11 changes: 7 additions & 4 deletions tests/application/test_startup.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,26 @@

DEV_NETWORK_SETTINGS = {
FormedLaunchpadCC26X2R1: (
"CC2652" f"Z-Stack {FormedLaunchpadCC26X2R1.code_revision}",
"CC2652",
f"Z-Stack {FormedLaunchpadCC26X2R1.code_revision}",
15,
t.Channels.from_channel_list([15]),
0x4402,
t.EUI64.convert("A2:BA:38:A8:B5:E6:83:A0"),
t.KeyData.convert("4C:4E:72:B8:41:22:51:79:9A:BF:35:25:12:88:CA:83"),
),
FormedZStack3CC2531: (
"CC2531" f"Z-Stack 3.0.x {FormedZStack3CC2531.code_revision}",
"CC2531",
f"Z-Stack 3.0.x {FormedZStack3CC2531.code_revision}",
15,
t.Channels.from_channel_list([15]),
0xB6AB,
t.EUI64.convert("62:92:32:46:3C:77:2D:B2"),
t.KeyData.convert("6D:DE:24:EA:E2:85:52:B6:DE:29:56:EB:05:85:1A:FA"),
),
FormedZStack1CC2531: (
"CC2531" f"Z-Stack Home 1.2 {FormedZStack1CC2531.code_revision}",
"CC2531",
f"Z-Stack Home 1.2 {FormedZStack1CC2531.code_revision}",
11,
t.Channels.from_channel_list([11]),
0x1A62,
Expand All @@ -50,7 +53,7 @@

# These settings were extracted from beacon requests and key exchanges in Wireshark
@pytest.mark.parametrize(
"device,model,channel,channels,pan_id,ext_pan_id,network_key",
"device,model,version,channel,channels,pan_id,ext_pan_id,network_key",
[(device_cls,) + settings for device_cls, settings in DEV_NETWORK_SETTINGS.items()],
)
async def test_info(
Expand Down
4 changes: 3 additions & 1 deletion tests/tools/test_network_backup_restore.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,9 @@ async def test_network_backup_formed(device, make_znp_server, tmp_path):
znp_server = make_znp_server(server_cls=device)

# We verified these settings with Wireshark
_, channel, channels, pan_id, ext_pan_id, network_key = DEV_NETWORK_SETTINGS[device]
_, _, channel, channels, pan_id, ext_pan_id, network_key = DEV_NETWORK_SETTINGS[
device
]

backup_file = tmp_path / "backup.json"
await network_backup([znp_server._port_path, "-o", str(backup_file)])
Expand Down
1 change: 1 addition & 0 deletions zigpy_znp/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
CONF_DEVICE,
CONF_NWK_KEY,
CONFIG_SCHEMA,
SCHEMA_DEVICE,
CONF_NWK_PAN_ID,
CONF_DEVICE_PATH,
CONF_NWK_CHANNEL,
Expand Down
7 changes: 4 additions & 3 deletions zigpy_znp/uart.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import asyncio
import logging

import zigpy.config
import zigpy.serial

import zigpy_znp.config as conf
Expand Down Expand Up @@ -161,9 +162,9 @@ def __repr__(self) -> str:
async def connect(config: conf.ConfigType, api) -> ZnpMtProtocol:
loop = asyncio.get_running_loop()

port = config[conf.CONF_DEVICE_PATH]
baudrate = config[conf.CONF_DEVICE_BAUDRATE]
flow_control = config[conf.CONF_DEVICE_FLOW_CONTROL]
port = config[zigpy.config.CONF_DEVICE_PATH]
baudrate = config[zigpy.config.CONF_DEVICE_BAUDRATE]
flow_control = config[zigpy.config.CONF_DEVICE_FLOW_CONTROL]

LOGGER.debug("Connecting to %s at %s baud", port, baudrate)

Expand Down
42 changes: 0 additions & 42 deletions zigpy_znp/zigbee/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import os
import asyncio
import logging
import itertools

import zigpy.zcl
import zigpy.zdo
Expand Down Expand Up @@ -77,19 +76,12 @@ class RetryMethod(t.bitmap8):

class ControllerApplication(zigpy.application.ControllerApplication):
SCHEMA = conf.CONFIG_SCHEMA
SCHEMA_DEVICE = conf.SCHEMA_DEVICE

def __init__(self, config: conf.ConfigType):
super().__init__(config=conf.CONFIG_SCHEMA(config))

self._znp: ZNP | None = None

# It's simpler to work with Task objects if they're never actually None
self._reconnect_task: asyncio.Future = asyncio.Future()
self._reconnect_task.cancel()

self._version_rsp = None

self._join_announce_tasks: dict[t.EUI64, asyncio.TimerHandle] = {}

##################################################################
Expand Down Expand Up @@ -690,40 +682,6 @@ async def _write_stack_settings(self) -> bool:

return any_changed

async def _reconnect(self) -> None:
"""
Endlessly tries to reconnect to the currently configured radio.
Relies on the fact that `self.startup()` only modifies `self` upon a successful
connection to be essentially stateless.
"""

for attempt in itertools.count(start=1):
LOGGER.debug(
"Trying to reconnect to %s, attempt %d",
self._config[conf.CONF_DEVICE][conf.CONF_DEVICE_PATH],
attempt,
)

try:
await self.connect()
await self.initialize()
return
except asyncio.CancelledError:
raise
except Exception as e:
LOGGER.error("Failed to reconnect", exc_info=e)

if self._znp is not None:
self._znp.close()
self._znp = None

await asyncio.sleep(
self._config[conf.CONF_ZNP_CONFIG][
conf.CONF_AUTO_RECONNECT_RETRY_DELAY
]
)

def _find_endpoint(self, dst_ep: int, profile: int, cluster: int) -> int:
"""
Zigpy defaults to sending messages with src_ep == dst_ep. This does not work
Expand Down

0 comments on commit 7ba056e

Please sign in to comment.