Skip to content

Commit

Permalink
Merge branch 'dev' into tuya_ts0205
Browse files Browse the repository at this point in the history
  • Loading branch information
cdalexndr authored Jul 17, 2024
2 parents fa98480 + 8317dc6 commit 986ed37
Show file tree
Hide file tree
Showing 25 changed files with 731 additions and 488 deletions.
48 changes: 38 additions & 10 deletions tests/test_ikea.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from tests.common import ClusterListener
import zhaquirks
import zhaquirks.ikea.starkvind
from zhaquirks.ikea.starkvind import IkeaAirpurifier

zhaquirks.setup()

Expand Down Expand Up @@ -85,6 +86,33 @@ def test_ikea_starkvind_v2(assert_signature_matches_quirk):
assert_signature_matches_quirk(zhaquirks.ikea.starkvind.IkeaSTARKVIND_v2, signature)


@pytest.mark.parametrize("attribute", ["fan_speed", "fan_mode"])
@pytest.mark.parametrize("value,expected", [
(0, 0), # off
(1, 1), # auto
(10, 2),
(20, 4),
(50, 10),
]
)
async def test_fan_speed_mode_update(zigpy_device_from_quirk, attribute, value, expected):
"""Test reading the fan speed and mode."""

starkvind_device = zigpy_device_from_quirk(zhaquirks.ikea.starkvind.IkeaSTARKVIND)
assert starkvind_device.model == "STARKVIND Air purifier"

ikea_cluster = starkvind_device.endpoints[1].in_clusters[
zhaquirks.ikea.starkvind.IkeaAirpurifier.cluster_id
]
ikea_listener = ClusterListener(ikea_cluster)

attr_id = getattr(IkeaAirpurifier.AttributeDefs, attribute).id

ikea_cluster.update_attribute(attr_id, value)
assert len(ikea_listener.attribute_updates) == 1
assert ikea_listener.attribute_updates[0] == (attr_id, expected)


async def test_pm25_cluster_read(zigpy_device_from_quirk):
"""Test reading from PM25 cluster."""

Expand Down Expand Up @@ -131,14 +159,14 @@ def mock_read(attributes, manufacturer=None):
@pytest.mark.parametrize(
"firmware, pct_device, pct_correct, expected_pct_updates, expect_log_warning",
(
("1.0.024", 50, 100, 1, False), # old firmware, doubling
("2.3.075", 50, 100, 1, False), # old firmware, doubling
("2.4.5", 50, 50, 2, False), # new firmware, no doubling
("3.0.0", 50, 50, 2, False), # new firmware, no doubling
("24.4.5", 50, 50, 2, False), # new firmware, no doubling
("invalid_fw_string_1", 50, 50, 2, False), # treated as new, no doubling
("invalid.fw.string.2", 50, 50, 2, True), # treated as new, no doubling + log
("", 50, 100, 1, False), # treated as old fw, doubling
("1.0.024", 50, 100, 2, False), # old firmware, doubling
("2.3.075", 50, 100, 2, False), # old firmware, doubling
("2.4.5", 50, 50, 1, False), # new firmware, no doubling
("3.0.0", 50, 50, 1, False), # new firmware, no doubling
("24.4.5", 50, 50, 1, False), # new firmware, no doubling
("invalid_fw_string_1", 50, 50, 1, False), # treated as new, no doubling
("invalid.fw.string.2", 50, 50, 1, True), # treated as new, no doubling + log
("", 50, 50, 1, False), # treated as new fw, no doubling
),
)
async def test_double_power_config_firmware(
Expand Down Expand Up @@ -178,10 +206,10 @@ def mock_read(attributes, manufacturer=None):
)

with p1 as mock_task, p2 as request_mock:
# update battery percentage with no firmware in attr cache, check pct doubled for now
# update battery percentage with no firmware in attr cache, check pct not doubled for now
power_cluster.update_attribute(battery_pct_id, pct_device)
assert len(power_listener.attribute_updates) == 1
assert power_listener.attribute_updates[0] == (battery_pct_id, pct_device * 2)
assert power_listener.attribute_updates[0] == (battery_pct_id, pct_device)

