Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Application-level retries #668

Draft
wants to merge 6 commits into
base: dev
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
202 changes: 115 additions & 87 deletions bellows/zigbee/application.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from __future__ import annotations

import asyncio
from collections import defaultdict, deque
import contextlib
from datetime import datetime, timezone
import logging
import os
Expand Down Expand Up @@ -43,8 +45,11 @@
from bellows.zigbee.device import EZSPEndpoint, EZSPGroupEndpoint
import bellows.zigbee.util as util

APS_ACK_TIMEOUT = 120
RETRY_DELAYS = [0.5, 1.0, 1.5]
APS_ACK_TIMEOUT = 8

ROUTE_STATUS_TIMEOUT_MAINS = 0.5
ROUTE_STATUS_TIMEOUT_BATTERY = 8

COUNTER_EZSP_BUFFERS = "EZSP_FREE_BUFFERS"
COUNTER_NWK_CONFLICTS = "nwk_conflicts"
COUNTER_RESET_REQ = "reset_requests"
Expand Down Expand Up @@ -82,7 +87,7 @@
{zigpy.config.CONF_DEVICE_BAUDRATE: 57600},
]

def __init__(self, config: dict):
def __init__(self, config: dict) -> None:
super().__init__(config)
self._ctrl_event = asyncio.Event()
self._created_device_endpoints: list[zdo_t.SimpleDescriptor] = []
Expand All @@ -95,6 +100,9 @@

self._req_lock = asyncio.Lock()
self._packet_capture_channel: int | None = None
self._request_status_handlers: defaultdict[
t.EmberNodeId, deque[asyncio.Future]
] = defaultdict(deque)

@property
def controller_event(self):
Expand Down Expand Up @@ -826,7 +834,6 @@
aps_frame.sourceEndpoint = t.uint8_t(packet.src_ep)
aps_frame.destinationEndpoint = t.uint8_t(packet.dst_ep or 0)
aps_frame.options = t.EmberApsOption.APS_OPTION_NONE
aps_frame.options |= t.EmberApsOption.APS_OPTION_RETRY

if packet.dst.addr_mode == zigpy.types.AddrMode.Group:
aps_frame.groupId = t.uint16_t(packet.dst.address)
Expand All @@ -839,104 +846,113 @@
# Source routing uses address discovery to discover routes
aps_frame.options |= t.EmberApsOption.APS_OPTION_ENABLE_ADDRESS_DISCOVERY

route_status_handler_future: asyncio.Future | None = None

