Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix ZGP handling in ZNP #234

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions zigpy_znp/commands/zdo.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@

import zigpy_znp.types as t


class SecurityEntry(t.FixedList, item_type=t.uint8_t, length=5):
pass

Expand Down Expand Up @@ -797,10 +796,10 @@ class ZDO(t.CommandsBase, subsystem=t.Subsystem.ZDO):
0x49,
req_schema=(
t.Param("Endpoint", t.uint8_t, "Endpoint to look for"),
# this parameter does not make sense
t.Param("Groups", t.uint16_t, "List to hold group IDs"),
),
rsp_schema=(t.Param("Groups", GroupIdList, "List of Group IDs"),),
rsp_schema=(
t.Param("Groups", GroupIdList, "List of Group IDs"),
),
)

# handle the ZDO extension find group message
Expand All @@ -811,7 +810,11 @@ class ZDO(t.CommandsBase, subsystem=t.Subsystem.ZDO):
t.Param("Endpoint", t.uint8_t, "Endpoint to look for"),
t.Param("GroupId", t.GroupId, "ID to look for group"),
),
rsp_schema=(t.Param("Group", t.Bytes, "Group information"),),
rsp_schema=(
t.Param("Status", t.Status, "Status is either Success (0) or Failure (1)"),
t.Param("GroupId", t.GroupId, "Found Group ID"),
t.Param("GroupName", t.Bytes, "Group name"),
),
)

# handle the ZDO extension add group message
Expand Down
5 changes: 3 additions & 2 deletions zigpy_znp/commands/zgp.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ class ZGP(t.CommandsBase, subsystem=t.Subsystem.ZGP):
# This message provides a mechanism for dGP stub to request security data from the
# Green Power EndPoint in the host processor
SecReq = t.CommandDef(
t.CommandType.AREQ,
t.CommandType.SREQ,
0x03,
req_schema=(
t.Param(
Expand Down Expand Up @@ -127,14 +127,15 @@ class ZGP(t.CommandsBase, subsystem=t.Subsystem.ZGP):
"Handle", t.uint8_t, "dGP stub handle to match req to confirmation"
),
),
rsp_schema=t.STATUS_SCHEMA,
)

# This message provides a mechanism for identifying and conveying a received
# GPDF to the Green Power EndPoint in the host processor
DataInd = t.CommandDef(
t.CommandType.AREQ,
0x04,
req_schema=(
rsp_schema=(
t.Param("Status", t.uint8_t, "The status code as returned by the dGP stub"),
t.Param(
"RSSI",
Expand Down
11 changes: 11 additions & 0 deletions zigpy_znp/zigbee/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,14 @@ def _bind_callbacks(self) -> None:
self.on_intentionally_unhandled_message,
)

self._znp.callback_for_response(
c.ZGP.DataInd.Callback(partial=True),
self.on_zgp_data_ind
)

def on_zgp_data_ind(self, msg: c.ZGP.DataInd.Callback) -> None:
pass

def on_intentionally_unhandled_message(self, msg: t.CommandBase) -> None:
"""
Some commands are unhandled but frequently sent by devices on the network. To
Expand Down Expand Up @@ -691,6 +699,9 @@ def _find_endpoint(self, dst_ep: int, profile: int, cluster: int) -> int:

if dst_ep == ZDO_ENDPOINT:
return ZDO_ENDPOINT

if profile == zigpy.profiles.zgp.PROFILE_ID:
return zigpy.profiles.zgp.GREENPOWER_ENDPOINT_ID

# Newer Z-Stack releases ignore profiles and will work properly with endpoint 1
if (
Expand Down
36 changes: 35 additions & 1 deletion zigpy_znp/zigbee/device.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,48 @@
from __future__ import annotations

import logging
from typing import Any, Coroutine
from zigpy.zcl.foundation import Status as ZCLStatus, Status

import zigpy.zdo
import zigpy.endpoint
import zigpy.device
import zigpy.application
from zigpy_znp.api import ZNP
import zigpy_znp.commands as c
import zigpy_znp.types as t

LOGGER = logging.getLogger(__name__)

class ZNPEndpoint(zigpy.endpoint.Endpoint):
async def add_to_group(self, grp_id: int, name: str | None = None) -> ZCLStatus:
znp: ZNP = self.device.application._znp
if name is None:
name = ""

result = await znp.request(c.ZDO.ExtFindGroup.Req(Endpoint=self.endpoint_id, GroupId=grp_id))
if result.Status == t.Status.FAILURE:
result = await znp.request(c.ZDO.ExtAddGroup.Req(Endpoint=self.endpoint_id, GroupId=grp_id, GroupName=t.CharacterString(name)))
if result.Status == t.Status.FAILURE:
return ZCLStatus.FAILURE
group = self.device.application.groups.add_group(grp_id, name)
group.add_member(self)
return ZCLStatus.SUCCESS

async def remove_from_group(self, grp_id: int) -> ZCLStatus:
znp: ZNP = self.device.application._znp
result = await znp.request(c.ZDO.ExtRemoveGroup.Req(Endpoint=self.endpoint_id, GroupId=grp_id))
if result.Status == t.Status.FAILURE:
return ZCLStatus.FAILURE
if grp_id in self.device.application.groups:
self.device.application.groups[grp_id].remove_member(self)
return ZCLStatus.SUCCESS


class ZNPCoordinator(zigpy.device.Device):
"""
Coordinator zigpy device that keeps track of our endpoints and clusters.
"""

@property
def manufacturer(self):
return "Texas Instruments"
Expand All @@ -22,6 +51,11 @@ def manufacturer(self):
def model(self):
return "Coordinator"

def add_endpoint(self, endpoint_id) -> zigpy.endpoint.Endpoint:
ep = ZNPEndpoint(self, endpoint_id)
self.endpoints[endpoint_id] = ep
return ep

def request(
self,
profile,
Expand Down
Loading