Skip to content

Commit

Permalink
Adjust to load new format
Browse files Browse the repository at this point in the history
  • Loading branch information
puddly committed Oct 31, 2024
1 parent f51d1ab commit 664951c
Showing 1 changed file with 90 additions and 104 deletions.
194 changes: 90 additions & 104 deletions tests/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,127 +314,113 @@ def zigpy_device_from_device_data(
quirk: Optional[Callable] = None,
) -> zigpy.device.Device:
"""Make a fake device using the specified cluster classes."""
ieee = zigpy.types.EUI64.convert(device_data["ieee"])
nwk = device_data["nwk"]
manufacturer = device_data["manufacturer"]
model = device_data["model"]
node_descriptor = device_data["signature"]["node_descriptor"]
endpoints = device_data["signature"]["endpoints"]
cluster_data = device_data["cluster_details"]

device = zigpy.device.Device(app, ieee, nwk)
device.manufacturer = manufacturer
device.model = model

device = zigpy.device.Device(
application=app,
ieee=zigpy.types.EUI64.convert(device_data["ieee"]),
nwk=device_data["nwk"],
)
device.manufacturer = device_data["manufacturer"]
device.model = device_data["model"]
device.last_seen = datetime.fromisoformat(device_data["last_seen"])
device.lqi = int(device_data["lqi"]) if device_data["lqi"] is not None else None
device.rssi = int(device_data["rssi"]) if device_data["rssi"] is not None else None

node_desc = device_data["node_descriptor"]
device.node_desc = zdo_t.NodeDescriptor(
logical_type=node_descriptor["logical_type"],
complex_descriptor_available=node_descriptor["complex_descriptor_available"],
user_descriptor_available=node_descriptor["user_descriptor_available"],
reserved=node_descriptor["reserved"],
aps_flags=node_descriptor["aps_flags"],
frequency_band=node_descriptor["frequency_band"],
mac_capability_flags=node_descriptor["mac_capability_flags"],
manufacturer_code=node_descriptor["manufacturer_code"],
maximum_buffer_size=node_descriptor["maximum_buffer_size"],
maximum_incoming_transfer_size=node_descriptor[
"maximum_incoming_transfer_size"
],
server_mask=node_descriptor["server_mask"],
maximum_outgoing_transfer_size=node_descriptor[
"maximum_outgoing_transfer_size"
],
descriptor_capability_field=node_descriptor["descriptor_capability_field"],
logical_type=node_desc["logical_type"],
complex_descriptor_available=node_desc["complex_descriptor_available"],
user_descriptor_available=node_desc["user_descriptor_available"],
reserved=node_desc["reserved"],
aps_flags=node_desc["aps_flags"],
frequency_band=node_desc["frequency_band"],
mac_capability_flags=node_desc["mac_capability_flags"],
manufacturer_code=node_desc["manufacturer_code"],
maximum_buffer_size=node_desc["maximum_buffer_size"],
maximum_incoming_transfer_size=node_desc["maximum_incoming_transfer_size"],
server_mask=node_desc["server_mask"],
maximum_outgoing_transfer_size=node_desc["maximum_outgoing_transfer_size"],
descriptor_capability_field=node_desc["descriptor_capability_field"],
)

orig_endpoints = (
device_data["original_signature"]["endpoints"]
if "original_signature" in device_data
else endpoints
)
for epid, ep in orig_endpoints.items():
endpoint = device.add_endpoint(int(epid))
profile = None
with suppress(Exception):
profile = zigpy.profiles.PROFILES[int(ep["profile_id"], 16)]

endpoint.device_type = (
profile.DeviceType(int(ep["device_type"], 16))
if profile
else int(ep["device_type"], 16)
)
endpoint.profile_id = (
profile.PROFILE_ID if profile else int(ep["profile_id"], 16)
)
endpoint.request = AsyncMock(return_value=[0])
if device_data.get("original_signature", {}):
for epid, ep in device_data["original_signature"]["endpoints"].items():
endpoint = device.add_endpoint(int(epid))
profile = None
with suppress(Exception):
profile = zigpy.profiles.PROFILES[int(ep["profile_id"], 16)]

endpoint.profile_id = (
profile.PROFILE_ID if profile is not None else int(ep["profile_id"], 16)
)

for cluster_id in ep["input_clusters"]:
endpoint.add_input_cluster(int(cluster_id, 16))
endpoint.device_type = (
profile.DeviceType(int(ep["device_type"], 16))
if profile is not None
else int(ep["device_type"], 16)
)

for cluster_id in ep["output_clusters"]:
endpoint.add_output_cluster(int(cluster_id, 16))
for cluster_id in ep["input_clusters"]:
endpoint.add_input_cluster(int(cluster_id, 16))

