diff --git a/tests/common.py b/tests/common.py index 03e2c877d..8ad972a61 100644 --- a/tests/common.py +++ b/tests/common.py @@ -314,127 +314,113 @@ def zigpy_device_from_device_data( quirk: Optional[Callable] = None, ) -> zigpy.device.Device: """Make a fake device using the specified cluster classes.""" - ieee = zigpy.types.EUI64.convert(device_data["ieee"]) - nwk = device_data["nwk"] - manufacturer = device_data["manufacturer"] - model = device_data["model"] - node_descriptor = device_data["signature"]["node_descriptor"] - endpoints = device_data["signature"]["endpoints"] - cluster_data = device_data["cluster_details"] - - device = zigpy.device.Device(app, ieee, nwk) - device.manufacturer = manufacturer - device.model = model + + device = zigpy.device.Device( + application=app, + ieee=zigpy.types.EUI64.convert(device_data["ieee"]), + nwk=device_data["nwk"], + ) + device.manufacturer = device_data["manufacturer"] + device.model = device_data["model"] device.last_seen = datetime.fromisoformat(device_data["last_seen"]) device.lqi = int(device_data["lqi"]) if device_data["lqi"] is not None else None device.rssi = int(device_data["rssi"]) if device_data["rssi"] is not None else None + node_desc = device_data["node_descriptor"] device.node_desc = zdo_t.NodeDescriptor( - logical_type=node_descriptor["logical_type"], - complex_descriptor_available=node_descriptor["complex_descriptor_available"], - user_descriptor_available=node_descriptor["user_descriptor_available"], - reserved=node_descriptor["reserved"], - aps_flags=node_descriptor["aps_flags"], - frequency_band=node_descriptor["frequency_band"], - mac_capability_flags=node_descriptor["mac_capability_flags"], - manufacturer_code=node_descriptor["manufacturer_code"], - maximum_buffer_size=node_descriptor["maximum_buffer_size"], - maximum_incoming_transfer_size=node_descriptor[ - "maximum_incoming_transfer_size" - ], - server_mask=node_descriptor["server_mask"], - maximum_outgoing_transfer_size=node_descriptor[ - "maximum_outgoing_transfer_size" - ], - descriptor_capability_field=node_descriptor["descriptor_capability_field"], + logical_type=node_desc["logical_type"], + complex_descriptor_available=node_desc["complex_descriptor_available"], + user_descriptor_available=node_desc["user_descriptor_available"], + reserved=node_desc["reserved"], + aps_flags=node_desc["aps_flags"], + frequency_band=node_desc["frequency_band"], + mac_capability_flags=node_desc["mac_capability_flags"], + manufacturer_code=node_desc["manufacturer_code"], + maximum_buffer_size=node_desc["maximum_buffer_size"], + maximum_incoming_transfer_size=node_desc["maximum_incoming_transfer_size"], + server_mask=node_desc["server_mask"], + maximum_outgoing_transfer_size=node_desc["maximum_outgoing_transfer_size"], + descriptor_capability_field=node_desc["descriptor_capability_field"], ) - orig_endpoints = ( - device_data["original_signature"]["endpoints"] - if "original_signature" in device_data - else endpoints - ) - for epid, ep in orig_endpoints.items(): - endpoint = device.add_endpoint(int(epid)) - profile = None - with suppress(Exception): - profile = zigpy.profiles.PROFILES[int(ep["profile_id"], 16)] - - endpoint.device_type = ( - profile.DeviceType(int(ep["device_type"], 16)) - if profile - else int(ep["device_type"], 16) - ) - endpoint.profile_id = ( - profile.PROFILE_ID if profile else int(ep["profile_id"], 16) - ) - endpoint.request = AsyncMock(return_value=[0]) + if device_data.get("original_signature", {}): + for epid, ep in device_data["original_signature"]["endpoints"].items(): + endpoint = device.add_endpoint(int(epid)) + profile = None + with suppress(Exception): + profile = zigpy.profiles.PROFILES[int(ep["profile_id"], 16)] + + endpoint.profile_id = ( + profile.PROFILE_ID if profile is not None else int(ep["profile_id"], 16) + ) - for cluster_id in ep["input_clusters"]: - endpoint.add_input_cluster(int(cluster_id, 16)) + endpoint.device_type = ( + profile.DeviceType(int(ep["device_type"], 16)) + if profile is not None + else int(ep["device_type"], 16) + ) - for cluster_id in ep["output_clusters"]: - endpoint.add_output_cluster(int(cluster_id, 16)) + for cluster_id in ep["input_clusters"]: + endpoint.add_input_cluster(int(cluster_id, 16)) + + for cluster_id in ep["output_clusters"]: + endpoint.add_output_cluster(int(cluster_id, 16)) + else: + for epid, ep in device_data["endpoints"].items(): + endpoint = device.add_endpoint(int(epid)) + profile = None + with suppress(Exception): + profile = zigpy.profiles.PROFILES[ep["profile_id"]] + + endpoint.profile_id = ( + profile.PROFILE_ID if profile is not None else ep["profile_id"] + ) + + endpoint.device_type = ( + profile.DeviceType(ep["device_type"]["id"]) + if profile is not None + else ep["device_type"]["id"] + ) + + for cluster in ep["in_clusters"]: + endpoint.add_input_cluster(int(cluster["cluster_id"], 16)) + + for cluster in ep["out_clusters"]: + endpoint.add_output_cluster(int(cluster["cluster_id"], 16)) if quirk: device = quirk(app, device.ieee, device.nwk, device) else: device = quirks_get_device(device) - for epid, ep in cluster_data.items(): + for epid, ep in device_data["endpoints"].items(): + endpoint = device.endpoints[int(epid)] endpoint.request = AsyncMock(return_value=[0]) - for cluster_id, cluster in ep["in_clusters"].items(): - real_cluster = device.endpoints[int(epid)].in_clusters[int(cluster_id, 16)] - if patch_cluster: - patch_cluster_for_testing(real_cluster) - for attr_id, attr in cluster["attributes"].items(): - if ( - attr["value"] is None - or attr_id in cluster["unsupported_attributes"] - ): - continue - real_cluster._attr_cache[int(attr_id, 16)] = attr["value"] - real_cluster.PLUGGED_ATTR_READS[int(attr_id, 16)] = attr["value"] - for unsupported_attr in cluster["unsupported_attributes"]: - if isinstance(unsupported_attr, str) and unsupported_attr.startswith( - "0x" - ): - attrid = int(unsupported_attr, 16) - real_cluster.unsupported_attributes.add(attrid) - if attrid in real_cluster.attributes: - real_cluster.unsupported_attributes.add( - real_cluster.attributes[attrid].name - ) - else: - real_cluster.unsupported_attributes.add(unsupported_attr) - - for cluster_id, cluster in ep["out_clusters"].items(): - real_cluster = device.endpoints[int(epid)].out_clusters[int(cluster_id, 16)] - if patch_cluster: - patch_cluster_for_testing(real_cluster) - for attr_id, attr in cluster["attributes"].items(): - if ( - attr["value"] is None - or attr_id in cluster["unsupported_attributes"] - ): - continue - real_cluster._attr_cache[int(attr_id, 16)] = attr["value"] - real_cluster.PLUGGED_ATTR_READS[int(attr_id, 16)] = attr["value"] - for unsupported_attr in cluster["unsupported_attributes"]: - if isinstance(unsupported_attr, str) and unsupported_attr.startswith( - "0x" - ): - attrid = int(unsupported_attr, 16) - real_cluster.unsupported_attributes.add(attrid) - if attrid in real_cluster.attributes: - real_cluster.unsupported_attributes.add( - real_cluster.attributes[attrid].name - ) - else: - real_cluster.unsupported_attributes.add(unsupported_attr) + + for cluster_type in ("in_clusters", "out_clusters"): + for cluster in ep[cluster_type]: + real_cluster = getattr(endpoint, cluster_type)[ + int(cluster["cluster_id"], 16) + ] + + if patch_cluster: + patch_cluster_for_testing(real_cluster) + + for attr in cluster["attributes"]: + attrid = int(attr["id"], 16) + + if attr["value"] is not None: + real_cluster._attr_cache[attrid] = attr["value"] + real_cluster.PLUGGED_ATTR_READS[attrid] = attr["value"] + + if attr["unsupported"]: + real_cluster.unsupported_attributes.add(attrid) + + if attr["name"] is not None: + real_cluster.unsupported_attributes.add(attr["name"]) for obj in device_data["neighbors"]: - app.topology.neighbors[ieee].append( + app.topology.neighbors[device.ieee].append( zdo_t.Neighbor( device_type=zdo_t.Neighbor.DeviceType[obj["device_type"]], rx_on_when_idle=zdo_t.Neighbor.RxOnWhenIdle[obj["rx_on_when_idle"]], @@ -450,7 +436,7 @@ def zigpy_device_from_device_data( ) for obj in device_data["routes"]: - app.topology.routes[ieee].append( + app.topology.routes[device.ieee].append( zdo_t.Route( DstNWK=t.NWK.convert(obj["dest_nwk"][2:]), RouteStatus=zdo_t.RouteStatus[obj["route_status"]],