Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Read out the supported and unsupported attribute of a device at configure #101

Draft
wants to merge 10 commits into
base: dev
Choose a base branch
from
21 changes: 20 additions & 1 deletion tests/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,15 @@
_LOGGER = logging.getLogger(__name__)


def patch_cluster(cluster: zigpy.zcl.Cluster) -> None:
def patch_cluster(
cluster: zigpy.zcl.Cluster, unsupported_attr: set[str] | None = None
) -> None:
"""Patch a cluster for testing."""
cluster.PLUGGED_ATTR_READS = {}

if unsupported_attr is None:
unsupported_attr = set()

async def _read_attribute_raw(attributes: Any, *args: Any, **kwargs: Any) -> Any:
result = []
for attr_id in attributes:
Expand All @@ -44,6 +49,19 @@ async def _read_attribute_raw(attributes: Any, *args: Any, **kwargs: Any) -> Any
result.append(zcl_f.ReadAttributeRecord(attr_id, zcl_f.Status.FAILURE))
return (result,)

async def _discover_attributes(*args: Any, **kwargs: Any) -> Any:
schema = zcl_f.GENERAL_COMMANDS[
zcl_f.GeneralCommand.Discover_Attributes_rsp
].schema
records = [
zcl_f.DiscoverAttributesResponseRecord.from_dict(
{"attrid": attr.id, "datatype": 0}
)
for attr in cluster.attributes.values()
if attr.name not in unsupported_attr
]
return schema(discovery_complete=t.Bool.true, attribute_info=records)