for cluster_id in ep["output_clusters"]:
endpoint.add_output_cluster(int(cluster_id, 16))
else:
for epid, ep in device_data["endpoints"].items():
endpoint = device.add_endpoint(int(epid))
profile = None
with suppress(Exception):
profile = zigpy.profiles.PROFILES[ep["profile_id"]]

endpoint.profile_id = (
profile.PROFILE_ID if profile is not None else ep["profile_id"]
)

endpoint.device_type = (
profile.DeviceType(ep["device_type"]["id"])
if profile is not None
else ep["device_type"]["id"]
)

for cluster in ep["in_clusters"]:
endpoint.add_input_cluster(int(cluster["cluster_id"], 16))

for cluster in ep["out_clusters"]:
endpoint.add_output_cluster(int(cluster["cluster_id"], 16))

if quirk:
device = quirk(app, device.ieee, device.nwk, device)
else:
device = quirks_get_device(device)

for epid, ep in cluster_data.items():
for epid, ep in device_data["endpoints"].items():
endpoint = device.endpoints[int(epid)]
endpoint.request = AsyncMock(return_value=[0])
for cluster_id, cluster in ep["in_clusters"].items():
real_cluster = device.endpoints[int(epid)].in_clusters[int(cluster_id, 16)]
if patch_cluster:
patch_cluster_for_testing(real_cluster)
for attr_id, attr in cluster["attributes"].items():
if (
attr["value"] is None
or attr_id in cluster["unsupported_attributes"]
):
continue
real_cluster._attr_cache[int(attr_id, 16)] = attr["value"]
real_cluster.PLUGGED_ATTR_READS[int(attr_id, 16)] = attr["value"]
for unsupported_attr in cluster["unsupported_attributes"]:
if isinstance(unsupported_attr, str) and unsupported_attr.startswith(
"0x"
):
attrid = int(unsupported_attr, 16)
real_cluster.unsupported_attributes.add(attrid)
if attrid in real_cluster.attributes:
real_cluster.unsupported_attributes.add(
real_cluster.attributes[attrid].name
)
else:
real_cluster.unsupported_attributes.add(unsupported_attr)

for cluster_id, cluster in ep["out_clusters"].items():
real_cluster = device.endpoints[int(epid)].out_clusters[int(cluster_id, 16)]
if patch_cluster:
patch_cluster_for_testing(real_cluster)
for attr_id, attr in cluster["attributes"].items():
if (
attr["value"] is None
or attr_id in cluster["unsupported_attributes"]
):
continue
real_cluster._attr_cache[int(attr_id, 16)] = attr["value"]
real_cluster.PLUGGED_ATTR_READS[int(attr_id, 16)] = attr["value"]
for unsupported_attr in cluster["unsupported_attributes"]:
if isinstance(unsupported_attr, str) and unsupported_attr.startswith(
"0x"
):
attrid = int(unsupported_attr, 16)
real_cluster.unsupported_attributes.add(attrid)
if attrid in real_cluster.attributes:
real_cluster.unsupported_attributes.add(
real_cluster.attributes[attrid].name
)
else:
real_cluster.unsupported_attributes.add(unsupported_attr)

for cluster_type in ("in_clusters", "out_clusters"):
for cluster in ep[cluster_type]:
real_cluster = getattr(endpoint, cluster_type)[
int(cluster["cluster_id"], 16)
]

if patch_cluster:
patch_cluster_for_testing(real_cluster)

for attr in cluster["attributes"]:
attrid = int(attr["id"], 16)

if attr["value"] is not None:
real_cluster._attr_cache[attrid] = attr["value"]
real_cluster.PLUGGED_ATTR_READS[attrid] = attr["value"]

if attr["unsupported"]:
real_cluster.unsupported_attributes.add(attrid)

if attr["name"] is not None:
real_cluster.unsupported_attributes.add(attr["name"])

for obj in device_data["neighbors"]:
app.topology.neighbors[ieee].append(
app.topology.neighbors[device.ieee].append(
zdo_t.Neighbor(
device_type=zdo_t.Neighbor.DeviceType[obj["device_type"]],
rx_on_when_idle=zdo_t.Neighbor.RxOnWhenIdle[obj["rx_on_when_idle"]],
Expand All @@ -450,7 +436,7 @@ def zigpy_device_from_device_data(
)

for obj in device_data["routes"]:
app.topology.routes[ieee].append(
app.topology.routes[device.ieee].append(
zdo_t.Route(
DstNWK=t.NWK.convert(obj["dest_nwk"][2:]),
RouteStatus=zdo_t.RouteStatus[obj["route_status"]],
Expand Down

0 comments on commit 664951c

Please sign in to comment.