diff --git a/tests/test_niko.py b/tests/test_niko.py index 414bafe62b..7f18919f44 100644 --- a/tests/test_niko.py +++ b/tests/test_niko.py @@ -6,6 +6,7 @@ from zigpy import types as t from zigpy.zcl import foundation +from tests.common import wait_for_zigpy_tasks import zhaquirks zhaquirks.setup() @@ -43,6 +44,21 @@ def test_config_cluster(self, zigpy_device_from_v2_quirk, switch): 0x0107: foundation.ZCLAttributeAccess.from_str("rw"), } + @mock.patch("zigpy.zcl.Cluster.bind", mock.AsyncMock()) + async def test_config_cluster_preloading( + self, zigpy_device_from_v2_quirk, switch + ): + """Test whether the config cluster pre-loads all attributes.""" + device = zigpy_device_from_v2_quirk(*switch[0], **switch[1]) + cluster = device.endpoints[1].niko_config + + with mock.patch.object( + cluster, "read_attributes", mock.AsyncMock() + ) as read_attributes: + await cluster.bind() + read_attributes.assert_called_with(cluster.attributes) + await wait_for_zigpy_tasks() + def test_state_cluster(self, zigpy_device_from_v2_quirk, switch): """Test the state cluster.""" device = zigpy_device_from_v2_quirk(*switch[0], **switch[1]) @@ -64,6 +80,14 @@ def test_buttons_cluster(self, zigpy_device_from_v2_quirk, switch): 0x0002: foundation.ZCLAttributeAccess.from_str("rp"), 0x0003: foundation.ZCLAttributeAccess.from_str("rp"), 0x0004: foundation.ZCLAttributeAccess.from_str("rp"), + # Status LED on/off + 0x0011: foundation.ZCLAttributeAccess.from_str("rwp"), + 0x0013: foundation.ZCLAttributeAccess.from_str("rwp"), + # Status LED sync + 0x0021: foundation.ZCLAttributeAccess.from_str("rw"), + 0x0023: foundation.ZCLAttributeAccess.from_str("rw"), + # Status LED alert color + 0x0100: foundation.ZCLAttributeAccess.from_str("rw"), } @pytest.mark.parametrize("switch", [SWITCH_SINGLE, SWITCH_DOUBLE]) @@ -353,3 +377,217 @@ def test_events(self, zigpy_device_from_v2_quirk, switch, case): "press_type": press_type, }, ) + + @pytest.mark.parametrize("switch", [SWITCH_SINGLE, SWITCH_DOUBLE]) + class TestLedState: + """Test reading and writing status LED state.""" + + @pytest.mark.parametrize( + "case", + [ + [(0b00, t.Bool.false, t.Bool.false)], + [(0b01, t.Bool.true, t.Bool.false)], + [(0b10, t.Bool.false, t.Bool.true)], + [(0b11, t.Bool.true, t.Bool.true)], + ], + ) + async def test_read(self, zigpy_device_from_v2_quirk, switch, case): + """Test whether led state changes cause attribute changes in the buttons cluster.""" + device = zigpy_device_from_v2_quirk(*switch[0], **switch[1]) + + config_cluster = device.endpoints[1].niko_config + buttons_cluster = device.endpoints[1].buttons + + with mock.patch.object( + config_cluster.endpoint, "request", mock.AsyncMock() + ) as request: + request.return_value = (foundation.Status.SUCCESS, "done") + + for state, led1, led3 in case: + config_cluster.update_attribute(0x0105, state) + + attrs, _ = await buttons_cluster.read_attributes([0x0011, 0x0013]) + assert attrs[0x0011] == led1 + assert attrs[0x0013] == led3 + + @pytest.mark.parametrize( + "case", + [ + # LED 1 + [ + (0x0011, t.Bool.false, 0b00), + (0x0011, t.Bool.true, 0b01), + (0x0011, t.Bool.true, 0b01), + (0x0011, t.Bool.false, 0b00), + ], + # LED 3 + [ + (0x0013, t.Bool.false, 0b00), + (0x0013, t.Bool.true, 0b10), + (0x0013, t.Bool.true, 0b10), + (0x0013, t.Bool.false, 0b00), + ], + # Mixed + [ + (0x0011, t.Bool.false, 0b00), + (0x0013, t.Bool.false, 0b00), + (0x0011, t.Bool.true, 0b01), + (0x0013, t.Bool.true, 0b11), + ], + ], + ) + async def test_write(self, zigpy_device_from_v2_quirk, switch, case): + """Test writes of LED state.""" + device = zigpy_device_from_v2_quirk(*switch[0], **switch[1]) + config_cluster = device.endpoints[1].niko_config + buttons_cluster = device.endpoints[1].buttons + + with mock.patch.object( + config_cluster.endpoint, "request", mock.AsyncMock() + ) as request: + request.return_value = ([], foundation.Status.SUCCESS) + for attrid, value, expected in case: + await buttons_cluster.write_attributes({attrid: value}) + await wait_for_zigpy_tasks() + + attr = config_cluster.get(0x0105) + assert (attr.value if attr else 0) == expected + + @pytest.mark.parametrize("switch", [SWITCH_SINGLE, SWITCH_DOUBLE]) + class TestLedSync: + """Test reading and writing status LED synchronization state.""" + + @pytest.mark.parametrize( + "case", + [ + [(0x00, 0x0, 0x0)], + [(0x01, 0x1, 0x0)], + [(0x02, 0x2, 0x0)], + [(0x10, 0x0, 0x1)], + [(0x20, 0x0, 0x2)], + [(0x11, 0x1, 0x1)], + [(0x22, 0x2, 0x2)], + ], + ) + async def test_read(self, zigpy_device_from_v2_quirk, switch, case): + """Test whether led sync changes cause attribute changes in the buttons cluster.""" + device = zigpy_device_from_v2_quirk(*switch[0], **switch[1]) + + config_cluster = device.endpoints[1].niko_config + buttons_cluster = device.endpoints[1].buttons + + with mock.patch.object( + config_cluster.endpoint, "request", mock.AsyncMock() + ) as request: + request.return_value = (foundation.Status.SUCCESS, "done") + + for state, led1_sync, led3_sync in case: + config_cluster.update_attribute(0x0107, state) + + attrs, _ = await buttons_cluster.read_attributes([0x0021, 0x0023]) + assert attrs[0x0021] == led1_sync + assert attrs[0x0023] == led3_sync + + @pytest.mark.parametrize( + "case", + [ + # LED 1 + [ + (0x0021, 0x0, 0x00), + (0x0021, 0x1, 0x01), + (0x0021, 0x2, 0x02), + ], + # LED 3 + [ + (0x0023, 0x0, 0x00), + (0x0023, 0x1, 0x10), + (0x0023, 0x2, 0x20), + ], + # Mixed + [ + (0x0021, 0x0, 0x00), + (0x0023, 0x0, 0x00), + (0x0021, 0x1, 0x01), + (0x0023, 0x1, 0x11), + (0x0021, 0x2, 0x12), + (0x0023, 0x2, 0x22), + ], + ], + ) + async def test_write(self, zigpy_device_from_v2_quirk, switch, case): + """Test writes of LED sync.""" + device = zigpy_device_from_v2_quirk(*switch[0], **switch[1]) + config_cluster = device.endpoints[1].niko_config + buttons_cluster = device.endpoints[1].buttons + + with mock.patch.object( + config_cluster.endpoint, "request", mock.AsyncMock() + ) as request: + request.return_value = ([], foundation.Status.SUCCESS) + for attrid, value, expected in case: + await buttons_cluster.write_attributes({attrid: value}) + await wait_for_zigpy_tasks() + + attr = config_cluster.get(0x0107) + assert (attr.value if attr else 0) == expected + + @pytest.mark.parametrize("switch", [SWITCH_SINGLE, SWITCH_DOUBLE]) + class TestLedsAlert: + """Test reading and writing status LED alert state.""" + + @pytest.mark.parametrize( + "case", + [ + (0x000000,), + (0x0000FF,), + (0x00FF00,), + (0xFF0000,), + (0xFFFFFF,), + ], + ) + async def test_read(self, zigpy_device_from_v2_quirk, switch, case): + """Test whether LED alert changes cause attribute changes in the buttons cluster.""" + device = zigpy_device_from_v2_quirk(*switch[0], **switch[1]) + + config_cluster = device.endpoints[1].niko_config + buttons_cluster = device.endpoints[1].buttons + + with mock.patch.object( + config_cluster.endpoint, "request", mock.AsyncMock() + ) as request: + request.return_value = (foundation.Status.SUCCESS, "done") + + for led_alert in case: + config_cluster.update_attribute(0x0100, led_alert) + + attrs, _ = await buttons_cluster.read_attributes([0x0100]) + assert attrs[0x0100] == led_alert + + @pytest.mark.parametrize( + "case", + [ + (0x000000,), + (0x0000FF,), + (0x00FF00,), + (0xFF0000,), + (0xFFFFFF,), + ], + ) + async def test_write(self, zigpy_device_from_v2_quirk, switch, case): + """Test writes of LED alert color.""" + device = zigpy_device_from_v2_quirk(*switch[0], **switch[1]) + config_cluster = device.endpoints[1].niko_config + buttons_cluster = device.endpoints[1].buttons + + with mock.patch.object( + config_cluster.endpoint, "request", mock.AsyncMock() + ) as request: + request.return_value = ([], foundation.Status.SUCCESS) + for value in case: + await buttons_cluster.write_attributes({0x0100: value}) + await wait_for_zigpy_tasks() + + attrs, _ = await config_cluster.read_attributes( + [0x0100], only_cache=True + ) + assert attrs[0x0100] == value diff --git a/zhaquirks/niko/switch.py b/zhaquirks/niko/switch.py index af8204424f..8266ef05ad 100644 --- a/zhaquirks/niko/switch.py +++ b/zhaquirks/niko/switch.py @@ -44,6 +44,15 @@ def _update_attribute(self, attrid, value): attrid = self.attributes[attrid].name self.attribute_bus.listener_event(f"{attrid}_updated", value) + async def write_attributes(self, attributes, manufacturer=None, **kwargs): + """Write the attributes and informs listeners on the attributes bus.""" + result = await super().write_attributes(attributes, manufacturer, **kwargs) + for attrid, value in attributes.items(): + if isinstance(attrid, int): + attrid = self.attributes[attrid].name + self.attribute_bus.listener_event(f"{attrid}_written", value) + return result + class ButtonsDefaultAction(t.enum8): """Whether pressing buttons automatically triggers the corresponding switch.""" @@ -144,6 +153,59 @@ class AttributeDefs(BaseAttributeDefs): AttributeDefs.leds_functionality.id: LedsFunctionality.Enabled, } + async def bind(self): + """Bind cluster and pre-load attributes.""" + self.create_catching_task(self.read_attributes(self.attributes)) + return await super().bind() + + def led_1_on_written(self, value: bool): + """Process a write to the led_1_on attribute.""" + self.create_catching_task(self.write_led_on(0, value)) + + def led_3_on_written(self, value: bool): + """Process a write to the led_3_on attribute.""" + self.create_catching_task(self.write_led_on(1, value)) + + def led_1_switch_sync_written(self, value: LedSwitchSyncOptions): + """Process a write to the led_1_switch_sync attribute.""" + self.create_catching_task(self.write_led_switch_sync(0, value)) + + def led_3_switch_sync_written(self, value: LedSwitchSyncOptions): + """Process a write to the led_3_switch_sync attribute.""" + self.create_catching_task(self.write_led_switch_sync(1, value)) + + def alert_color_written(self, value: LedsAlertColors): + """Process a write to the alert_color attribute.""" + self.create_catching_task(self.write_leds_alert(value)) + + async def write_led_on(self, led: int, value: bool): + """Set the status LED to on or off using the leds_on attribute.""" + # Determine the previous state of the specific status LED + state = self.get(self.AttributeDefs.leds_on.id) or 0 + mask = 1 << led + previous = bool(state & mask) + + # If the status LED changed, update the leds_on attribute + if value != previous: + state = (state | mask) if value else state & ~mask + await self.write_attributes({self.AttributeDefs.leds_on.id: state}) + + async def write_led_switch_sync(self, led: int, value: LedSwitchSyncOptions): + """Set LED/switch synchronization using the leds_switch_sync attribute.""" + # Determine previous state of individual LED + state = self.get(self.AttributeDefs.leds_switch_sync.id) or 0 + shift = led << 2 + previous = state >> shift & 0xF + + # Update if the LED's state changed + if value != previous: + state = state & ~(0xF << shift) | (value << shift) + await self.write_attributes({self.AttributeDefs.leds_switch_sync.id: state}) + + async def write_leds_alert(self, value: LedsAlertColors): + """Write the leds_alert attribute.""" + await self.write_attributes({self.AttributeDefs.leds_alert.id: value}) + class ButtonStateReporting(t.bitmap8): """Report state changes for these buttons.""" @@ -247,6 +309,42 @@ class AttributeDefs(BaseAttributeDefs): is_manufacturer_specific=True, ) + # Status lights + led_1_on = ZCLAttributeDef( + id=0x0011, + type=t.Bool, + access="rwp", + is_manufacturer_specific=True, + ) + led_3_on = ZCLAttributeDef( + id=0x0013, + type=t.Bool, + access="rwp", + is_manufacturer_specific=True, + ) + + # Status light synchronization with switches + led_1_switch_sync = ZCLAttributeDef( + id=0x0021, + type=LedSwitchSyncOptions, + access="rw", + is_manufacturer_specific=True, + ) + led_3_switch_sync = ZCLAttributeDef( + id=0x0023, + type=LedSwitchSyncOptions, + access="rw", + is_manufacturer_specific=True, + ) + + # Status light alerts + alert_color = ZCLAttributeDef( + id=0x0100, + type=LedsAlertColors, + access="rw", + is_manufacturer_specific=True, + ) + _VALID_ATTRIBUTES = [attr.id for attr in AttributeDefs] PRESSED_ATTRIBUTES = [ @@ -255,6 +353,14 @@ class AttributeDefs(BaseAttributeDefs): AttributeDefs.button_3_pressed, AttributeDefs.button_4_pressed, ] + LED_ON_ATTRIBUTES = [ + AttributeDefs.led_1_on, + AttributeDefs.led_3_on, + ] + LED_SWITCH_SYNC_ATTRIBUTES = [ + AttributeDefs.led_1_switch_sync, + AttributeDefs.led_3_switch_sync, + ] def __init__(self, *args, **kwargs): """Initialize the cluster.""" @@ -278,6 +384,22 @@ def buttons_state_updated(self, state): {BUTTON: button, PRESS_TYPE: press}, ) + def leds_on_updated(self, value): + """Reflect leds_on in individual led_on_x attributes.""" + for i, attr in enumerate(self.LED_ON_ATTRIBUTES): + on = t.Bool(value >> i & 0b1) + self._update_attribute(attr.id, on) + + def leds_switch_sync_updated(self, value): + """Reflect leds_switch_sync in individual leds_switch_sync_x attributes.""" + for i, attr in enumerate(self.LED_SWITCH_SYNC_ATTRIBUTES): + sync = (value >> 4 * i) & 0xF + self._update_attribute(attr.id, sync) + + def leds_alert_updated(self, value: LedsAlert): + """Mirror leds_alert in the alert_color property.""" + self._update_attribute(self.AttributeDefs.alert_color.id, value) + class NikoSwitch(CustomDeviceV2): """Base class for Niko Connected Switches.""" @@ -353,6 +475,17 @@ def __init__(self, device_class): fallback_name=f"Button {b + 1}", ) + # Status LED entities + for b, key in enumerate(device_class.button_keys[::2]): + self.switch( + ButtonsCluster.LED_ON_ATTRIBUTES[b].name, + ButtonsCluster.cluster_id, + entity_type=EntityType.STANDARD, + translation_key=f"{key}_led_indicator", + fallback_name=f"Button {2 * b + 1} LED", + initially_disabled=True, + ) + # Configuration entities self.switch( NikoConfigCluster.AttributeDefs.buttons_default_action.name, @@ -362,16 +495,33 @@ def __init__(self, device_class): entity_type=EntityType.CONFIG, translation_key="switch_mode", fallback_name="Switch When Pressed", - initially_disabled=True, ) self.switch( NikoConfigCluster.AttributeDefs.leds_functionality.name, NikoConfigCluster.cluster_id, entity_type=EntityType.CONFIG, - translation_key="status_light", + translation_key="enabled_led_indicator", fallback_name="Enable LEDs", initially_disabled=True, ) + for b, key in enumerate(device_class.button_keys[::2]): + self.enum( + ButtonsCluster.LED_SWITCH_SYNC_ATTRIBUTES[b].name, + LedSwitchSyncOptions, + ButtonsCluster.cluster_id, + entity_type=EntityType.CONFIG, + translation_key=f"{key}_sync", + fallback_name=f"Button {b + 1} LED Switch Sync", + ) + self.enum( + ButtonsCluster.AttributeDefs.alert_color.name, + LedsAlertColors, + ButtonsCluster.cluster_id, + entity_type=EntityType.CONFIG, + translation_key="alert", + fallback_name="Alert", + initially_disabled=True, + ) (NikoQuirkBuilder(NikoSwitchSingle).add_to_registry())