Skip to content

Commit

Permalink
Merge pull request #43 from Adminiuga/fixes/zdo-dev-annce
Browse files Browse the repository at this point in the history
Fix ieee address deserialization on ZDO Device_annce frame.
  • Loading branch information
Adminiuga authored Jun 12, 2019
2 parents c350485 + 5a2b8d0 commit 9cd3843
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 6 deletions.
61 changes: 57 additions & 4 deletions tests/test_application.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
import pytest

from zigpy.exceptions import DeliveryError
import zigpy.device
from zigpy.types import EUI64
import zigpy.zdo.types as zdo_t
from zigpy_deconz.api import Deconz
from zigpy_deconz.zigbee.application import ControllerApplication
from zigpy_deconz.zigbee import application
Expand All @@ -17,18 +19,28 @@ def app(database_file=None):


@pytest.fixture
def addr_ieee():
def ieee():
return EUI64.deserialize(b'\x00\x01\x02\x03\x04\x05\x06\x07')[0]


@pytest.fixture
def nwk():
return t.uint16_t(0x0100)


@pytest.fixture
def addr_ieee(ieee):
addr = t.DeconzAddress()
addr.address_mode = t.ADDRESS_MODE.IEEE
addr.address = b'\x00\x01\x02\x03\x04\x05\x06\x07'
addr.address = ieee
return addr


@pytest.fixture
def addr_nwk():
def addr_nwk(nwk):
addr = t.DeconzAddress()
addr.address_mode = t.ADDRESS_MODE.NWK
addr.address = b'\x00\x01'
addr.address = nwk
return addr


Expand Down Expand Up @@ -245,3 +257,44 @@ async def test_shutdown(app):
app._api.close = mock.MagicMock()
await app.shutdown()
assert app._api.close.call_count == 1


def test_rx_device_annce(app, addr_ieee, addr_nwk):
dst_ep = 0
cluster_id = zdo_t.ZDOCmd.Device_annce
device = mock.MagicMock()
device.status = zigpy.device.Status.NEW
app.get_device = mock.MagicMock(return_value=device)
app.deserialize = mock.MagicMock(return_value=(mock.sentinel.tsn,
mock.sentinel.cmd_id,
False,
mock.sentinel.args, ))
app.handle_join = mock.MagicMock()
app._handle_reply = mock.MagicMock()
app.handle_message = mock.MagicMock()

data = t.uint8_t(0xaa).serialize()
data += addr_nwk.address.serialize()
data += addr_ieee.address.serialize()
data += t.uint8_t(0x8e).serialize()

app.handle_rx(
addr_nwk,
mock.sentinel.src_ep,
dst_ep,
mock.sentinel.profile_id,
cluster_id,
data,
mock.sentinel.lqi,
mock.sentinel.rssi,
)

assert app.deserialize.call_count == 1
assert app.deserialize.call_args[0][2] == cluster_id
assert app.deserialize.call_args[0][3] == data
assert app._handle_reply.call_count == 0
assert app.handle_message.call_count == 1
assert app.handle_join.call_count == 1
assert app.handle_join.call_args[0][0] == addr_nwk.address
assert app.handle_join.call_args[0][1] == addr_ieee.address
assert app.handle_join.call_args[0][2] == 0
4 changes: 2 additions & 2 deletions zigpy_deconz/zigbee/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,8 @@ async def permit_ncp(self, time_s=60):
def handle_rx(self, src_addr, src_ep, dst_ep, profile_id, cluster_id, data, lqi, rssi):
# intercept ZDO device announce frames
if dst_ep == 0 and cluster_id == 0x13:
nwk, _ = t.uint16_t.deserialize(data[1:])
ieee = zigpy.types.EUI64(map(t.uint8_t, data[7::-1]))
nwk, rest = t.uint16_t.deserialize(data[1:])
ieee, _ = zigpy.types.EUI64.deserialize(rest)
LOGGER.info("New device joined: 0x%04x, %s", nwk, ieee)
self.handle_join(nwk, ieee, 0)

Expand Down

0 comments on commit 9cd3843

Please sign in to comment.