-
Notifications
You must be signed in to change notification settings - Fork 32
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement a full device scan #115
Labels
enhancement
New feature or request
Comments
Here is some initial scratch code to do this:

default_response = GENERAL_COMMANDS[GeneralCommand.Default_Response].schema
async def discover_attributes(
    cluster: Cluster,
) -> list[DiscoverAttributesResponseRecord]:
    """Discover attributes for a cluster.

    Pages through the cluster by awaiting ``cluster.discover_attributes``
    with at most 10 ids per request, starting each request one past the
    last attribute id seen in the previous response.

    Args:
        cluster: Cluster object exposing an awaitable
            ``discover_attributes(start_attribute_id=..., max_attribute_ids=...)``.

    Returns:
        Every record collected before discovery finished. The list may be
        partial (or empty) if the device answered with a default response,
        returned an empty page, or the id range was exhausted.
    """
    # ZCL attribute identifiers are 16-bit; a start id beyond this cannot be
    # requested.
    max_attribute_id = 0xFFFF
    attribute_id: int = 0
    discovery_complete: bool = False
    attributes: list[DiscoverAttributesResponseRecord] = []
    while not discovery_complete:
        response = await cluster.discover_attributes(
            start_attribute_id=attribute_id, max_attribute_ids=10
        )
        # A default response instead of a discovery response means the
        # request was not serviced; stop and return what we have.
        if isinstance(response, default_response):
            _LOGGER.debug("Default response received: %s", response)
            break
        discovery_complete, attribute_info = response
        if not attribute_info:
            # No records means the start id cannot advance. Without this
            # guard a device that reports "not complete" with an empty page
            # would be queried with the same start id forever.
            break
        attributes.extend(attribute_info)
        for attribute in attribute_info:
            _LOGGER.debug(
                "Discovered attribute: %s: %s",
                attribute.attrid,
                attribute.datatype,
            )
        # Resume right after the last attribute of this page.
        attribute_id = attribute_info[-1].attrid + 1
        if attribute_id > max_attribute_id:
            # The whole id space has been covered; nothing left to request.
            break
    return attributes
async def discover_commands(
    cluster: Cluster,
) -> dict[str, list[t.uint8_t]]:
    """Discover generated and received commands for a cluster.

    For each of the cluster methods ``discover_commands_received`` and
    ``discover_commands_generated``, pages through the command ids with at
    most 10 ids per request, starting each request one past the last id
    seen in the previous response.

    Args:
        cluster: Cluster object exposing both awaitable discovery methods,
            each accepting ``start_command_id`` and ``max_command_ids``.

    Returns:
        A dict keyed by the method name (``"discover_commands_received"``,
        ``"discover_commands_generated"``) whose values are the command ids
        collected for that direction. A list may be partial (or empty) if
        the device answered with a default response, returned an empty
        page, or the id range was exhausted.
    """
    # Command ids are annotated as t.uint8_t, so 0xFF is the last valid
    # start id.
    max_command_id = 0xFF
    response: dict[str, list[t.uint8_t]] = {}
    for command in ("discover_commands_received", "discover_commands_generated"):
        command_id: int = 0
        discovery_complete: bool = False
        command_method = getattr(cluster, command)
        command_ids_discovered: list[t.uint8_t] = []
        while not discovery_complete:
            command_response = await command_method(
                start_command_id=command_id, max_command_ids=10
            )
            # A default response instead of a discovery response means the
            # request was not serviced; give up on this direction only.
            if isinstance(command_response, default_response):
                _LOGGER.debug("Default response received: %s", command_response)
                break
            discovery_complete, command_ids = command_response
            if not command_ids:
                # No ids means the start id cannot advance. Stop instead of
                # re-sending the identical request forever.
                break
            command_ids_discovered.extend(command_ids)
            for discovered_command_id in command_ids:
                # Include the discovery method name: this loop runs for both
                # the received and the generated pass.
                _LOGGER.debug(
                    "Discovered command id (%s): %s",
                    command,
                    discovered_command_id,
                )
            # Resume right after the last command id of this page.
            command_id = command_ids[-1] + 1
            if command_id > max_command_id:
                # The whole 8-bit id space has been covered.
                break
        response[command] = command_ids_discovered
    return response
async def scan_device(service: ServiceCall) -> None:
    """Scan every cluster of one device for attributes and commands.

    The device is looked up from ``service.data[ATTR_DEVICE_ID]``. Each
    endpoint except endpoint id 0 is visited, and for every cluster in its
    ``in_clusters`` and ``out_clusters`` the attribute and command
    discovery helpers are awaited in turn. Their return values are not
    kept here, so the only visible output is the debug logging.

    NOTE(review): ``hass`` is not a parameter; it must be supplied by the
    enclosing scope. Endpoint 0 is presumably skipped because it is the
    ZDO endpoint -- confirm.
    """
    device_id: int = service.data[ATTR_DEVICE_ID]
    device_proxy = async_get_zha_device_proxy(hass, device_id)
    zha_device: Device = device_proxy.device
    _LOGGER.debug("Scanning device: %s", zha_device.ieee)

    for endpoint_id, endpoint in zha_device.endpoints.items():
        if endpoint_id != 0:
            for collection_name in ("in_clusters", "out_clusters"):
                clusters = getattr(endpoint.zigpy_endpoint, collection_name)
                for cluster_id, cluster in clusters.items():
                    _LOGGER.debug("Scanning cluster: %s", cluster_id)
                    await discover_attributes(cluster)
                    await discover_commands(cluster)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
https://github.com/mdeweerd/zha-toolkit/blob/main/custom_components/zha_toolkit/scan_device.py
We should include this functionality in ZHA. We should schedule this post device join.
The text was updated successfully, but these errors were encountered: