From f1eeca806b5a0793c7496389f64e0fe9f27bec36 Mon Sep 17 00:00:00 2001 From: Denis Shulyaka Date: Sun, 1 Oct 2023 20:52:39 +0300 Subject: [PATCH] Channel energy scan (#149) * XBee: energy scan * convert values * add test * improve test * apply suggestions from code review --- tests/test_application.py | 29 ++++++++++++++++++++++++++ zigpy_xbee/api.py | 2 ++ zigpy_xbee/zigbee/application.py | 35 ++++++++++++++++++++++++++++++-- 3 files changed, 64 insertions(+), 2 deletions(-) diff --git a/tests/test_application.py b/tests/test_application.py index 63de630..6cf4d76 100644 --- a/tests/test_application.py +++ b/tests/test_application.py @@ -597,3 +597,32 @@ async def test_move_network_to_channel(app): assert len(app._api._queued_at.mock_calls) == 1 app._api._queued_at.assert_any_call("SC", 1 << (26 - 11)) + + +async def test_energy_scan(app): + rssi = b"\x0A\x0F\x14\x19\x1E\x23\x28\x2D\x32\x37\x3C\x41\x46\x4B\x50\x55" + app._api._at_command = mock.AsyncMock(spec=XBee._at_command, return_value=rssi) + time_s = 3 + count = 3 + energy = await app.energy_scan( + channels=[x for x in range(11, 27)], duration_exp=time_s, count=count + ) + assert app._api._at_command.mock_calls == [mock.call("ED", time_s)] * count + assert {k: round(v, 3) for k, v in energy.items()} == { + 11: 254.032, + 12: 253.153, + 13: 251.486, + 14: 248.352, + 15: 242.562, + 16: 232.193, + 17: 214.619, + 18: 187.443, + 19: 150.853, + 20: 109.797, + 21: 72.172, + 22: 43.571, + 23: 24.769, + 24: 13.56, + 25: 7.264, + 26: 3.844, + } diff --git a/zigpy_xbee/api.py b/zigpy_xbee/api.py index ea4bb4e..f4f6ffc 100644 --- a/zigpy_xbee/api.py +++ b/zigpy_xbee/api.py @@ -216,6 +216,8 @@ class ModemStatus(t.uint8_t, t.UndefinedEnum): "D7": t.uint8_t, # 0 - 7 (an Enum) "P3": t.uint8_t, # 0 - 5 (an Enum) "P4": t.uint8_t, # 0 - 5 (an Enum) + # MAC diagnostics commands + "ED": t.Bytes, # 16-byte value # I/O commands "IR": t.uint16_t, "IC": t.uint16_t, diff --git a/zigpy_xbee/zigbee/application.py b/zigpy_xbee/zigbee/application.py index 9463955..8cad835 100644 --- a/zigpy_xbee/zigbee/application.py +++ b/zigpy_xbee/zigbee/application.py @@ -2,6 +2,8 @@ import asyncio import logging +import math +import statistics import time from typing import Any @@ -181,9 +183,38 @@ async def energy_scan( self, channels: zigpy.types.Channels, duration_exp: int, count: int ) -> dict[int, float]: """Runs an energy detection scan and returns the per-channel scan results.""" + all_results = {} - LOGGER.warning("Coordinator does not support energy scanning") - return {c: 0 for c in channels} + for _ in range(count): + results = await self._api._at_command("ED", duration_exp) + results = { + channel: -int(rssi) for channel, rssi in zip(range(11, 27), results) + } + + for channel, rssi in results.items(): + all_results.setdefault(channel, []).append(rssi) + + def logistic(x: float, *, L: float = 1, x_0: float = 0, k: float = 1) -> float: + """Logistic function.""" + return L / (1 + math.exp(-k * (x - x_0))) + + def map_rssi_to_energy(rssi: int) -> float: + """Remaps RSSI (in dBm) to Energy (0-255).""" + RSSI_MAX = -5 + RSSI_MIN = -92 + return logistic( + x=rssi, + L=255, + x_0=RSSI_MIN + 0.45 * (RSSI_MAX - RSSI_MIN), + k=0.13, + ) + + energy = { + channel: map_rssi_to_energy(statistics.mean(all_rssi)) + for channel, all_rssi in all_results.items() + } + + return {channel: energy.get(channel, 0) for channel in channels} async def force_remove(self, dev): """Forcibly remove device from NCP."""