cluster.bind = AsyncMock(return_value=[0])
cluster.configure_reporting = AsyncMock(
return_value=[
Expand All @@ -61,6 +79,7 @@ async def _read_attribute_raw(attributes: Any, *args: Any, **kwargs: Any) -> Any
cluster._write_attributes = AsyncMock(
return_value=[zcl_f.WriteAttributesResponse.deserialize(b"\x00")[0]]
)
cluster.discover_attributes = AsyncMock(side_effect=_discover_attributes)
if cluster.cluster_id == 4:
cluster.add = AsyncMock(return_value=[0])
if cluster.cluster_id == 0x1000:
Expand Down
7 changes: 6 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,7 @@ def _mock_dev(
patch_cluster: bool = True,
quirk: Optional[Callable] = None,
attributes: dict[int, dict[str, dict[str, Any]]] = None,
unsupported_attr: dict[int, set[str]] | None = None,
) -> zigpy.device.Device:
"""Make a fake device using the specified cluster classes."""
device = zigpy.device.Device(
Expand Down Expand Up @@ -464,12 +465,16 @@ def _mock_dev(
device = get_device(device)

if patch_cluster:
if unsupported_attr is None:
unsupported_attr = {}
for endpoint in (ep for epid, ep in device.endpoints.items() if epid):
endpoint.request = AsyncMock(return_value=[0])
for cluster in itertools.chain(
endpoint.in_clusters.values(), endpoint.out_clusters.values()
):
common.patch_cluster(cluster)
common.patch_cluster(
cluster, unsupported_attr.get(endpoint.endpoint_id, set())
)

if attributes is not None:
for ep_id, clusters in attributes.items():
Expand Down
24 changes: 16 additions & 8 deletions tests/test_discover.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import re
from typing import Any, Final
from unittest import mock
from unittest.mock import AsyncMock, patch
from unittest.mock import AsyncMock

import pytest
from zhaquirks.ikea import PowerConfig1CRCluster, ScenesCluster
Expand Down Expand Up @@ -116,10 +116,6 @@ async def _mock(
return _mock


@patch(
"zigpy.zcl.clusters.general.Identify.request",
new=AsyncMock(return_value=[mock.sentinel.data, zcl_f.Status.SUCCESS]),
)
@pytest.mark.parametrize("device", DEVICES)
async def test_devices(
device,
Expand All @@ -140,7 +136,9 @@ async def test_devices(

cluster_identify = _get_identify_cluster(zigpy_device)
if cluster_identify:
cluster_identify.request.reset_mock()
cluster_identify.request = AsyncMock(
return_value=[mock.sentinel.data, zcl_f.Status.SUCCESS]
)

zha_dev: Device = await device_joined(zigpy_device)
await zha_gateway.async_block_till_done()
Expand All @@ -151,14 +149,24 @@ async def test_devices(
False,
cluster_identify.commands_by_name["trigger_effect"].id,
cluster_identify.commands_by_name["trigger_effect"].schema,
manufacturer=None,
expect_reply=True,
tsn=None,
effect_id=zigpy.zcl.clusters.general.Identify.EffectIdentifier.Okay,
effect_variant=(
zigpy.zcl.clusters.general.Identify.EffectVariant.Default
),
expect_reply=True,
),
mock.call(
True,
zcl_f.GeneralCommand.Discover_Attributes,
zcl_f.GENERAL_COMMANDS[zcl_f.GeneralCommand.Discover_Attributes].schema,
manufacturer=None,
expect_reply=True,
tsn=None,
)
start_attribute_id=0,
max_attribute_ids=255,
),
]

event_cluster_handlers = {
Expand Down
6 changes: 2 additions & 4 deletions tests/test_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -798,14 +798,12 @@ async def test_unsupported_attributes_sensor(
SIG_EP_TYPE: zigpy.profiles.zha.DeviceType.ON_OFF_SWITCH,
SIG_EP_PROFILE: zigpy.profiles.zha.PROFILE_ID,
}
}
},
unsupported_attr={1: unsupported_attributes},
)
cluster = zigpy_device.endpoints[1].in_clusters[cluster_id]
if cluster_id == smartenergy.Metering.cluster_id:
# this one is mains powered
zigpy_device.node_desc.mac_capability_flags |= 0b_0000_0100
for attr in unsupported_attributes:
cluster.add_unsupported_attribute(attr)

zha_device = await device_joined(zigpy_device)

Expand Down
50 changes: 50 additions & 0 deletions zha/zigbee/cluster_handlers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,15 @@
from typing import TYPE_CHECKING, Any, Final, ParamSpec, TypedDict

import zigpy.exceptions
import zigpy.types
import zigpy.util
import zigpy.zcl
from zigpy.zcl.foundation import (
GENERAL_COMMANDS,
CommandSchema,
ConfigureReportingResponseRecord,
DiscoverAttributesResponseRecord,
GeneralCommand,
Status,
ZCLAttributeDef,
)
Expand Down Expand Up @@ -441,6 +445,7 @@ async def async_configure(self) -> None:
if ch_specific_cfg:
self.debug("Performing cluster handler specific configuration")
await ch_specific_cfg()

self.debug("finished cluster handler configuration")
else:
self.debug("skipping cluster handler configuration")
Expand All @@ -458,6 +463,10 @@ async def async_initialize(self, from_cache: bool) -> None:
uncached = [a for a, cached in self.ZCL_INIT_ATTRS.items() if not cached]
uncached.extend([cfg["attr"] for cfg in self.REPORT_CONFIG])

if not from_cache:
self.debug("discovering unsupported attributes")
await self.discover_unsupported_attributes()

if cached:
self.debug("initializing cached cluster handler attributes: %s", cached)
await self._get_attributes(
Expand Down Expand Up @@ -624,6 +633,47 @@ async def write_attributes_safe(
f"Failed to write attribute {name}={value}: {record.status}",
)

async def _discover_attributes_all(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wan't sure if this should go here in ZHA or in ZIGPY. The cache of unsupported attributes does exist in ZIGPY.

self,
) -> list[DiscoverAttributesResponseRecord] | None:
discovery_complete = zigpy.types.Bool.false
start_attribute_id = 0
attribute_info = []
cluster = self.cluster
while discovery_complete != zigpy.types.Bool.true:
rsp = await cluster.discover_attributes(
start_attribute_id=start_attribute_id, max_attribute_ids=0xFF
)
if not isinstance(
rsp, GENERAL_COMMANDS[GeneralCommand.Discover_Attributes_rsp].schema
):
self.debug(
"Ignoring attribute discovery due to unexpected default response: %r",
rsp,
)
return None

attribute_info.extend(rsp.attribute_info)
discovery_complete = rsp.discovery_complete
start_attribute_id = (
max((info.attrid for info in rsp.attribute_info), default=0) + 1
)
return attribute_info

async def discover_unsupported_attributes(self):
"""Discover the list of unsupported attributes from the device."""
attribute_info = await self._discover_attributes_all()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also i'm not fully sure what type of errors devices might respond with here. We may want to gracefully handle some things. For example, the GeneralCommand.Default_Response response is sent by some quirks here which seem utterly wrong.

if attribute_info is None:
return
attr_ids = {info.attrid for info in attribute_info}

cluster = self.cluster
for attr_id in cluster.attributes:
if attr_id in attr_ids:
cluster.remove_unsupported_attribute(attr_id)
else:
cluster.add_unsupported_attribute(attr_id)

def log(self, level, msg, *args, **kwargs) -> None:
"""Log a message."""
msg = f"[%s:%s]: {msg}"
Expand Down
Loading