Skip to content

Add support for Tuya DPs mapped to multiple cluster attributes #3643

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Feb 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 69 additions & 0 deletions tests/test_tuya_builder.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Tests for TuyaQuirkBuilder."""

from collections.abc import ByteString
import datetime
from unittest import mock

Expand All @@ -9,13 +10,19 @@
import zigpy.types as t
from zigpy.zcl import foundation
from zigpy.zcl.clusters.general import Basic
from zigpy.zcl.clusters.homeautomation import ElectricalMeasurement

from tests.common import ClusterListener, MockDatetime, wait_for_zigpy_tasks
import zhaquirks
from zhaquirks.const import BatterySize
from zhaquirks.tuya import (
TUYA_QUERY_DATA,
TUYA_SET_TIME,
DPToAttributeMapping,
TuyaCommand,
TuyaData,
TuyaDatapointData,
TuyaLocalCluster,
TuyaPowerConfigurationCluster,
TuyaPowerConfigurationCluster2AAA,
)
Expand Down Expand Up @@ -156,6 +163,18 @@ class TestEnum(t.enum8):
class ModTuyaMCUCluster(TuyaMCUCluster):
"""Modified Cluster."""

class Tuya3PhaseElectricalMeasurement(ElectricalMeasurement, TuyaLocalCluster):
"""Tuya Electrical Measurement cluster."""

def dpToPower(data: ByteString) -> int:
return data[0]

def dpToCurrent(data: ByteString) -> int:
return data[1]

def dpToVoltage(data: ByteString) -> int:
return data[2]

entry = (
TuyaQuirkBuilder(device_mock.manufacturer, device_mock.model, registry=registry)
.tuya_battery(dp_id=1)
Expand Down Expand Up @@ -193,6 +212,27 @@ class ModTuyaMCUCluster(TuyaMCUCluster):
translation_key="test_enum",
fallback_name="Test enum",
)
.tuya_dp_multi(
dp_id=11,
attribute_mapping=[
DPToAttributeMapping(
ep_attribute=Tuya3PhaseElectricalMeasurement.ep_attribute,
attribute_name="active_power",
converter=dpToPower,
),
DPToAttributeMapping(
ep_attribute=Tuya3PhaseElectricalMeasurement.ep_attribute,
attribute_name="rms_current",
converter=dpToCurrent,
),
DPToAttributeMapping(
ep_attribute=Tuya3PhaseElectricalMeasurement.ep_attribute,
attribute_name="rms_voltage",
converter=dpToVoltage,
),
],
)
.adds(Tuya3PhaseElectricalMeasurement)
.skip_configuration()
.add_to_registry(replacement_cluster=ModTuyaMCUCluster)
)
Expand Down Expand Up @@ -251,6 +291,17 @@ class ModTuyaMCUCluster(TuyaMCUCluster):
assert tuya_listener.attribute_updates[0][0] == 0xEF0A
assert tuya_listener.attribute_updates[0][1] == TestEnum.B

electric_data = TuyaCommand(
status=0,
tsn=2,
datapoints=[TuyaDatapointData(11, TuyaData("345"))],
)
tuya_cluster.handle_get_data(electric_data)
electrical_meas_cluster = ep.electrical_measurement
assert electrical_meas_cluster.get("active_power") == "3"
assert electrical_meas_cluster.get("rms_current") == "4"
assert electrical_meas_cluster.get("rms_voltage") == "5"


async def test_tuya_quirkbuilder_duplicated_mappings(device_mock):
"""Test that mapping the same DP multiple times will raise."""
Expand All @@ -268,6 +319,24 @@ async def test_tuya_quirkbuilder_duplicated_mappings(device_mock):
.add_to_registry()
)

with pytest.raises(ValueError):
(
TuyaQuirkBuilder(
device_mock.manufacturer, device_mock.model, registry=registry
)
.tuya_battery(dp_id=1)
.tuya_dp_multi(
dp_id=1,
attribute_mapping=[
DPToAttributeMapping(
ep_attribute=ElectricalMeasurement.ep_attribute,
attribute_name="active_power",
),
],
)
.add_to_registry()
)


@pytest.mark.parametrize(
"read_attr_spell,data_query_spell",
Expand Down
92 changes: 51 additions & 41 deletions zhaquirks/tuya/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1533,35 +1533,43 @@
),
}

dp_to_attribute: dict[int, DPToAttributeMapping] = {}
dp_to_attribute: dict[int, DPToAttributeMapping | list[DPToAttributeMapping]] = {}
data_point_handlers: dict[int, str] = {}

def __init__(self, *args, **kwargs):
"""Initialize the cluster and mark attributes as valid on LocalDataClusters."""
super().__init__(*args, **kwargs)
for dp_map in self.dp_to_attribute.values():

self._dp_to_attributes: dict[int, list[DPToAttributeMapping]] = {
dp: attr if isinstance(attr, list) else [attr]
for dp, attr in self.dp_to_attribute.items()
}
for dp_map in self._dp_to_attributes.values():
# get the endpoint that is being mapped to
endpoint = self.endpoint
if dp_map.endpoint_id:
endpoint = self.endpoint.device.endpoints.get(dp_map.endpoint_id)

# the endpoint to be mapped to might not actually exist within all quirks
if not endpoint:
continue

cluster = getattr(endpoint, dp_map.ep_attribute, None)
# the cluster to be mapped to might not actually exist within all quirks
if not cluster:
continue

# mark mapped to attribute as valid if existing and if on a LocalDataCluster
attr = cluster.attributes_by_name.get(dp_map.attribute_name)
if attr and isinstance(cluster, LocalDataCluster):
# _VALID_ATTRIBUTES is only a class variable, but as want to modify it
# per instance here, we need to create an instance variable first
if "_VALID_ATTRIBUTES" not in cluster.__dict__:
cluster._VALID_ATTRIBUTES = set()
cluster._VALID_ATTRIBUTES.add(attr.id)
for mapped_attr in dp_map:
if mapped_attr.endpoint_id:
endpoint = self.endpoint.device.endpoints.get(
mapped_attr.endpoint_id
)

# the endpoint to be mapped to might not actually exist within all quirks
if not endpoint:
continue

cluster = getattr(endpoint, mapped_attr.ep_attribute, None)
# the cluster to be mapped to might not actually exist within all quirks
if not cluster:
continue

# mark mapped to attribute as valid if existing and if on a LocalDataCluster
attr = cluster.attributes_by_name.get(mapped_attr.attribute_name)
if attr and isinstance(cluster, LocalDataCluster):
# _VALID_ATTRIBUTES is only a class variable, but as want to modify it
# per instance here, we need to create an instance variable first
if "_VALID_ATTRIBUTES" not in cluster.__dict__:
cluster._VALID_ATTRIBUTES = set()
cluster._VALID_ATTRIBUTES.add(attr.id)

def handle_cluster_request(
self,
Expand Down Expand Up @@ -1640,27 +1648,29 @@
def _dp_2_attr_update(self, datapoint: TuyaDatapointData) -> None:
"""Handle data point to attribute report conversion."""
try:
dp_map = self.dp_to_attribute[datapoint.dp]
dp_map = self._dp_to_attributes[datapoint.dp]
except KeyError:
self.debug("No attribute mapping for %s data point", datapoint.dp)
return

endpoint = self.endpoint
if dp_map.endpoint_id:
endpoint = self.endpoint.device.endpoints[dp_map.endpoint_id]
cluster = getattr(endpoint, dp_map.ep_attribute)
value = datapoint.data.payload
if dp_map.converter:
value = dp_map.converter(value)

if isinstance(dp_map.attribute_name, tuple):
for k, v in zip(dp_map.attribute_name, value):
if isinstance(v, AttributeWithMask):
v = cluster.get(k, 0) & (~v.mask) | v.value
cluster.update_attribute(k, v)
else:
if isinstance(value, AttributeWithMask):
value = (
cluster.get(dp_map.attribute_name, 0) & (~value.mask) | value.value
)
cluster.update_attribute(dp_map.attribute_name, value)
for mapped_attr in dp_map:
if mapped_attr.endpoint_id:
endpoint = self.endpoint.device.endpoints[mapped_attr.endpoint_id]
cluster = getattr(endpoint, mapped_attr.ep_attribute)
value = datapoint.data.payload
if mapped_attr.converter:
value = mapped_attr.converter(value)

if isinstance(mapped_attr.attribute_name, tuple):
for k, v in zip(mapped_attr.attribute_name, value):
if isinstance(v, AttributeWithMask):
v = cluster.get(k, 0) & (~v.mask) | v.value
cluster.update_attribute(k, v)
else:
if isinstance(value, AttributeWithMask):
value = (

Check warning on line 1672 in zhaquirks/tuya/__init__.py

View check run for this annotation

Codecov / codecov/patch

zhaquirks/tuya/__init__.py#L1672

Added line #L1672 was not covered by tests
cluster.get(mapped_attr.attribute_name, 0) & (~value.mask)
| value.value
)
cluster.update_attribute(mapped_attr.attribute_name, value)
31 changes: 22 additions & 9 deletions zhaquirks/tuya/builder/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ def __init__(
) -> None:
"""Init the TuyaQuirkBuilder."""
self.tuya_data_point_handlers: dict[int, str] = {}
self.tuya_dp_to_attribute: dict[int, DPToAttributeMapping] = {}
self.tuya_dp_to_attribute: dict[int, list[DPToAttributeMapping]] = {}
self.new_attributes: set[foundation.ZCLAttributeDef] = set()
super().__init__(manufacturer, model, registry)
# quirk_file will point to the init call above if called from this QuirkBuilder,
Expand Down Expand Up @@ -503,20 +503,33 @@ def tuya_dp(
) -> QuirkBuilder: # fmt: skip
"""Add Tuya DP Converter."""

if dp_id in self.tuya_dp_to_attribute:
raise ValueError(f"DP {dp_id} is already mapped.")

self.tuya_dp_to_attribute.update(
{
dp_id: DPToAttributeMapping(
self.tuya_dp_multi(
dp_id,
[
DPToAttributeMapping(
ep_attribute,
attribute_name,
converter=converter,
dp_converter=dp_converter,
endpoint_id=endpoint_id,
)
}
],
dp_handler,
)
return self

def tuya_dp_multi(
self,
dp_id: int,
attribute_mapping: list[DPToAttributeMapping],
dp_handler: str = "_dp_2_attr_update",
) -> QuirkBuilder: # fmt: skip
"""Add Tuya DP Converter that maps to multiple attributes."""

if dp_id in self.tuya_dp_to_attribute:
raise ValueError(f"DP {dp_id} is already mapped.")

self.tuya_dp_to_attribute.update({dp_id: attribute_mapping})
self.tuya_data_point_handlers.update({dp_id: dp_handler})
return self

Expand Down Expand Up @@ -814,7 +827,7 @@ class TuyaReplacementCluster(replacement_cluster): # type: ignore[valid-type]
"""Replacement Tuya Cluster."""

data_point_handlers: dict[int, str]
dp_to_attribute: dict[int, DPToAttributeMapping]
dp_to_attribute: dict[int, list[DPToAttributeMapping]]

class AttributeDefs(NewAttributeDefs):
"""Attribute Definitions."""
Expand Down
35 changes: 18 additions & 17 deletions zhaquirks/tuya/mcu/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,25 +288,26 @@
def get_dp_mapping(
self, endpoint_id: int, attribute_name: str
) -> Optional[tuple[int, DPToAttributeMapping]]:
"""Search for the DP in dp_to_attribute."""
"""Search for the DP in _dp_to_attributes."""

result = {}
for dp, dp_mapping in self.dp_to_attribute.items():
if (
attribute_name == dp_mapping.attribute_name
or (
isinstance(dp_mapping.attribute_name, tuple)
and attribute_name in dp_mapping.attribute_name
)
) and (
(
dp_mapping.endpoint_id is None
and endpoint_id == self.endpoint.endpoint_id
)
or (endpoint_id == dp_mapping.endpoint_id)
):
self.debug("get_dp_mapping --> found DP: %s", dp)
result[dp] = dp_mapping
for dp, dp_mapping in self._dp_to_attributes.items():
for mapped_attr in dp_mapping:
if (
attribute_name == mapped_attr.attribute_name
or (
isinstance(mapped_attr.attribute_name, tuple)
and attribute_name in mapped_attr.attribute_name
)
) and (
(
mapped_attr.endpoint_id is None
and endpoint_id == self.endpoint.endpoint_id
)
or (endpoint_id == mapped_attr.endpoint_id)
):
self.debug("get_dp_mapping --> found DP: %s", dp)
result[dp] = mapped_attr
return result

def handle_mcu_version_response(self, payload: MCUVersion) -> foundation.Status:
Expand All @@ -322,7 +323,7 @@
self.debug("handle_set_time_request payload: %s", payload)
payload_rsp = TuyaTimePayload()

utc_now = datetime.datetime.utcnow() # noqa: DTZ003

Check warning on line 326 in zhaquirks/tuya/mcu/__init__.py

View workflow job for this annotation

GitHub Actions / shared-ci / Run tests Python 3.12

datetime.datetime.utcnow() is deprecated and scheduled for removal in a future version. Use timezone-aware objects to represent datetimes in UTC: datetime.datetime.now(datetime.UTC).

Check warning on line 326 in zhaquirks/tuya/mcu/__init__.py

View workflow job for this annotation

GitHub Actions / shared-ci / Run tests Python 3.12

datetime.datetime.utcnow() is deprecated and scheduled for removal in a future version. Use timezone-aware objects to represent datetimes in UTC: datetime.datetime.now(datetime.UTC).

Check warning on line 326 in zhaquirks/tuya/mcu/__init__.py

View workflow job for this annotation

GitHub Actions / shared-ci / Run tests Python 3.13

datetime.datetime.utcnow() is deprecated and scheduled for removal in a future version. Use timezone-aware objects to represent datetimes in UTC: datetime.datetime.now(datetime.UTC).
now = datetime.datetime.now()

offset_time = datetime.datetime(self.set_time_offset, 1, 1)
Expand Down
Loading