# but also check that sw_build_id read is requested in the background for next update
assert mock_task.call_count == 1
Expand Down
7 changes: 1 addition & 6 deletions tests/test_quirks.py
Original file line number Diff line number Diff line change
Expand Up @@ -641,12 +641,6 @@ def test_migrated_lighting_automation_triggers(quirk: CustomDevice) -> None:
(zhaquirks.aurora.aurora_dimmer.COLOR_DOWN, const.LEFT),
],
],
zhaquirks.ikea.fourbtnremote.IkeaTradfriRemoteV1: [
[
(const.LONG_RELEASE, const.DIM_UP),
(const.LONG_RELEASE, const.DIM_DOWN),
]
],
zhaquirks.paulmann.fourbtnremote.PaulmannRemote4Btn: [
[
(const.LONG_RELEASE, const.BUTTON_1),
Expand All @@ -666,6 +660,7 @@ def test_migrated_lighting_automation_triggers(quirk: CustomDevice) -> None:
}


# XXX: Test does not handle v2 quirks
@pytest.mark.parametrize(
"quirk",
[q for q in ALL_QUIRK_CLASSES if getattr(q, "device_automation_triggers", None)],
Expand Down
7 changes: 4 additions & 3 deletions tests/test_xiaomi.py
Original file line number Diff line number Diff line change
Expand Up @@ -1135,12 +1135,13 @@ async def test_xiaomi_e1_thermostat_schedule_settings_deserialization(
(
(zhaquirks.xiaomi.aqara.motion_ac02.LumiMotionAC02, 0),
(zhaquirks.xiaomi.aqara.motion_agl02.MotionT1, -1),
(zhaquirks.xiaomi.aqara.motion_acn001.MotionE1, -1),
),
)
async def test_xiaomi_p1_t1_motion_sensor(
zigpy_device_from_quirk, quirk, invalid_iilluminance_report
):
"""Test Aqara P1 and T1 motion sensors."""
"""Test Aqara P1, T1, and E1 motion sensors."""

device = zigpy_device_from_quirk(quirk)

Expand Down Expand Up @@ -1192,12 +1193,12 @@ async def test_xiaomi_p1_t1_motion_sensor(
opple_cluster.update_attribute(274, 0xFFFF)

# confirm invalid illuminance report is interpreted as 0 for P1 sensor,
# and -1 for the T1 sensor, as it doesn't seem like the T1 sensor sends invalid illuminance reports
# and -1 for the T1/E1 sensors, as they don't seem to send invalid illuminance reports
assert len(illuminance_listener.attribute_updates) == 2
assert illuminance_listener.attribute_updates[1][0] == zcl_iilluminance_id
assert illuminance_listener.attribute_updates[1][1] == invalid_iilluminance_report

# send illuminance report only
# send illuminance report only, parsed via Xiaomi cluster implementation
opple_cluster.update_attribute(
XIAOMI_AQARA_ATTRIBUTE_E1, create_aqara_attr_report({101: 20})
)
Expand Down
22 changes: 11 additions & 11 deletions zhaquirks/ikea/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,9 +215,9 @@ def _is_firmware_new(self):
# get sw_build_id from attribute cache if available
sw_build_id = self.endpoint.basic.get(Basic.AttributeDefs.sw_build_id.id)

# sw_build_id is not cached or empty, so we consider it old firmware for now
# sw_build_id is not cached or empty, so we consider it new firmware for now
if not sw_build_id:
return False
return True

# split sw_build_id into parts to check for new firmware
split_fw_version = sw_build_id.split(".")
Expand Down Expand Up @@ -247,19 +247,19 @@ async def _read_fw_and_update_battery_pct(self, reported_battery_pct):
# read sw_build_id from device
await self.endpoint.basic.read_attributes([Basic.AttributeDefs.sw_build_id.id])

# check if sw_build_id was read successfully and new firmware is installed
# if so, update cache with reported battery percentage (non-doubled)
if self._is_firmware_new():
# check if sw_build_id was read successfully and old firmware is installed
# if so, update cache with reported battery percentage (doubled)
if not self._is_firmware_new():
self._update_attribute(
PowerConfiguration.AttributeDefs.battery_percentage_remaining.id,
reported_battery_pct,
reported_battery_pct * 2,
)

def _update_attribute(self, attrid, value):
"""Update attribute to double battery percentage if firmware is old/unknown.
"""Update attribute to double battery percentage if firmware is old.
If the firmware version is unknown, a background task to read the firmware version is also started,
but the percentage is also doubled for now then, as that task happens asynchronously.
but the percentage is not doubled for now then, as that task happens asynchronously.
"""
if attrid == PowerConfiguration.AttributeDefs.battery_percentage_remaining.id:
# if sw_build_id is not cached, create task to read from device, since it should be awake now
Expand All @@ -269,9 +269,9 @@ def _update_attribute(self, attrid, value):
):
self.create_catching_task(self._read_fw_and_update_battery_pct(value))

# double percentage if the firmware is old or unknown
# the coroutine above will not have executed yet if the firmware is unknown,
# so we double for now in that case too, and it updates again later if our doubling was wrong
# double percentage if the firmware is confirmed old
# The coroutine above will not have executed yet if the firmware is unknown,
# so we don't double for now. The coro doubles the value later if needed.
if not self._is_firmware_new():
value = value * 2
super()._update_attribute(attrid, value)
Expand Down
Loading

0 comments on commit 986ed37

Please sign in to comment.