Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

LQI and RSSI sensors for some devices #153

Merged
merged 7 commits into from
Oct 11, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 138 additions & 2 deletions tests/test_application.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,10 +182,13 @@ def test_rx_unknown_device_ieee(app):

@pytest.fixture
def device(app):
def _device(new=False, zdo_init=False, nwk=t.uint16_t(0x1234)):
def _device(
new=False, zdo_init=False, nwk=0x1234, ieee=b"\x08\x07\x06\x05\x04\x03\x02\x01"
):
from zigpy.device import Device, Status as DeviceStatus

ieee, _ = t.EUI64.deserialize(b"\x08\x07\x06\x05\x04\x03\x02\x01")
nwk = t.uint16_t(nwk)
ieee, _ = t.EUI64.deserialize(ieee)
dev = Device(app, ieee, nwk)
if new:
dev.status = DeviceStatus.NEW
Expand Down Expand Up @@ -680,3 +683,136 @@ async def test_energy_scan(app):
25: 7.264,
26: 3.844,
}


def test_neighbors_updated(app, device):
router = device(ieee=b"\x01\x02\x03\x04\x05\x06\x07\x08")
router.radio_details = mock.MagicMock()
end_device = device(ieee=b"\x08\x07\x06\x05\x04\x03\x02\x01")
end_device.radio_details = mock.MagicMock()

app.devices[router.ieee] = router
app.devices[end_device.ieee] = end_device

pan_id = t.ExtendedPanId(b"\x07\x07\x07\x07\x07\x07\x07\x07")
# The router has two neighbors: the coordinator and the end device
neighbors = [
zdo_t.Neighbor(
extended_pan_id=pan_id,
ieee=app.state.node_info.ieee,
nwk=app.state.node_info.nwk,
device_type=0x0,
rx_on_when_idle=0x1,
relationship=0x00,
reserved1=0x0,
permit_joining=0x0,
reserved2=0x0,
depth=0,
lqi=128,
),
zdo_t.Neighbor(
extended_pan_id=pan_id,
ieee=end_device.ieee,
nwk=end_device.nwk,
device_type=0x2,
rx_on_when_idle=0x0,
relationship=0x01,
reserved1=0x0,
permit_joining=0x0,
reserved2=0x0,
depth=2,
lqi=100,
),
# Let's also include an unknown device
zdo_t.Neighbor(
extended_pan_id=pan_id,
ieee=t.EUI64(b"\x00\x0F\x0E\x0D\x0C\x0B\x0A\x09"),
nwk=t.NWK(0x9999),
device_type=0x2,
rx_on_when_idle=0x0,
relationship=0x01,
reserved1=0x0,
permit_joining=0x0,
reserved2=0x0,
depth=2,
lqi=99,
),
]

app.neighbors_updated(router.ieee, neighbors)

router.radio_details.assert_called_once_with(lqi=128, rssi=None)
end_device.radio_details.assert_called_once_with(lqi=100, rssi=None)


def test_routes_updated_schedule(app):
app.create_task = mock.MagicMock()
app._routes_updated = mock.MagicMock()

ieee = t.EUI64(b"\x01\x02\x03\x04\x05\x06\x07\x08")
routes = []
app.routes_updated(ieee, routes)

assert app.create_task.call_count == 1
app._routes_updated.assert_called_once_with(ieee, routes)


async def test_routes_updated(app, device):
rssi = 0x50
app._api._at_command = mock.AsyncMock(return_value=rssi)

router1 = device(ieee=b"\x01\x02\x03\x04\x05\x06\x07\x08")
router1.radio_details = mock.MagicMock()
router2 = device(ieee=b"\x08\x07\x06\x05\x04\x03\x02\x01")
router2.radio_details = mock.MagicMock()

app.devices[router1.ieee] = router1
app.devices[router2.ieee] = router2

# Let router1 be immediate child and route2 be child of the router1.
# Then the routes of router1 would be:
routes = [
zdo_t.Route(
DstNWK=app.state.node_info.nwk,
RouteStatus=0x00,
MemoryConstrained=0x0,
ManyToOne=0x1,
RouteRecordRequired=0x0,
Reserved=0x0,
NextHop=app.state.node_info.nwk,
),
zdo_t.Route(
DstNWK=router2.nwk,
RouteStatus=0x00,
MemoryConstrained=0x0,
ManyToOne=0x0,
RouteRecordRequired=0x0,
Reserved=0x0,
NextHop=router2.nwk,
),
]

await app._routes_updated(router1.ieee, routes)

router1.radio_details.assert_called_once_with(lqi=None, rssi=-80)
assert router2.radio_details.call_count == 0

router1.radio_details.reset_mock()

routes = [
zdo_t.Route(
DstNWK=router1.nwk,
RouteStatus=0x00,
MemoryConstrained=0x0,
ManyToOne=0x0,
RouteRecordRequired=0x0,
Reserved=0x0,
NextHop=router1.nwk,
)
]
await app._routes_updated(router2.ieee, routes)

assert router1.radio_details.call_count == 0
assert router2.radio_details.call_count == 0

app._api._at_command.assert_awaited_once_with("DB")
39 changes: 39 additions & 0 deletions zigpy_xbee/zigbee/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class ControllerApplication(zigpy.application.ControllerApplication):
def __init__(self, config: dict[str, Any]):
super().__init__(config=zigpy.config.ZIGPY_SCHEMA(config))
self._api: zigpy_xbee.api.XBee | None = None
self.topology.add_listener(self)

async def disconnect(self):
"""Shutdown application."""
Expand Down Expand Up @@ -385,6 +386,44 @@ def handle_rx(

self.handle_message(device, profile_id, cluster_id, src_ep, dst_ep, data)

def neighbors_updated(
self, ieee: zigpy.types.EUI64, neighbors: list[zdo_t.Neighbor]
) -> None:
"""Neighbor update from Mgmt_Lqi_req."""
for neighbor in neighbors:
if neighbor.relationship == zdo_t._NeighborEnums.Relationship.Parent:
Shulyaka marked this conversation as resolved.
Show resolved Hide resolved
device = self.get_device(ieee=ieee)
device.radio_details(lqi=neighbor.lqi, rssi=device.rssi)

elif neighbor.relationship == zdo_t._NeighborEnums.Relationship.Child:
Shulyaka marked this conversation as resolved.
Show resolved Hide resolved
try:
child_device = self.get_device(ieee=neighbor.ieee)
child_device.radio_details(lqi=neighbor.lqi, rssi=child_device.rssi)
except KeyError:
LOGGER.warning("Unknown device %r", neighbor.ieee)

def routes_updated(
self, ieee: zigpy.types.EUI64, routes: list[zdo_t.Route]
) -> None:
"""Route update from Mgmt_Rtg_req."""
self.create_task(
self._routes_updated(ieee, routes), f"routes_updated-ieee={ieee}"
)

async def _routes_updated(
self, ieee: zigpy.types.EUI64, routes: list[zdo_t.Route]
) -> None:
for route in routes:
if (
route.DstNWK == self.state.node_info.nwk
and route.NextHop == self.state.node_info.nwk
and route.RouteStatus == zdo_t.RouteStatus.Active
):
device = self.get_device(ieee=ieee)
rssi = await self._api._at_command("DB")
device.radio_details(lqi=device.lqi, rssi=-rssi)
break


class XBeeCoordinator(zigpy.quirks.CustomDevice):
class XBeeGroup(zigpy.quirks.CustomCluster, Groups):
Expand Down