From a535b4d8793e8d4046828f76bec338e10176f89c Mon Sep 17 00:00:00 2001 From: Denis Shulyaka Date: Sat, 30 Sep 2023 03:08:41 +0300 Subject: [PATCH] LQI and RSSI sensors for some devices --- zigpy_xbee/zigbee/application.py | 39 ++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/zigpy_xbee/zigbee/application.py b/zigpy_xbee/zigbee/application.py index 4b680df..ddebdbf 100644 --- a/zigpy_xbee/zigbee/application.py +++ b/zigpy_xbee/zigbee/application.py @@ -44,6 +44,7 @@ class ControllerApplication(zigpy.application.ControllerApplication): def __init__(self, config: dict[str, Any]): super().__init__(config=zigpy.config.ZIGPY_SCHEMA(config)) self._api: zigpy_xbee.api.XBee | None = None + self.topology.add_listener(self) async def disconnect(self): """Shutdown application.""" @@ -341,6 +342,44 @@ def handle_rx( self.handle_message(device, profile_id, cluster_id, src_ep, dst_ep, data) + def neighbors_updated( + self, ieee: zigpy.types.EUI64, neighbors: list[zdo_t.Neighbor] + ) -> None: + """Neighbor update from Mgmt_Lqi_req.""" + for neighbor in neighbors: + if neighbor.relationship == zdo_t._NeighborEnums.Relationship.Parent: + device = self.get_device(ieee=ieee) + device.radio_details(lqi=neighbor.lqi, rssi=device.rssi) + + elif neighbor.relationship == zdo_t._NeighborEnums.Relationship.Child: + try: + child_device = self.get_device(ieee=neighbor.ieee) + child_device.radio_details(lqi=neighbor.lqi, rssi=child_device.rssi) + except KeyError: + LOGGER.warning("Unknown device %r", neighbor.ieee) + + def routes_updated( + self, ieee: zigpy.types.EUI64, routes: list[zdo_t.Route] + ) -> None: + """Route update from Mgmt_Rtg_req.""" + self.create_task( + self._routes_updated(ieee, routes), f"routes_updated-ieee={ieee}" + ) + + async def _routes_updated( + self, ieee: zigpy.types.EUI64, routes: list[zdo_t.Route] + ) -> None: + for route in routes: + if ( + route.DstNWK == self.state.node_info.nwk + and route.NextHop == self.state.node_info.nwk + and route.RouteStatus == zdo_t.RouteStatus.Active + ): + device = self.get_device(ieee=ieee) + rssi = await self._api._at_command("DB") + device.radio_details(lqi=device.lqi, rssi=-rssi) + break + class XBeeCoordinator(zigpy.quirks.CustomDevice): class XBeeGroup(zigpy.quirks.CustomCluster, Groups):