async with self._limit_concurrency(priority=packet.priority):
message_tag = self.get_sequence()
pending_tag = (packet.dst.address, message_tag)
with self._pending.new(pending_tag) as req:
for attempt, retry_delay in enumerate(RETRY_DELAYS):
async with self._req_lock:
if packet.dst.addr_mode == zigpy.types.AddrMode.NWK:
if packet.extended_timeout and device is not None:
await self._ezsp.set_extended_timeout(
nwk=device.nwk,
ieee=device.ieee,
extended_timeout=True,
async with self._req_lock:
if packet.dst.addr_mode == zigpy.types.AddrMode.NWK:
if packet.extended_timeout and device is not None:
await self._ezsp.set_extended_timeout(
nwk=device.nwk,
ieee=device.ieee,
extended_timeout=True,
)

if packet.source_route is not None:
if (
FirmwareFeatures.MANUAL_SOURCE_ROUTE
in self._ezsp._xncp_features
and self.config[CONF_BELLOWS_CONFIG][
CONF_MANUAL_SOURCE_ROUTING
]
):
await self._ezsp.xncp_set_manual_source_route(
nwk=packet.dst.address,
relays=packet.source_route,
)
else:
await self._ezsp.set_source_route(
nwk=packet.dst.address,
relays=packet.source_route,
)

if packet.source_route is not None:
if (
FirmwareFeatures.MANUAL_SOURCE_ROUTE
in self._ezsp._xncp_features
and self.config[CONF_BELLOWS_CONFIG][
CONF_MANUAL_SOURCE_ROUTING
]
):
await self._ezsp.xncp_set_manual_source_route(
nwk=packet.dst.address,
relays=packet.source_route,
)
else:
await self._ezsp.set_source_route(
nwk=packet.dst.address,
relays=packet.source_route,
)

status, _ = await self._ezsp.send_unicast(
nwk=packet.dst.address,
aps_frame=aps_frame,
message_tag=message_tag,
data=packet.data.serialize(),
)
elif packet.dst.addr_mode == zigpy.types.AddrMode.Group:
status, _ = await self._ezsp.send_multicast(
aps_frame=aps_frame,
radius=packet.radius,
non_member_radius=packet.non_member_radius,
message_tag=message_tag,
data=packet.data.serialize(),
status, _ = await self._ezsp.send_unicast(
nwk=packet.dst.address,
aps_frame=aps_frame,
message_tag=message_tag,
data=packet.data.serialize(),
)

if packet.extended_timeout:
route_status_handler_future = (
asyncio.get_running_loop().create_future()
)
elif packet.dst.addr_mode == zigpy.types.AddrMode.Broadcast:
status, _ = await self._ezsp.send_broadcast(
address=packet.dst.address,
aps_frame=aps_frame,
radius=packet.radius,
message_tag=message_tag,
aps_sequence=packet.tsn,
data=packet.data.serialize(),
self._request_status_handlers[packet.dst.address].append(
route_status_handler_future
)
elif packet.dst.addr_mode == zigpy.types.AddrMode.Group:
status, _ = await self._ezsp.send_multicast(
aps_frame=aps_frame,
radius=packet.radius,
non_member_radius=packet.non_member_radius,
message_tag=message_tag,
data=packet.data.serialize(),
)
elif packet.dst.addr_mode == zigpy.types.AddrMode.Broadcast:
status, _ = await self._ezsp.send_broadcast(
address=packet.dst.address,
aps_frame=aps_frame,
radius=packet.radius,
message_tag=message_tag,
aps_sequence=packet.tsn,
data=packet.data.serialize(),
)

if status == t.sl_Status.OK:
break
elif status not in (
t.sl_Status.ZIGBEE_MAX_MESSAGE_LIMIT_REACHED,
t.sl_Status.TRANSMIT_BUSY,
t.sl_Status.ALLOCATION_FAILED,
):
try:
if status != t.sl_Status.OK:
raise zigpy.exceptions.DeliveryError(
f"Failed to enqueue message: {status!r}", status
)
else:
if attempt < len(RETRY_DELAYS):
LOGGER.debug(
"Request %s failed to enqueue, retrying in %ss: %s",
pending_tag,
retry_delay,
status,
)
await asyncio.sleep(retry_delay)
else:
raise zigpy.exceptions.DeliveryError(
(
f"Failed to enqueue message after {len(RETRY_DELAYS)}"
f" attempts: {status!r}"
),
status,
)

# Only throw a delivery exception for packets sent with NWK addressing.
# https://github.com/home-assistant/core/issues/79832
# Broadcasts/multicasts don't have ACKs or confirmations either.
if packet.dst.addr_mode != zigpy.types.AddrMode.NWK:
return
# Only throw a delivery exception for packets sent with NWK addressing.
# https://github.com/home-assistant/core/issues/79832
# Broadcasts/multicasts don't have ACKs or confirmations either.
if packet.dst.addr_mode != zigpy.types.AddrMode.NWK:
return

# Wait for `messageSentHandler` message
async with asyncio_timeout(APS_ACK_TIMEOUT):
send_status, _ = await req.result
# Wait for `messageSentHandler` message
async with asyncio_timeout(APS_ACK_TIMEOUT):
send_status, _ = await req.result

if t.sl_Status.from_ember_status(send_status) != t.sl_Status.OK:
raise zigpy.exceptions.DeliveryError(
f"Failed to deliver message: {send_status!r}", send_status
)
if t.sl_Status.from_ember_status(send_status) != t.sl_Status.OK:
raise zigpy.exceptions.DeliveryError(

Check warning on line 931 in bellows/zigbee/application.py

View check run for this annotation

Codecov / codecov/patch

bellows/zigbee/application.py#L931

Added line #L931 was not covered by tests
f"Failed to deliver message: {send_status!r}", send_status
)

# Only wait for routing status notifications for messages sent
# indirectly
if not packet.extended_timeout:
return

try:
async with asyncio_timeout(ROUTE_STATUS_TIMEOUT_BATTERY):
route_status = await route_status_handler_future
except asyncio.TimeoutError:
route_status = None

Check warning on line 944 in bellows/zigbee/application.py

View check run for this annotation

Codecov / codecov/patch

bellows/zigbee/application.py#L943-L944

Added lines #L943 - L944 were not covered by tests

if route_status is not None:
raise zigpy.exceptions.DeliveryError(

Check warning on line 947 in bellows/zigbee/application.py

View check run for this annotation

Codecov / codecov/patch

bellows/zigbee/application.py#L947

Added line #L947 was not covered by tests
f"Received a routing error: {route_status!r}", route_status
)
finally:
if route_status_handler_future is not None:
with contextlib.suppress(ValueError):
self._request_status_handlers[packet.dst.address].remove(
route_status_handler_future
)

async def permit(self, time_s: int = 60, node: t.EmberNodeId = None) -> None:
"""Permit joining."""
Expand Down Expand Up @@ -1055,5 +1071,17 @@
)
self.handle_relays(nwk=nwk, relays=relays)

handlers = self._request_status_handlers[nwk]
if not handlers:
return

handlers.popleft().set_result(None)

def handle_route_error(self, status: t.sl_Status, nwk: t.EmberNodeId) -> None:
    """Deliver a route error to the oldest request still waiting on `nwk`.

    Waiters are the futures queued per destination address in
    `self._request_status_handlers`. The first one that has not completed
    yet is removed from the queue and resolved with `status`. If nothing
    is waiting for `nwk`, the error is only logged.

    :param status: Route error status; becomes the waiter's result.
    :param nwk: Network address the route error was reported for.
    """
    LOGGER.debug("Processing route error: status=%s, nwk=%s", status, nwk)

    # `_request_status_handlers` is a `defaultdict`: indexing it would insert
    # an empty deque for every address that ever reports an error. `.get()`
    # looks the queue up without growing the mapping.
    handlers = self._request_status_handlers.get(nwk)
    if not handlers:
        return

    # A queued future may already be done (for example cancelled by the
    # waiter's timeout before the waiter removed it from the queue). Calling
    # `set_result()` on it would raise `InvalidStateError`, so drop such
    # entries and resolve the first waiter that is still pending.
    while handlers:
        future = handlers.popleft()
        if not future.done():
            future.set_result(status)
            return

Check warning on line 1087 in bellows/zigbee/application.py

View check run for this annotation

Codecov / codecov/patch

bellows/zigbee/application.py#L1087

Added line #L1087 was not covered by tests
63 changes: 20 additions & 43 deletions tests/test_application.py
Original file line number Diff line number Diff line change
Expand Up @@ -739,10 +739,7 @@ async def _test_send_packet_unicast(
packet,
*,
statuses=(bellows.types.sl_Status.OK,),
options=(
t.EmberApsOption.APS_OPTION_RETRY
| t.EmberApsOption.APS_OPTION_ENABLE_ROUTE_DISCOVERY
),
options=t.EmberApsOption.APS_OPTION_ENABLE_ROUTE_DISCOVERY,
):
def send_unicast(*args, **kwargs):
nonlocal statuses
Expand Down Expand Up @@ -841,10 +838,7 @@ async def test_send_packet_unicast_source_route(make_app, packet):
await _test_send_packet_unicast(
app,
packet,
options=(
t.EmberApsOption.APS_OPTION_RETRY
| t.EmberApsOption.APS_OPTION_ENABLE_ADDRESS_DISCOVERY
),
options=t.EmberApsOption.APS_OPTION_ENABLE_ADDRESS_DISCOVERY,
)

app._ezsp._protocol.set_source_route.assert_called_once_with(
Expand All @@ -871,10 +865,7 @@ async def test_send_packet_unicast_manual_source_route(make_app, packet):
await _test_send_packet_unicast(
app,
packet,
options=(
t.EmberApsOption.APS_OPTION_RETRY
| t.EmberApsOption.APS_OPTION_ENABLE_ADDRESS_DISCOVERY
),
options=t.EmberApsOption.APS_OPTION_ENABLE_ADDRESS_DISCOVERY,
)

app._ezsp.xncp_set_manual_source_route.assert_called_once_with(
Expand All @@ -886,6 +877,19 @@ async def test_send_packet_unicast_manual_source_route(make_app, packet):
async def test_send_packet_unicast_extended_timeout(app, ieee, packet):
app.add_device(nwk=packet.dst.address, ieee=ieee)

asyncio.get_running_loop().call_later(
0.1,
app.ezsp_callback_handler,
"incomingRouteRecordHandler",
{
"source": packet.dst.address,
"sourceEui": ieee,
"lastHopLqi": 123,
"lastHopRssi": -60,
"relayList": [0x1234],
}.values(),
)

await _test_send_packet_unicast(
app,
packet.replace(extended_timeout=True),
Expand All @@ -896,37 +900,19 @@ async def test_send_packet_unicast_extended_timeout(app, ieee, packet):
]


@patch("bellows.zigbee.application.RETRY_DELAYS", [0.01, 0.01, 0.01])
async def test_send_packet_unicast_retries_success(app, packet):
    """Drive the unicast helper with two ALLOCATION_FAILED statuses, then OK."""
    failed = bellows.types.sl_Status.ALLOCATION_FAILED
    succeeded = bellows.types.sl_Status.OK

    # Retry delays are patched to 10 ms each so the test stays fast.
    await _test_send_packet_unicast(
        app, packet, statuses=(failed, failed, succeeded)
    )


async def test_send_packet_unicast_unexpected_failure(app, packet):
    """Expect DeliveryError when the helper is fed a single ERR_FATAL status."""
    fatal_only = (t.EmberStatus.ERR_FATAL,)

    with pytest.raises(zigpy.exceptions.DeliveryError):
        await _test_send_packet_unicast(app, packet, statuses=fatal_only)


@patch("bellows.zigbee.application.RETRY_DELAYS", [0.01, 0.01, 0.01])
async def test_send_packet_unicast_retries_failure(app, packet):
with pytest.raises(zigpy.exceptions.DeliveryError):
await _test_send_packet_unicast(
app,
packet,
statuses=(
bellows.types.sl_Status.ALLOCATION_FAILED,
bellows.types.sl_Status.ALLOCATION_FAILED,
bellows.types.sl_Status.ALLOCATION_FAILED,
),
statuses=(bellows.types.sl_Status.ALLOCATION_FAILED,) * 3,
)


Expand Down Expand Up @@ -1023,10 +1009,7 @@ async def test_send_packet_broadcast(app, packet):
clusterId=packet.cluster_id,
sourceEndpoint=packet.src_ep,
destinationEndpoint=packet.dst_ep,
options=(
t.EmberApsOption.APS_OPTION_RETRY
| t.EmberApsOption.APS_OPTION_ENABLE_ROUTE_DISCOVERY
),
options=t.EmberApsOption.APS_OPTION_ENABLE_ROUTE_DISCOVERY,
groupId=0x0000,
sequence=packet.tsn,
),
Expand Down Expand Up @@ -1074,10 +1057,7 @@ async def test_send_packet_broadcast_ignored_delivery_failure(app, packet):
clusterId=packet.cluster_id,
sourceEndpoint=packet.src_ep,
destinationEndpoint=packet.dst_ep,
options=(
t.EmberApsOption.APS_OPTION_RETRY
| t.EmberApsOption.APS_OPTION_ENABLE_ROUTE_DISCOVERY
),
options=t.EmberApsOption.APS_OPTION_ENABLE_ROUTE_DISCOVERY,
groupId=0x0000,
sequence=packet.tsn,
),
Expand Down Expand Up @@ -1127,10 +1107,7 @@ async def test_send_packet_multicast(app, packet):
clusterId=packet.cluster_id,
sourceEndpoint=packet.src_ep,
destinationEndpoint=packet.dst_ep,
options=(
t.EmberApsOption.APS_OPTION_RETRY
| t.EmberApsOption.APS_OPTION_ENABLE_ROUTE_DISCOVERY
),
options=t.EmberApsOption.APS_OPTION_ENABLE_ROUTE_DISCOVERY,
groupId=0x1234,
sequence=packet.tsn,
),
Expand Down
Loading