Skip to content

Commit

Permalink
restructure, add entity APIs back and remove duplicate models
Browse files Browse the repository at this point in the history
  • Loading branch information
dmulcahey committed Oct 20, 2024
1 parent 4cb3032 commit c08566b
Show file tree
Hide file tree
Showing 64 changed files with 4,990 additions and 1,973 deletions.
64 changes: 64 additions & 0 deletions tests/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -542,3 +542,67 @@ def create_mock_zigpy_device(
cluster._attr_cache[attr_id] = value

return device


def find_entity_id(
domain: str, zha_device: Device, qualifier: Optional[str] = None
) -> Optional[str]:
"""Find the entity id under the testing.
This is used to get the entity id in order to get the state from the state
machine so that we can test state changes.
"""
entities = find_entity_ids(domain, zha_device)
if not entities:
return None
if qualifier:
for entity_id in entities:
if qualifier in entity_id:
return entity_id
return None
else:
return entities[0]


def find_entity_ids(
domain: str, zha_device: Device, omit: Optional[list[str]] = None
) -> list[str]:
"""Find the entity ids under the testing.
This is used to get the entity id in order to get the state from the state
machine so that we can test state changes.
"""
head = f"{domain}.{str(zha_device.ieee)}"

entity_ids = [
f"{entity.PLATFORM}.{entity.unique_id}"
for entity in zha_device.platform_entities.values()
]

matches = []
res = []
for entity_id in entity_ids:
if entity_id.startswith(head):
matches.append(entity_id)

if omit:
for entity_id in matches:
skip = False
for o in omit:
if o in entity_id:
skip = True
break
if not skip:
res.append(entity_id)
else:
res = matches
return res


def async_find_group_entity_id(domain: str, group: Group) -> Optional[str]:
"""Find the group entity id under test."""
entity_id = f"{domain}_zha_group_0x{group.group_id:04x}"

if entity_id in group.group_entities:
return entity_id
return None
36 changes: 29 additions & 7 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,21 @@ async def zigpy_app_controller_fixture():

# Create a fake coordinator device
dev = app.add_device(nwk=app.state.node_info.nwk, ieee=app.state.node_info.ieee)
dev.node_desc = zdo_t.NodeDescriptor()
dev.node_desc = zdo_t.NodeDescriptor(
logical_type=zdo_t.LogicalType.Coordinator,
complex_descriptor_available=0,
user_descriptor_available=0,
reserved=0,
aps_flags=0,
frequency_band=zdo_t.NodeDescriptor.FrequencyBand.Freq2400MHz,
mac_capability_flags=zdo_t.NodeDescriptor.MACCapabilityFlags.AllocateAddress,
manufacturer_code=0x1234,
maximum_buffer_size=127,
maximum_incoming_transfer_size=100,
server_mask=10752,
maximum_outgoing_transfer_size=100,
descriptor_capability_field=zdo_t.NodeDescriptor.DescriptorCapability.NONE,
)
dev.node_desc.logical_type = zdo_t.LogicalType.Coordinator
dev.manufacturer = "Coordinator Manufacturer"
dev.model = "Coordinator Model"
Expand Down Expand Up @@ -312,16 +326,24 @@ async def __aexit__(
async def connected_client_and_server(
zha_data: ZHAData,
zigpy_app_controller: ControllerApplication,
caplog: pytest.LogCaptureFixture, # pylint: disable=unused-argument
) -> AsyncGenerator[tuple[Controller, WebSocketGateway], None]:
"""Return the connected client and server fixture."""

application_controller_patch = patch(
"bellows.zigbee.application.ControllerApplication.new",
return_value=zigpy_app_controller,
)

with application_controller_patch:
with (
patch(
"bellows.zigbee.application.ControllerApplication.new",
return_value=zigpy_app_controller,
),
patch(
"bellows.zigbee.application.ControllerApplication",
return_value=zigpy_app_controller,
),
):
ws_gateway = await WebSocketGateway.async_from_config(zha_data)
await ws_gateway.async_initialize()
await ws_gateway.async_block_till_done()
await ws_gateway.async_initialize_devices_and_entities()
async with (
ws_gateway as gateway,
Controller(f"ws://localhost:{zha_data.server_config.port}") as controller,
Expand Down

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions tests/test_gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -647,7 +647,7 @@ def test_gateway_raw_device_initialized(
signature={
"manufacturer": "FakeManufacturer",
"model": "FakeModel",
"node_desc": {
"node_descriptor": {
"logical_type": LogicalType.EndDevice,
"complex_descriptor_available": 0,
"user_descriptor_available": 0,
Expand All @@ -664,9 +664,9 @@ def test_gateway_raw_device_initialized(
},
"endpoints": {
1: {
"profile_id": 260,
"device_type": zha.DeviceType.ON_OFF_SWITCH,
"input_clusters": [0],
"profile_id": "0x0104",
"device_type": "0x0000",
"input_clusters": ["0x0000"],
"output_clusters": [],
}
},
Expand Down
4 changes: 2 additions & 2 deletions tests/test_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def test_ser_deser_zha_event():

assert zha_event.model_dump() == {
"message_type": "event",
"event_type": "zha_event",
"event_type": "device_event",
"event": "zha_event",
"device_ieee": "00:00:00:00:00:00:00:00",
"unique_id": "00:00:00:00:00:00:00:00",
Expand All @@ -35,7 +35,7 @@ def test_ser_deser_zha_event():

assert (
zha_event.model_dump_json()
== '{"message_type":"event","event_type":"zha_event","event":"zha_event",'
== '{"message_type":"event","event_type":"device_event","event":"zha_event",'
'"device_ieee":"00:00:00:00:00:00:00:00","unique_id":"00:00:00:00:00:00:00:00","data":{"key":"value"}}'
)

Expand Down
1 change: 1 addition & 0 deletions tests/websocket/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Websocket tests modules."""
124 changes: 124 additions & 0 deletions tests/websocket/test_binary_sensor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
"""Test zhaws binary sensor."""

from collections.abc import Awaitable, Callable
from typing import Optional

import pytest
import zigpy.profiles.zha
from zigpy.zcl.clusters import general, measurement, security

from zha.application.discovery import Platform
from zha.application.platforms.model import BasePlatformEntity, BinarySensorEntity
from zha.websocket.client.controller import Controller
from zha.websocket.client.proxy import DeviceProxy
from zha.websocket.server.gateway import WebSocketGateway as Server

from ..common import (
SIG_EP_INPUT,
SIG_EP_OUTPUT,
SIG_EP_PROFILE,
SIG_EP_TYPE,
create_mock_zigpy_device,
join_zigpy_device,
send_attributes_report,
update_attribute_cache,
)


def find_entity(
device_proxy: DeviceProxy, platform: Platform
) -> Optional[BasePlatformEntity]:
"""Find an entity for the specified platform on the given device."""
for entity in device_proxy.device_model.entities.values():
if entity.platform == platform:
return entity
return None


DEVICE_IAS = {
1: {
SIG_EP_PROFILE: zigpy.profiles.zha.PROFILE_ID,
SIG_EP_TYPE: zigpy.profiles.zha.DeviceType.IAS_ZONE,
SIG_EP_INPUT: [security.IasZone.cluster_id],
SIG_EP_OUTPUT: [],
}
}


DEVICE_OCCUPANCY = {
1: {
SIG_EP_PROFILE: zigpy.profiles.zha.PROFILE_ID,
SIG_EP_TYPE: zigpy.profiles.zha.DeviceType.OCCUPANCY_SENSOR,
SIG_EP_INPUT: [measurement.OccupancySensing.cluster_id],
SIG_EP_OUTPUT: [],
}
}


async def async_test_binary_sensor_on_off(
server: Server, cluster: general.OnOff, entity: BinarySensorEntity
) -> None:
"""Test getting on and off messages for binary sensors."""
# binary sensor on
await send_attributes_report(server, cluster, {1: 0, 0: 1, 2: 2})
assert entity.state.state is True

# binary sensor off
await send_attributes_report(server, cluster, {1: 1, 0: 0, 2: 2})
assert entity.state.state is False


async def async_test_iaszone_on_off(
server: Server, cluster: security.IasZone, entity: BinarySensorEntity
) -> None:
"""Test getting on and off messages for iaszone binary sensors."""
# binary sensor on
cluster.listener_event("cluster_command", 1, 0, [1])
await server.async_block_till_done()
assert entity.state.state is True

# binary sensor off
cluster.listener_event("cluster_command", 1, 0, [0])
await server.async_block_till_done()
assert entity.state.state is False


@pytest.mark.parametrize(
"device, on_off_test, cluster_name, reporting",
[
(DEVICE_IAS, async_test_iaszone_on_off, "ias_zone", (0,)),
(DEVICE_OCCUPANCY, async_test_binary_sensor_on_off, "occupancy", (1,)),
],
)
async def test_binary_sensor(
connected_client_and_server: tuple[Controller, Server],
device: dict,
on_off_test: Callable[..., Awaitable[None]],
cluster_name: str,
reporting: tuple,
) -> None:
"""Test ZHA binary_sensor platform."""
controller, server = connected_client_and_server
zigpy_device = create_mock_zigpy_device(server, device)
zhaws_device = await join_zigpy_device(server, zigpy_device)

await server.async_block_till_done()

client_device: Optional[DeviceProxy] = controller.devices.get(zhaws_device.ieee)
assert client_device is not None
entity: BinarySensorEntity = find_entity(client_device, Platform.BINARY_SENSOR) # type: ignore
assert entity is not None
assert isinstance(entity, BinarySensorEntity)
assert entity.state.state is False

# test getting messages that trigger and reset the sensors
cluster = getattr(zigpy_device.endpoints[1], cluster_name)
await on_off_test(server, cluster, entity)

# test refresh
if cluster_name == "ias_zone":
cluster.PLUGGED_ATTR_READS = {"zone_status": 0}
update_attribute_cache(cluster)
await controller.entities.refresh_state(entity)
await server.async_block_till_done()
assert entity.state.state is False
76 changes: 76 additions & 0 deletions tests/websocket/test_button.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
"""Test ZHA button."""

from typing import Optional
from unittest.mock import patch

from zigpy.const import SIG_EP_PROFILE
from zigpy.profiles import zha
from zigpy.zcl.clusters import general, security
import zigpy.zcl.foundation as zcl_f

from zha.application.discovery import Platform
from zha.application.platforms.model import BasePlatformEntity, ButtonEntity
from zha.websocket.client.controller import Controller
from zha.websocket.client.proxy import DeviceProxy
from zha.websocket.server.gateway import WebSocketGateway as Server

from ..common import (
SIG_EP_INPUT,
SIG_EP_OUTPUT,
SIG_EP_TYPE,
create_mock_zigpy_device,
join_zigpy_device,
mock_coro,
)


def find_entity(
device_proxy: DeviceProxy, platform: Platform
) -> Optional[BasePlatformEntity]:
"""Find an entity for the specified platform on the given device."""
for entity in device_proxy.device_model.entities.values():
if entity.platform == platform:
return entity
return None


async def test_button(
connected_client_and_server: tuple[Controller, Server],
) -> None:
"""Test zha button platform."""
controller, server = connected_client_and_server
zigpy_device = create_mock_zigpy_device(
server,
{
1: {
SIG_EP_INPUT: [
general.Basic.cluster_id,
general.Identify.cluster_id,
security.IasZone.cluster_id,
],
SIG_EP_OUTPUT: [],
SIG_EP_TYPE: zha.DeviceType.IAS_ZONE,
SIG_EP_PROFILE: zha.PROFILE_ID,
}
},
)
zhaws_device = await join_zigpy_device(server, zigpy_device)
cluster = zigpy_device.endpoints[1].identify

assert cluster is not None
client_device: Optional[DeviceProxy] = controller.devices.get(zhaws_device.ieee)
assert client_device is not None
entity: ButtonEntity = find_entity(client_device, Platform.BUTTON) # type: ignore
assert entity is not None
assert isinstance(entity, ButtonEntity)

with patch(
"zigpy.zcl.Cluster.request",
return_value=mock_coro([0x00, zcl_f.Status.SUCCESS]),
):
await controller.buttons.press(entity)
await server.async_block_till_done()
assert len(cluster.request.mock_calls) == 1
assert cluster.request.call_args[0][0] is False
assert cluster.request.call_args[0][1] == 0
assert cluster.request.call_args[0][3] == 5 # duration in seconds
Loading

0 comments on commit c08566b

Please sign in to comment.