diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml
index 40fbfe84..af0400d9 100644
--- a/.github/workflows/ci.yaml
+++ b/.github/workflows/ci.yaml
@@ -9,7 +9,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
- python-version: [3.7, 3.8, 3.9, "3.10"]
+ python-version: [3.7, 3.8, 3.9, "3.10", "3.11"]
steps:
- uses: actions/checkout@v1
@@ -35,7 +35,7 @@ jobs:
strategy:
matrix:
- python-version: [3.9]
+ python-version: ["3.10"]
steps:
- uses: actions/checkout@v1
@@ -66,7 +66,7 @@ jobs:
strategy:
matrix:
- python-version: [3.9]
+ python-version: ["3.10"]
steps:
- uses: actions/checkout@v1
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 43c7a1ae..01730c6e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -16,6 +16,13 @@ Sections
### Developers
-->
+## [4.7.0] - 2023-06-18
+
+- Allow passing multiple ip to advertise on to AccessoryDriver. [#442](https://github.com/ikalchev/HAP-python/pull/442)
+- Fix for the new home architecture - retain the original format of the UUID. [#441](https://github.com/ikalchev/HAP-python/pull/441)
+- Add python 3.11 to the CI. [#440](https://github.com/ikalchev/HAP-python/pull/440)
+- Use orjson.loads in loader to speed up startup. [#436](https://github.com/ikalchev/HAP-python/pull/436)
+
## [4.6.0] - 2022-12-10
- Patch for [WinError 5] Access Denied. [#421](https://github.com/ikalchev/HAP-python/pull/421)
diff --git a/README.md b/README.md
index d2e483ec..2b4dcf7e 100644
--- a/README.md
+++ b/README.md
@@ -18,7 +18,7 @@ The project was developed for a Raspberry Pi, but it should work on other platfo
you can open `main.py` or `busy_home.py`, where you will find some fake accessories.
Just run one of them, for example `python3 busy_home.py`, and you can add it in
the Home app (be sure to be in the same network).
-Stop it by hitting Ctrl+C.
+Stop it by hitting Ctrl+C.
There are example accessories as well as integrations with real products
in [the accessories folder](accessories). See how to configure your camera in
@@ -90,7 +90,7 @@ class TemperatureSensor(Accessory):
"""
print('Temperature changed to: ', value)
- @Acessory.run_at_interval(3) # Run this method every 3 seconds
+ @Accessory.run_at_interval(3) # Run this method every 3 seconds
# The `run` method can be `async` as well
def run(self):
"""We override this method to implement what the accessory will do when it is
@@ -151,7 +151,7 @@ class Light(Accessory):
if "Brightness" in char_values:
print('Brightness changed to: ', char_values["Brightness"])
- @Acessory.run_at_interval(3) # Run this method every 3 seconds
+ @Accessory.run_at_interval(3) # Run this method every 3 seconds
# The `run` method can be `async` as well
def run(self):
"""We override this method to implement what the accessory will do when it is
diff --git a/docs/source/conf.py b/docs/source/conf.py
index c0b3be16..0e484bb4 100644
--- a/docs/source/conf.py
+++ b/docs/source/conf.py
@@ -40,7 +40,7 @@
# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom
# ones.
extensions = [
- 'sphinx.ext.autodoc',
+ 'sphinx.ext.autodoc', 'sphinx.ext.viewcode'
]
# Add any paths that contain templates here, relative to this directory.
diff --git a/pyhap/accessory_driver.py b/pyhap/accessory_driver.py
index 4bb5d2e5..73e2c57d 100644
--- a/pyhap/accessory_driver.py
+++ b/pyhap/accessory_driver.py
@@ -22,17 +22,17 @@
import logging
import os
import re
-import socket
import sys
import tempfile
-import time
import threading
+import time
+from typing import Optional
from zeroconf import ServiceInfo
from zeroconf.asyncio import AsyncZeroconf
from pyhap import util
-from pyhap.accessory import get_topic
+from pyhap.accessory import Accessory, get_topic
from pyhap.characteristic import CharacteristicError
from pyhap.const import (
HAP_PERMISSION_NOTIFY,
@@ -41,9 +41,9 @@
HAP_REPR_AID,
HAP_REPR_CHARS,
HAP_REPR_IID,
- HAP_REPR_TTL,
HAP_REPR_PID,
HAP_REPR_STATUS,
+ HAP_REPR_TTL,
HAP_REPR_VALUE,
STANDALONE_AID,
)
@@ -122,7 +122,7 @@ class AccessoryMDNSServiceInfo(ServiceInfo):
def __init__(self, accessory, state, zeroconf_server=None):
self.accessory = accessory
- self.state = state
+ self.state: State = state
adv_data = self._get_advert_data()
valid_name = self._valid_name()
@@ -139,7 +139,7 @@ def __init__(self, accessory, state, zeroconf_server=None):
weight=0,
priority=0,
properties=adv_data,
- addresses=[socket.inet_aton(self.state.address)],
+ parsed_addresses=self.state.addresses,
)
def _valid_name(self):
@@ -244,10 +244,10 @@ def __init__(
If not given, the value of the address parameter will be used.
:type listen_address: str
- :param advertised_address: The address of the HAPServer announced via mDNS.
+ :param advertised_address: The addresses of the HAPServer announced via mDNS.
This can be used to announce an external address from behind a NAT.
If not given, the value of the address parameter will be used.
- :type advertised_address: str
+ :type advertised_address: str | list[str]
:param interface_choice: The zeroconf interfaces to listen on.
:type InterfacesType: [InterfaceChoice.Default, InterfaceChoice.All]
@@ -279,7 +279,7 @@ def __init__(
self.loop = loop
- self.accessory = None
+ self.accessory: Optional[Accessory] = None
self.advertiser = async_zeroconf_instance
self.zeroconf_server = zeroconf_server
self.interface_choice = interface_choice
@@ -366,9 +366,9 @@ async def async_start(self):
self.aio_stop_event = asyncio.Event()
logger.info(
- "Starting accessory %s on address %s, port %s.",
+ "Starting accessory %s on addresses %s, port %s.",
self.accessory.display_name,
- self.state.address,
+ self.state.addresses,
self.state.port,
)
@@ -428,7 +428,7 @@ async def async_stop(self):
logger.info(
"Stopping accessory %s on address %s, port %s.",
self.accessory.display_name,
- self.state.address,
+ self.state.addresses,
self.state.port,
)
@@ -643,7 +643,9 @@ def persist(self):
) as file_handle:
tmp_filename = file_handle.name
self.encoder.persist(file_handle, self.state)
- if os.name == 'nt': # Or `[WinError 5] Access Denied` will be raised on Windows
+ if (
+ os.name == "nt"
+ ): # Or `[WinError 5] Access Denied` will be raised on Windows
os.chmod(tmp_filename, 0o644)
os.chmod(self.persist_file, 0o644)
os.replace(tmp_filename, self.persist_file)
@@ -663,13 +665,18 @@ def load(self):
self.encoder.load_into(file_handle, self.state)
@callback
- def pair(self, client_uuid, client_public, client_permissions):
+ def pair(
+ self,
+ client_username_bytes: bytes,
+ client_public: bytes,
+ client_permissions: bytes,
+ ) -> bool:
"""Called when a client has paired with the accessory.
Persist the new accessory state.
- :param client_uuid: The client uuid.
- :type client_uuid: uuid.UUID
+ :param client_username_bytes: The client username bytes.
+ :type client_username_bytes: bytes
:param client_public: The client's public key.
:type client_public: bytes
@@ -681,9 +688,13 @@ def pair(self, client_uuid, client_public, client_permissions):
:rtype: bool
"""
logger.info(
- "Paired with %s with permissions %s.", client_uuid, client_permissions
+ "Paired with %s with permissions %s.",
+ client_username_bytes,
+ client_permissions,
+ )
+ self.state.add_paired_client(
+ client_username_bytes, client_public, client_permissions
)
- self.state.add_paired_client(client_uuid, client_public, client_permissions)
self.async_persist()
return True
diff --git a/pyhap/const.py b/pyhap/const.py
index 66388893..562a805a 100644
--- a/pyhap/const.py
+++ b/pyhap/const.py
@@ -1,6 +1,6 @@
"""This module contains constants used by other modules."""
MAJOR_VERSION = 4
-MINOR_VERSION = 6
+MINOR_VERSION = 7
PATCH_VERSION = 0
__short_version__ = f"{MAJOR_VERSION}.{MINOR_VERSION}"
__version__ = f"{__short_version__}.{PATCH_VERSION}"
diff --git a/pyhap/encoder.py b/pyhap/encoder.py
index a6a1a0a0..48b00464 100644
--- a/pyhap/encoder.py
+++ b/pyhap/encoder.py
@@ -10,6 +10,7 @@
from cryptography.hazmat.primitives.asymmetric import ed25519
from .const import CLIENT_PROP_PERMS
+from .state import State
class AccessoryEncoder:
@@ -45,7 +46,7 @@ class AccessoryEncoder:
"""
@staticmethod
- def persist(fp, state):
+ def persist(fp, state: State):
"""Persist the state of the given Accessory to the given file object.
Persists:
@@ -61,12 +62,16 @@ def persist(fp, state):
client_properties = {
str(client): props for client, props in state.client_properties.items()
}
+ client_uuid_to_bytes = {
+ str(client): bytes.hex(key) for client, key in state.uuid_to_bytes.items()
+ }
config_state = {
"mac": state.mac,
"config_version": state.config_version,
"paired_clients": paired_clients,
"client_properties": client_properties,
"accessories_hash": state.accessories_hash,
+ "client_uuid_to_bytes": client_uuid_to_bytes,
"private_key": bytes.hex(
state.private_key.private_bytes(
encoding=serialization.Encoding.Raw,
@@ -84,7 +89,7 @@ def persist(fp, state):
json.dump(config_state, fp)
@staticmethod
- def load_into(fp, state):
+ def load_into(fp, state: State) -> None:
"""Load the accessory state from the given file object into the given Accessory.
@see: AccessoryEncoder.persist
@@ -115,3 +120,7 @@ def load_into(fp, state):
state.public_key = ed25519.Ed25519PublicKey.from_public_bytes(
bytes.fromhex(loaded["public_key"])
)
+ state.uuid_to_bytes = {
+ uuid.UUID(client): bytes.fromhex(key)
+ for client, key in loaded.get("client_uuid_to_bytes", {}).items()
+ }
diff --git a/pyhap/hap_handler.py b/pyhap/hap_handler.py
index f71a5f91..80a09b38 100644
--- a/pyhap/hap_handler.py
+++ b/pyhap/hap_handler.py
@@ -5,16 +5,17 @@
import asyncio
from http import HTTPStatus
import logging
-from urllib.parse import parse_qs, urlparse
+from typing import TYPE_CHECKING, Dict, Optional
+from urllib.parse import ParseResult, parse_qs, urlparse
import uuid
+from chacha20poly1305_reuseable import ChaCha20Poly1305Reusable as ChaCha20Poly1305
from cryptography.exceptions import InvalidSignature, InvalidTag
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import ed25519, x25519
-from chacha20poly1305_reuseable import ChaCha20Poly1305Reusable as ChaCha20Poly1305
+import h11
from pyhap import tlv
-
from pyhap.const import (
CATEGORY_BRIDGE,
HAP_PERMISSIONS,
@@ -25,7 +26,11 @@
from pyhap.util import long_to_bytes
from .hap_crypto import hap_hkdf, pad_tls_nonce
-from .util import to_hap_json, from_hap_json
+from .state import State
+from .util import from_hap_json, to_hap_json
+
+if TYPE_CHECKING:
+ from .accessory_driver import AccessoryDriver
# iOS will terminate the connection if it does not respond within
# 10 seconds, so we only allow 9 seconds to avoid this.
@@ -134,24 +139,29 @@ def __init__(self, accessory_handler, client_address):
@param accessory_handler: An object that controls an accessory's state.
@type accessory_handler: AccessoryDriver
"""
- self.accessory_handler = accessory_handler
- self.state = self.accessory_handler.state
+ self.accessory_handler: AccessoryDriver = accessory_handler
+ self.state: State = self.accessory_handler.state
self.enc_context = None
self.client_address = client_address
self.is_encrypted = False
- self.client_uuid = None
+ self.client_uuid: Optional[uuid.UUID] = None
- self.path = None
- self.command = None
- self.headers = None
- self.request_body = None
- self.parsed_url = None
+ self.path: Optional[str] = None
+ self.command: Optional[str] = None
+ self.headers: Optional[Dict[str, str]] = None
+ self.request_body: Optional[bytes] = None
+ self.parsed_url: Optional[ParseResult] = None
- self.response = None
+ self.response: Optional[HAPResponse] = None
def _set_encryption_ctx(
- self, client_public, private_key, public_key, shared_key, pre_session_key
- ):
+ self,
+ client_public: bytes,
+ private_key: x25519.X25519PrivateKey,
+ public_key: x25519.X25519PublicKey,
+ shared_key: bytes,
+ pre_session_key: bytes,
+ ) -> None:
"""Sets the encryption context.
The encryption context is generated in pair verify step one and is used to
@@ -178,23 +188,28 @@ def _set_encryption_ctx(
"pre_session_key": pre_session_key,
}
- def send_response(self, http_status):
+ def send_response(self, http_status: HTTPStatus) -> None:
"""Add the response header to the headers buffer and log the
response code.
Does not add Server or Date
"""
+ assert self.response is not None # nosec
self.response.status_code = http_status.value
self.response.reason = http_status.phrase
- def send_header(self, header, value):
+ def send_header(self, header: str, value: str) -> None:
"""Add the response header to the headers buffer."""
+ assert self.response is not None # nosec
self.response.headers.append((header, value))
- def end_response(self, bytesdata):
+ def end_response(self, bytesdata: bytes) -> None:
"""Combines adding a length header and actually sending the data."""
+ assert self.response is not None # nosec
self.response.body = bytesdata
- def dispatch(self, request, body=None):
+ def dispatch(
+ self, request: h11.Request, body: Optional[bytes] = None
+ ) -> HAPResponse:
"""Dispatch the request to the appropriate handler method."""
self.path = request.target.decode()
self.command = request.method.decode()
@@ -231,7 +246,7 @@ def dispatch(self, request, body=None):
self.response = None
return response
- def generic_failure_response(self):
+ def generic_failure_response(self) -> HAPResponse:
"""Generate a generic failure response."""
self.response = HAPResponse()
self.send_response_with_status(
@@ -242,13 +257,15 @@ def generic_failure_response(self):
self.response = None
return response
- def send_response_with_status(self, http_code, hap_server_status):
+ def send_response_with_status(
+ self, http_code: HTTPStatus, hap_server_status: int
+ ) -> None:
"""Send a generic HAP status response."""
self.send_response(http_code)
self.send_header("Content-Type", self.JSON_RESPONSE_TYPE)
self.end_response(to_hap_json({"status": hap_server_status}))
- def handle_pairing(self):
+ def handle_pairing(self) -> None:
"""Handles arbitrary step of the pairing process."""
if self.state.paired:
self._send_tlv_pairing_response(
@@ -271,7 +288,7 @@ def handle_pairing(self):
elif sequence == HAP_TLV_STATES.M5:
self._pairing_three(tlv_objects)
- def _pairing_one(self):
+ def _pairing_one(self) -> None:
"""Send the SRP salt and public key to the client.
The SRP verifier is created at this step.
@@ -290,7 +307,7 @@ def _pairing_one(self):
)
self._send_tlv_pairing_response(data)
- def _pairing_two(self, tlv_objects):
+ def _pairing_two(self, tlv_objects: Dict[bytes, bytes]) -> None:
"""Obtain the challenge from the client (A) and client's proof that it
knows the password (M). Verify M and generate the server's proof based on
A (H_AMK). Send the H_AMK to the client.
@@ -318,7 +335,7 @@ def _pairing_two(self, tlv_objects):
)
self._send_tlv_pairing_response(data)
- def _pairing_three(self, tlv_objects):
+ def _pairing_three(self, tlv_objects: Dict[bytes, bytes]) -> None:
"""Expand the SRP session key to obtain a new key. Use it to verify and decrypt
the recieved data. Continue to step four.
@@ -343,13 +360,21 @@ def _pairing_three(self, tlv_objects):
return
dec_tlv_objects = tlv.decode(bytes(decrypted_data))
- client_username = dec_tlv_objects[HAP_TLV_TAGS.USERNAME]
+ client_username_bytes = dec_tlv_objects[HAP_TLV_TAGS.USERNAME]
client_ltpk = dec_tlv_objects[HAP_TLV_TAGS.PUBLIC_KEY]
client_proof = dec_tlv_objects[HAP_TLV_TAGS.PROOF]
- self._pairing_four(client_username, client_ltpk, client_proof, hkdf_enc_key)
+ self._pairing_four(
+ client_username_bytes, client_ltpk, client_proof, hkdf_enc_key
+ )
- def _pairing_four(self, client_username, client_ltpk, client_proof, encryption_key):
+ def _pairing_four(
+ self,
+ client_username_bytes: bytes,
+ client_ltpk: bytes,
+ client_proof: bytes,
+ encryption_key: bytes,
+ ) -> None:
"""Expand the SRP session key to obtain a new key.
Use it to verify that the client's proof of the private key. Continue to
step five.
@@ -372,7 +397,7 @@ def _pairing_four(self, client_username, client_ltpk, client_proof, encryption_k
long_to_bytes(session_key), self.PAIRING_4_SALT, self.PAIRING_4_INFO
)
- data = output_key + client_username + client_ltpk
+ data = output_key + client_username_bytes + client_ltpk
verifying_key = ed25519.Ed25519PublicKey.from_public_bytes(client_ltpk)
try:
@@ -381,9 +406,11 @@ def _pairing_four(self, client_username, client_ltpk, client_proof, encryption_k
logger.error("Bad signature, abort.")
raise
- self._pairing_five(client_username, client_ltpk, encryption_key)
+ self._pairing_five(client_username_bytes, client_ltpk, encryption_key)
- def _pairing_five(self, client_username, client_ltpk, encryption_key):
+ def _pairing_five(
+ self, client_username_bytes: bytes, client_ltpk: bytes, encryption_key: bytes
+ ) -> None:
"""At that point we know the client has the accessory password and has a valid key
pair. Add it as a pair and send a sever proof.
@@ -417,13 +444,13 @@ def _pairing_five(self, client_username, client_ltpk, encryption_key):
cipher = ChaCha20Poly1305(encryption_key)
aead_message = bytes(cipher.encrypt(self.PAIRING_5_NONCE, bytes(message), b""))
- client_username_str = str(client_username, "utf-8")
+ client_username_str = client_username_bytes.decode("utf-8")
client_uuid = uuid.UUID(client_username_str)
logger.debug(
"Finishing pairing with admin %s uuid=%s", client_username_str, client_uuid
)
should_confirm = self.accessory_handler.pair(
- client_uuid, client_ltpk, HAP_PERMISSIONS.ADMIN
+ client_username_bytes, client_ltpk, HAP_PERMISSIONS.ADMIN
)
if not should_confirm:
@@ -439,10 +466,11 @@ def _pairing_five(self, client_username, client_ltpk, encryption_key):
HAP_TLV_TAGS.ENCRYPTED_DATA,
aead_message,
)
+ assert self.response is not None # nosec
self.response.pairing_changed = True
self._send_tlv_pairing_response(tlv_data)
- def handle_pair_verify(self):
+ def handle_pair_verify(self) -> None:
"""Handles arbitrary step of the pair verify process.
Pair verify is session negotiation.
@@ -462,14 +490,14 @@ def handle_pair_verify(self):
f"Unknown pairing sequence of {sequence} during pair verify"
)
- def _pair_verify_one(self, tlv_objects):
+ def _pair_verify_one(self, tlv_objects: Dict[bytes, bytes]) -> None:
"""Generate new session key pair and send a proof to the client.
@param tlv_objects: The TLV data received from the client.
@type tlv_object: dict
"""
logger.debug("%s: Pair verify [1/2].", self.client_address)
- client_public = tlv_objects[HAP_TLV_TAGS.PUBLIC_KEY]
+ client_public: bytes = tlv_objects[HAP_TLV_TAGS.PUBLIC_KEY]
private_key = x25519.X25519PrivateKey.generate()
public_key = private_key.public_key()
@@ -513,7 +541,7 @@ def _pair_verify_one(self, tlv_objects):
)
self._send_tlv_pairing_response(data)
- def _pair_verify_two(self, tlv_objects):
+ def _pair_verify_two(self, tlv_objects: Dict[bytes, bytes]) -> None:
"""Verify the client proof and upgrade to encrypted transport.
@param tlv_objects: The TLV data received from the client.
@@ -545,9 +573,10 @@ def _pair_verify_two(self, tlv_objects):
perm_client_public = self.state.paired_clients.get(client_uuid)
if perm_client_public is None:
logger.error(
- "%s: Client %s attempted pair verify without being paired to %s first.",
+ "%s: Client %s with uuid %s attempted pair verify without being paired first (paired clients=%s).",
self.client_address,
client_uuid,
+ self.state.paired_clients,
self.accessory_handler.accessory.display_name,
)
self._send_authentication_error_tlv_response(HAP_TLV_STATES.M4)
@@ -569,12 +598,13 @@ def _pair_verify_two(self, tlv_objects):
data = tlv.encode(HAP_TLV_TAGS.SEQUENCE_NUM, HAP_TLV_STATES.M4)
self._send_tlv_pairing_response(data)
+ assert self.response is not None # nosec
self.response.shared_key = self.enc_context["shared_key"]
self.is_encrypted = True
self.client_uuid = client_uuid
del self.enc_context
- def handle_accessories(self):
+ def handle_accessories(self) -> None:
"""Handles a client request to get the accessories."""
if not self.is_encrypted:
raise UnprivilegedRequestException
@@ -584,12 +614,13 @@ def handle_accessories(self):
self.send_header("Content-Type", self.JSON_RESPONSE_TYPE)
self.end_response(to_hap_json(hap_rep))
- def handle_get_characteristics(self):
+ def handle_get_characteristics(self) -> None:
"""Handles a client request to get certain characteristics."""
if not self.is_encrypted:
raise UnprivilegedRequestException
# Check that char exists and ...
+ assert self.parsed_url is not None # nosec
params = parse_qs(self.parsed_url.query)
response = self.accessory_handler.get_characteristics(
params["id"][0].split(",")
@@ -609,7 +640,7 @@ def handle_get_characteristics(self):
self.send_header("Content-Type", self.JSON_RESPONSE_TYPE)
self.end_response(to_hap_json(response))
- def handle_set_characteristics(self):
+ def handle_set_characteristics(self) -> None:
"""Handles a client request to update certain characteristics."""
if not self.is_encrypted:
logger.warning(
@@ -618,6 +649,7 @@ def handle_set_characteristics(self):
self.send_response(HTTPStatus.UNAUTHORIZED)
return
+ assert self.request_body is not None # nosec
requested_chars = from_hap_json(self.request_body.decode("utf-8"))
logger.debug(
"%s: Set characteristics content: %s", self.client_address, requested_chars
@@ -651,9 +683,10 @@ def handle_prepare(self):
self.send_header("Content-Type", self.JSON_RESPONSE_TYPE)
self.end_response(to_hap_json(response))
- def handle_pairings(self):
+ def handle_pairings(self) -> None:
"""Handles a client request to update or remove a pairing."""
# Must be an admin to handle pairings
+ assert self.client_uuid is not None # nosec
if not self.is_encrypted or not self.state.is_admin(self.client_uuid):
self._send_authentication_error_tlv_response(HAP_TLV_STATES.M2)
return
@@ -673,20 +706,17 @@ def handle_pairings(self):
def _handle_add_pairing(self, tlv_objects):
"""Update client information."""
- client_username = tlv_objects[HAP_TLV_TAGS.USERNAME]
- client_username_str = str(client_username, "utf-8")
+ client_username_bytes = tlv_objects[HAP_TLV_TAGS.USERNAME]
client_public = tlv_objects[HAP_TLV_TAGS.PUBLIC_KEY]
permissions = tlv_objects[HAP_TLV_TAGS.PERMISSIONS]
- client_uuid = uuid.UUID(client_username_str)
logger.debug(
- "%s: Adding client pairing for %s uuid=%s with permissions %s.",
+ "%s: Adding client pairing for %s with permissions %s.",
self.client_address,
- client_username_str,
- client_uuid,
+ client_username_bytes,
permissions,
)
should_confirm = self.accessory_handler.pair(
- client_uuid, client_public, permissions
+ client_username_bytes, client_public, permissions
)
if not should_confirm:
self._send_authentication_error_tlv_response(HAP_TLV_STATES.M2)
@@ -695,10 +725,10 @@ def _handle_add_pairing(self, tlv_objects):
data = tlv.encode(HAP_TLV_TAGS.SEQUENCE_NUM, HAP_TLV_STATES.M2)
self._send_tlv_pairing_response(data)
- def _handle_remove_pairing(self, tlv_objects):
+ def _handle_remove_pairing(self, tlv_objects: Dict[bytes, bytes]) -> None:
"""Remove pairing with the client."""
- client_username = tlv_objects[HAP_TLV_TAGS.USERNAME]
- client_username_str = str(client_username, "utf-8")
+ client_username_bytes: bytes = tlv_objects[HAP_TLV_TAGS.USERNAME]
+ client_username_str = client_username_bytes.decode("utf-8")
client_uuid = uuid.UUID(client_username_str)
was_paired = self.state.paired
logger.debug(
@@ -721,21 +751,25 @@ def _handle_remove_pairing(self, tlv_objects):
# client is removed, otherwise the controller
# may not remove them all
logger.debug("%s: updating mdns to unpaired", self.client_address)
+ assert self.response is not None # nosec
self.response.pairing_changed = True
- def _handle_list_pairings(self):
+ def _handle_list_pairings(self) -> None:
"""List current pairings."""
logger.debug("%s: list pairings", self.client_address)
response = [HAP_TLV_TAGS.SEQUENCE_NUM, HAP_TLV_STATES.M2]
- for client_uuid, client_public in self.state.paired_clients.items():
+ state = self.state
+ for client_uuid, client_public in state.paired_clients.items():
admin = self.state.is_admin(client_uuid)
response.extend(
[
HAP_TLV_TAGS.USERNAME,
# iOS 16+ requires the username to be uppercase
# or it will unpair the accessory because it thinks
- # the username is invalid
- str(client_uuid).encode("utf-8").upper(),
+ # the username is invalid. We try to send back the
+ # exact bytes that was used to pair if we have it
+ state.uuid_to_bytes.get(client_uuid)
+ or str(client_uuid).encode("utf-8").upper(),
HAP_TLV_TAGS.PUBLIC_KEY,
client_public,
HAP_TLV_TAGS.PERMISSIONS,
@@ -746,7 +780,7 @@ def _handle_list_pairings(self):
data = tlv.encode(*response)
self._send_tlv_pairing_response(data)
- def _send_authentication_error_tlv_response(self, sequence):
+ def _send_authentication_error_tlv_response(self, sequence: bytes) -> None:
"""Send an authentication error tlv response."""
self._send_tlv_pairing_response(
tlv.encode(
@@ -757,14 +791,15 @@ def _send_authentication_error_tlv_response(self, sequence):
)
)
- def _send_tlv_pairing_response(self, data):
+ def _send_tlv_pairing_response(self, data: bytes) -> None:
"""Send a TLV encoded pairing response."""
self.send_response(HTTPStatus.OK)
self.send_header("Content-Type", self.PAIRING_RESPONSE_TYPE)
self.end_response(data)
- def handle_resource(self):
+ def handle_resource(self) -> None:
"""Get a snapshot from the camera."""
+ assert self.request_body is not None # nosec
data = from_hap_json(self.request_body.decode("utf-8"))
if self.accessory_handler.accessory.category == CATEGORY_BRIDGE:
@@ -788,4 +823,5 @@ def handle_resource(self):
task = asyncio.ensure_future(asyncio.wait_for(coro, RESPONSE_TIMEOUT))
self.send_response(HTTPStatus.OK)
self.send_header("Content-Type", "image/jpeg")
+ assert self.response is not None # nosec
self.response.task = task
diff --git a/pyhap/loader.py b/pyhap/loader.py
index 1883e188..3874db13 100644
--- a/pyhap/loader.py
+++ b/pyhap/loader.py
@@ -7,9 +7,10 @@
instance of it (as long as it is described in some
json file).
"""
-import json
import logging
+import orjson
+
from pyhap import CHARACTERISTICS_FILE, SERVICES_FILE
from pyhap.characteristic import Characteristic
from pyhap.service import Service
@@ -34,7 +35,7 @@ def __init__(self, path_char=CHARACTERISTICS_FILE, path_service=SERVICES_FILE):
def _read_file(path):
"""Read file and return a dict."""
with open(path, "r", encoding="utf8") as file:
- return json.load(file)
+ return orjson.loads(file.read()) # pylint: disable=no-member
def get_char(self, name):
"""Return new Characteristic object."""
diff --git a/pyhap/state.py b/pyhap/state.py
index c44dc4f7..70eab877 100644
--- a/pyhap/state.py
+++ b/pyhap/state.py
@@ -1,4 +1,7 @@
"""Module for `State` class."""
+from typing import Dict, List, Optional, Union
+from uuid import UUID
+
from cryptography.hazmat.primitives.asymmetric import ed25519
from pyhap import util
@@ -18,12 +21,27 @@ class State:
That includes all needed for setup of driver and pairing.
"""
- def __init__(self, *, address=None, mac=None, pincode=None, port=None):
+ addreses: List[str]
+
+ def __init__(
+ self,
+ *,
+ address: Optional[Union[str, List[str]]] = None,
+ mac=None,
+ pincode=None,
+ port=None
+ ):
"""Initialize a new object. Create key pair.
Must be called with keyword arguments.
"""
- self.address = address or util.get_local_address()
+ if address:
+ if isinstance(address, str):
+ self.addresses = [address]
+ else:
+ self.addresses = address
+ else:
+ self.addresses = [util.get_local_address()]
self.mac = mac or util.generate_mac()
self.pincode = pincode or util.generate_pincode()
self.port = port or DEFAULT_PORT
@@ -35,34 +53,45 @@ def __init__(self, *, address=None, mac=None, pincode=None, port=None):
self.private_key = ed25519.Ed25519PrivateKey.generate()
self.public_key = self.private_key.public_key()
+ self.uuid_to_bytes: Dict[UUID, bytes] = {}
self.accessories_hash = None
+ @property
+ def address(self) -> str:
+ """Return the first address for backwards compat."""
+ return self.addresses[0]
+
# ### Pairing ###
@property
- def paired(self):
+ def paired(self) -> bool:
"""Return if main accessory is currently paired."""
return len(self.paired_clients) > 0
- def is_admin(self, client_uuid):
+ def is_admin(self, client_uuid: UUID) -> bool:
"""Check if a paired client is an admin."""
if client_uuid not in self.client_properties:
return False
return bool(self.client_properties[client_uuid][CLIENT_PROP_PERMS] & ADMIN_BIT)
- def add_paired_client(self, client_uuid, client_public, perms):
+ def add_paired_client(
+ self, client_username_bytes: bytes, client_public: bytes, perms: bytes
+ ) -> None:
"""Add a given client to dictionary of paired clients.
- :param client_uuid: The client's UUID.
- :type client_uuid: uuid.UUID
+ :param client_username_bytes: The client's user id bytes.
+ :type client_username_bytes: bytes
:param client_public: The client's public key
(not the session public key).
:type client_public: bytes
"""
+ client_username_str = client_username_bytes.decode("utf-8")
+ client_uuid = UUID(client_username_str)
+ self.uuid_to_bytes[client_uuid] = client_username_bytes
self.paired_clients[client_uuid] = client_public
self.client_properties[client_uuid] = {CLIENT_PROP_PERMS: ord(perms)}
- def remove_paired_client(self, client_uuid):
+ def remove_paired_client(self, client_uuid: UUID) -> None:
"""Remove a given client from dictionary of paired clients.
:param client_uuid: The client's UUID.
@@ -70,6 +99,7 @@ def remove_paired_client(self, client_uuid):
"""
self.paired_clients.pop(client_uuid)
self.client_properties.pop(client_uuid)
+ self.uuid_to_bytes.pop(client_uuid, None)
# All pairings must be removed when the last admin is removed
if not any(self.is_admin(client_uuid) for client_uuid in self.paired_clients):
diff --git a/pyhap/util.py b/pyhap/util.py
index 0b11e9f4..0c40488a 100644
--- a/pyhap/util.py
+++ b/pyhap/util.py
@@ -35,7 +35,7 @@ def iscoro(func):
return asyncio.iscoroutinefunction(func)
-def get_local_address():
+def get_local_address() -> str:
"""
Grabs the local IP address using a socket.
@@ -49,7 +49,7 @@ def get_local_address():
addr = s.getsockname()[0]
finally:
s.close()
- return addr
+ return str(addr)
def long_to_bytes(n):
diff --git a/tests/conftest.py b/tests/conftest.py
index 29d05c51..37bc86f9 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -41,7 +41,6 @@ def driver(async_zeroconf):
), patch(
"pyhap.accessory_driver.AccessoryDriver.persist"
):
-
yield AccessoryDriver(loop=loop)
diff --git a/tests/test_accessory_driver.py b/tests/test_accessory_driver.py
index 4588ac03..de656984 100644
--- a/tests/test_accessory_driver.py
+++ b/tests/test_accessory_driver.py
@@ -78,14 +78,14 @@ def available(self):
return False
-def test_auto_add_aid_mac(driver):
+def test_auto_add_aid_mac(driver: AccessoryDriver):
acc = Accessory(driver, "Test Accessory")
driver.add_accessory(acc)
assert acc.aid == STANDALONE_AID
assert driver.state.mac is not None
-def test_not_standalone_aid(driver):
+def test_not_standalone_aid(driver: AccessoryDriver):
acc = Accessory(driver, "Test Accessory", aid=STANDALONE_AID + 1)
with pytest.raises(ValueError):
driver.add_accessory(acc)
@@ -128,7 +128,21 @@ def test_external_zeroconf():
assert driver.advertiser == zeroconf
-def test_service_callbacks(driver):
+def test_advertised_address():
+ zeroconf = MagicMock()
+ with patch("pyhap.accessory_driver.HAPServer"), patch(
+ "pyhap.accessory_driver.AccessoryDriver.persist"
+ ):
+ driver = AccessoryDriver(
+ port=51234,
+ async_zeroconf_instance=zeroconf,
+ advertised_address=["1.2.3.4", "::1"],
+ )
+ assert driver.advertiser == zeroconf
+ assert driver.state.addresses == ["1.2.3.4", "::1"]
+
+
+def test_service_callbacks(driver: AccessoryDriver):
bridge = Bridge(driver, "mybridge")
acc = Accessory(driver, "TestAcc", aid=2)
acc2 = UnavailableAccessory(driver, "TestAcc2", aid=3)
@@ -230,7 +244,7 @@ def _fail_func():
}
-def test_service_callbacks_partial_failure(driver):
+def test_service_callbacks_partial_failure(driver: AccessoryDriver):
bridge = Bridge(driver, "mybridge")
acc = Accessory(driver, "TestAcc", aid=2)
acc2 = UnavailableAccessory(driver, "TestAcc2", aid=3)
@@ -418,7 +432,7 @@ def fail_callback(*_):
}
-def test_start_from_sync(driver):
+def test_start_from_sync(driver: AccessoryDriver):
"""Start from sync."""
class Acc(Accessory):
@@ -436,7 +450,7 @@ def setup_message(self):
driver.start()
-def test_accessory_level_callbacks(driver):
+def test_accessory_level_callbacks(driver: AccessoryDriver):
bridge = Bridge(driver, "mybridge")
acc = Accessory(driver, "TestAcc", aid=2)
acc2 = UnavailableAccessory(driver, "TestAcc2", aid=3)
@@ -525,7 +539,7 @@ def test_accessory_level_callbacks(driver):
)
-def test_accessory_level_callbacks_with_a_failure(driver):
+def test_accessory_level_callbacks_with_a_failure(driver: AccessoryDriver):
bridge = Bridge(driver, "mybridge")
acc = Accessory(driver, "TestAcc", aid=2)
acc2 = UnavailableAccessory(driver, "TestAcc2", aid=3)
@@ -769,13 +783,13 @@ def setup_message(self):
await driver.aio_stop_event.wait()
-def test_start_without_accessory(driver):
+def test_start_without_accessory(driver: AccessoryDriver):
"""Verify we throw ValueError if there is no accessory."""
with pytest.raises(ValueError):
driver.start_service()
-def test_send_events(driver):
+def test_send_events(driver: AccessoryDriver):
"""Test we can send events."""
driver.aio_stop_event = MagicMock(is_set=MagicMock(return_value=False))
@@ -812,7 +826,7 @@ def get_pushed_events(self):
}
-def test_async_subscribe_client_topic(driver):
+def test_async_subscribe_client_topic(driver: AccessoryDriver):
"""Test subscribe and unsubscribe."""
addr_info = ("1.2.3.4", 5)
topic = "any"
@@ -825,7 +839,7 @@ def test_async_subscribe_client_topic(driver):
assert driver.topics == {}
-def test_mdns_service_info(driver):
+def test_mdns_service_info(driver: AccessoryDriver):
"""Test accessory mdns advert."""
acc = Accessory(driver, "[@@@Test@@@] Accessory")
driver.add_accessory(acc)
@@ -854,7 +868,7 @@ def test_mdns_service_info(driver):
}
-def test_mdns_service_info_with_specified_server(driver):
+def test_mdns_service_info_with_specified_server(driver: AccessoryDriver):
"""Test accessory mdns advert when the server is specified."""
acc = Accessory(driver, "Test Accessory")
driver.add_accessory(acc)
@@ -903,7 +917,9 @@ def test_mdns_service_info_with_specified_server(driver):
),
],
)
-def test_mdns_name_sanity(driver, accessory_name, mdns_name, mdns_server):
+def test_mdns_name_sanity(
+ driver: AccessoryDriver, accessory_name, mdns_name, mdns_server
+):
"""Test mdns name sanity."""
acc = Accessory(driver, accessory_name)
driver.add_accessory(acc)
diff --git a/tests/test_characteristic.py b/tests/test_characteristic.py
index 08747dda..d7e2eca5 100644
--- a/tests/test_characteristic.py
+++ b/tests/test_characteristic.py
@@ -5,15 +5,15 @@
import pytest
from pyhap.characteristic import (
+ CHAR_PROGRAMMABLE_SWITCH_EVENT,
HAP_FORMAT_DEFAULTS,
+ HAP_FORMAT_FLOAT,
HAP_FORMAT_INT,
HAP_FORMAT_UINT8,
HAP_FORMAT_UINT16,
HAP_FORMAT_UINT32,
HAP_FORMAT_UINT64,
- HAP_FORMAT_FLOAT,
HAP_PERMISSION_READ,
- CHAR_PROGRAMMABLE_SWITCH_EVENT,
Characteristic,
)
diff --git a/tests/test_encoder.py b/tests/test_encoder.py
index ff2a4a2a..1267017b 100644
--- a/tests/test_encoder.py
+++ b/tests/test_encoder.py
@@ -21,8 +21,9 @@ def test_persist_and_load():
sample_client_pk = _pk.public_key()
state = State(mac=mac)
admin_client_uuid = uuid.uuid1()
+ admin_client_bytes = str(admin_client_uuid).upper().encode("utf-8")
state.add_paired_client(
- admin_client_uuid,
+ admin_client_bytes,
sample_client_pk.public_bytes(
encoding=serialization.Encoding.Raw,
format=serialization.PublicFormat.Raw,
@@ -31,8 +32,9 @@ def test_persist_and_load():
)
assert state.is_admin(admin_client_uuid)
user_client_uuid = uuid.uuid1()
+ user_client_bytes = str(user_client_uuid).upper().encode("utf-8")
state.add_paired_client(
- user_client_uuid,
+ user_client_bytes,
sample_client_pk.public_bytes(
encoding=serialization.Encoding.Raw,
format=serialization.PublicFormat.Raw,
@@ -77,8 +79,9 @@ def test_migration_to_include_client_properties():
sample_client_pk = _pk.public_key()
state = State(mac=mac)
admin_client_uuid = uuid.uuid1()
+ admin_client_bytes = str(admin_client_uuid).upper().encode("utf-8")
state.add_paired_client(
- admin_client_uuid,
+ admin_client_bytes,
sample_client_pk.public_bytes(
encoding=serialization.Encoding.Raw,
format=serialization.PublicFormat.Raw,
@@ -87,8 +90,9 @@ def test_migration_to_include_client_properties():
)
assert state.is_admin(admin_client_uuid)
user_client_uuid = uuid.uuid1()
+ user_client_bytes = str(user_client_uuid).upper().encode("utf-8")
state.add_paired_client(
- user_client_uuid,
+ user_client_bytes,
sample_client_pk.public_bytes(
encoding=serialization.Encoding.Raw,
format=serialization.PublicFormat.Raw,
diff --git a/tests/test_hap_handler.py b/tests/test_hap_handler.py
index 7d9512ea..554f9679 100644
--- a/tests/test_hap_handler.py
+++ b/tests/test_hap_handler.py
@@ -12,9 +12,12 @@
from pyhap.accessory import Accessory, Bridge
from pyhap.characteristic import CharacteristicError
from pyhap.const import HAP_PERMISSIONS
+from pyhap.accessory_driver import AccessoryDriver
CLIENT_UUID = UUID("7d0d1ee9-46fe-4a56-a115-69df3f6860c1")
+CLIENT_UUID_BYTES = str(CLIENT_UUID).upper().encode("utf-8")
CLIENT2_UUID = UUID("7d0d1ee9-46fe-4a56-a115-69df3f6860c2")
+CLIENT2_UUID_BYTES = str(CLIENT2_UUID).upper().encode("utf-8")
PUBLIC_KEY = b"\x99\x98d%\x8c\xf6h\x06\xfa\x85\x9f\x90\x82\xf2\xe8\x18\x9f\xf8\xc75\x1f>~\xc32\xc1OC\x13\xbfH\xad"
@@ -26,13 +29,14 @@ def test_response():
assert "500" in str(response)
-def test_list_pairings_unencrypted(driver):
+def test_list_pairings_unencrypted(driver: AccessoryDriver):
"""Verify an unencrypted list pairings request fails."""
driver.add_accessory(Accessory(driver, "TestAcc"))
handler = hap_handler.HAPServerHandler(driver, "peername")
handler.is_encrypted = False
- driver.pair(CLIENT_UUID, PUBLIC_KEY, HAP_PERMISSIONS.ADMIN)
+ handler.client_uuid = CLIENT_UUID
+ driver.pair(CLIENT_UUID_BYTES, PUBLIC_KEY, HAP_PERMISSIONS.ADMIN)
assert CLIENT_UUID in driver.state.paired_clients
response = hap_handler.HAPResponse()
@@ -57,7 +61,7 @@ def test_list_pairings(driver):
handler = hap_handler.HAPServerHandler(driver, "peername")
handler.is_encrypted = True
handler.client_uuid = CLIENT_UUID
- driver.pair(CLIENT_UUID, PUBLIC_KEY, HAP_PERMISSIONS.ADMIN)
+ driver.pair(CLIENT_UUID_BYTES, PUBLIC_KEY, HAP_PERMISSIONS.ADMIN)
assert CLIENT_UUID in driver.state.paired_clients
response = hap_handler.HAPResponse()
@@ -85,7 +89,7 @@ def test_add_pairing_admin(driver):
handler.is_encrypted = True
handler.client_uuid = CLIENT_UUID
assert driver.state.paired is False
- driver.pair(CLIENT_UUID, PUBLIC_KEY, HAP_PERMISSIONS.ADMIN)
+ driver.pair(CLIENT_UUID_BYTES, PUBLIC_KEY, HAP_PERMISSIONS.ADMIN)
response = hap_handler.HAPResponse()
handler.response = response
@@ -116,7 +120,7 @@ def test_add_pairing_user(driver):
handler.is_encrypted = True
handler.client_uuid = CLIENT_UUID
assert driver.state.paired is False
- driver.pair(CLIENT_UUID, PUBLIC_KEY, HAP_PERMISSIONS.ADMIN)
+ driver.pair(CLIENT_UUID_BYTES, PUBLIC_KEY, HAP_PERMISSIONS.ADMIN)
response = hap_handler.HAPResponse()
handler.response = response
@@ -189,8 +193,8 @@ def test_remove_pairing(driver):
handler.is_encrypted = True
handler.client_uuid = CLIENT_UUID
- driver.pair(CLIENT_UUID, PUBLIC_KEY, HAP_PERMISSIONS.ADMIN)
- driver.pair(CLIENT2_UUID, PUBLIC_KEY, HAP_PERMISSIONS.USER)
+ driver.pair(CLIENT_UUID_BYTES, PUBLIC_KEY, HAP_PERMISSIONS.ADMIN)
+ driver.pair(CLIENT2_UUID_BYTES, PUBLIC_KEY, HAP_PERMISSIONS.USER)
assert driver.state.paired is True
assert CLIENT_UUID in driver.state.paired_clients
@@ -240,7 +244,7 @@ def test_non_admin_pairings_request(driver):
handler.is_encrypted = True
handler.client_uuid = CLIENT_UUID
- driver.pair(CLIENT_UUID, PUBLIC_KEY, HAP_PERMISSIONS.USER)
+ driver.pair(CLIENT_UUID_BYTES, PUBLIC_KEY, HAP_PERMISSIONS.USER)
assert CLIENT_UUID in driver.state.paired_clients
response = hap_handler.HAPResponse()
@@ -264,7 +268,7 @@ def test_invalid_pairings_request(driver):
handler.is_encrypted = True
handler.client_uuid = CLIENT_UUID
- driver.pair(CLIENT_UUID, PUBLIC_KEY, HAP_PERMISSIONS.ADMIN)
+ driver.pair(CLIENT_UUID_BYTES, PUBLIC_KEY, HAP_PERMISSIONS.ADMIN)
assert CLIENT_UUID in driver.state.paired_clients
response = hap_handler.HAPResponse()
@@ -283,7 +287,7 @@ def test_pair_verify_one(driver):
handler = hap_handler.HAPServerHandler(driver, "peername")
handler.is_encrypted = False
- driver.pair(CLIENT_UUID, PUBLIC_KEY, HAP_PERMISSIONS.ADMIN)
+ driver.pair(CLIENT_UUID_BYTES, PUBLIC_KEY, HAP_PERMISSIONS.ADMIN)
assert CLIENT_UUID in driver.state.paired_clients
response = hap_handler.HAPResponse()
@@ -335,7 +339,7 @@ def test_pair_verify_two_invaild_state(driver):
handler = hap_handler.HAPServerHandler(driver, "peername")
handler.is_encrypted = False
- driver.pair(CLIENT_UUID, PUBLIC_KEY, HAP_PERMISSIONS.ADMIN)
+ driver.pair(CLIENT_UUID_BYTES, PUBLIC_KEY, HAP_PERMISSIONS.ADMIN)
assert CLIENT_UUID in driver.state.paired_clients
response = hap_handler.HAPResponse()
@@ -379,7 +383,7 @@ def test_invalid_pairing_request(driver):
handler = hap_handler.HAPServerHandler(driver, "peername")
handler.is_encrypted = False
- driver.pair(CLIENT_UUID, PUBLIC_KEY, HAP_PERMISSIONS.ADMIN)
+ driver.pair(CLIENT_UUID_BYTES, PUBLIC_KEY, HAP_PERMISSIONS.ADMIN)
assert CLIENT_UUID in driver.state.paired_clients
response = hap_handler.HAPResponse()
@@ -666,7 +670,7 @@ def test_attempt_to_pair_when_already_paired(driver):
handler = hap_handler.HAPServerHandler(driver, "peername")
handler.is_encrypted = False
- driver.pair(CLIENT_UUID, PUBLIC_KEY, HAP_PERMISSIONS.ADMIN)
+ driver.pair(CLIENT_UUID_BYTES, PUBLIC_KEY, HAP_PERMISSIONS.ADMIN)
response = hap_handler.HAPResponse()
handler.response = response
diff --git a/tests/test_hsrp.py b/tests/test_hsrp.py
index 24b93496..3c908c01 100644
--- a/tests/test_hsrp.py
+++ b/tests/test_hsrp.py
@@ -21,7 +21,6 @@
def test_srp_basic():
-
ctx = get_srp_context(3072, hashlib.sha512, 16)
b = 191304991611724068381190663629083136274
s = long_to_bytes(227710976386754876301088769828140156049)
diff --git a/tests/test_state.py b/tests/test_state.py
index 989d3b2d..b7da0723 100644
--- a/tests/test_state.py
+++ b/tests/test_state.py
@@ -1,5 +1,6 @@
"""Test for pyhap.state."""
from unittest.mock import patch
+from uuid import UUID
from cryptography.hazmat.primitives.asymmetric import ed25519
import pytest
@@ -7,6 +8,11 @@
from pyhap.const import CLIENT_PROP_PERMS, HAP_PERMISSIONS
from pyhap.state import State
+CLIENT_UUID = UUID("7d0d1ee9-46fe-4a56-a115-69df3f6860c1")
+CLIENT_UUID_BYTES = str(CLIENT_UUID).upper().encode("utf-8")
+CLIENT2_UUID = UUID("7d0d1ee9-46fe-4a56-a115-69df3f6860c2")
+CLIENT2_UUID_BYTES = str(CLIENT2_UUID).upper().encode("utf-8")
+
def test_setup():
"""Test if State class is setup correctly."""
@@ -28,7 +34,6 @@ def test_setup():
"cryptography.hazmat.primitives.asymmetric.ed25519.Ed25519PrivateKey.generate",
return_value=private_key,
) as mock_create_keypair:
-
state = State(address=addr, mac=mac, pincode=pin, port=port)
assert not mock_local_addr.called
assert not mock_gen_mac.called
@@ -37,6 +42,7 @@ def test_setup():
assert mock_create_keypair.called
assert state.address == addr
+ assert state.addresses == [addr]
assert state.mac == mac
assert state.pincode == pin
assert state.port == port
@@ -59,21 +65,25 @@ def test_pairing_remove_last_admin():
assert not state.paired
assert not state.paired_clients
- state.add_paired_client("uuid", "public", HAP_PERMISSIONS.ADMIN)
+ state.add_paired_client(CLIENT_UUID_BYTES, "public", HAP_PERMISSIONS.ADMIN)
assert state.paired
- assert state.paired_clients == {"uuid": "public"}
- assert state.client_properties == {"uuid": {CLIENT_PROP_PERMS: 1}}
+ assert state.paired_clients == {CLIENT_UUID: "public"}
+ assert state.client_properties == {CLIENT_UUID: {CLIENT_PROP_PERMS: 1}}
- state.add_paired_client("uuid2", "public", HAP_PERMISSIONS.USER)
+ state.add_paired_client(CLIENT2_UUID_BYTES, "public", HAP_PERMISSIONS.USER)
assert state.paired
- assert state.paired_clients == {"uuid": "public", "uuid2": "public"}
+ assert state.paired_clients == {CLIENT_UUID: "public", CLIENT2_UUID: "public"}
assert state.client_properties == {
- "uuid": {CLIENT_PROP_PERMS: 1},
- "uuid2": {CLIENT_PROP_PERMS: 0},
+ CLIENT_UUID: {CLIENT_PROP_PERMS: 1},
+ CLIENT2_UUID: {CLIENT_PROP_PERMS: 0},
+ }
+ assert state.uuid_to_bytes == {
+ CLIENT_UUID: CLIENT_UUID_BYTES,
+ CLIENT2_UUID: CLIENT2_UUID_BYTES,
}
# Removing the last admin should remove all non-admins
- state.remove_paired_client("uuid")
+ state.remove_paired_client(CLIENT_UUID)
assert not state.paired
assert not state.paired_clients
@@ -88,22 +98,22 @@ def test_pairing_two_admins():
assert not state.paired
assert not state.paired_clients
- state.add_paired_client("uuid", "public", HAP_PERMISSIONS.ADMIN)
+ state.add_paired_client(CLIENT_UUID_BYTES, "public", HAP_PERMISSIONS.ADMIN)
assert state.paired
- assert state.paired_clients == {"uuid": "public"}
- assert state.client_properties == {"uuid": {CLIENT_PROP_PERMS: 1}}
+ assert state.paired_clients == {CLIENT_UUID: "public"}
+ assert state.client_properties == {CLIENT_UUID: {CLIENT_PROP_PERMS: 1}}
- state.add_paired_client("uuid2", "public", HAP_PERMISSIONS.ADMIN)
+ state.add_paired_client(CLIENT2_UUID_BYTES, "public", HAP_PERMISSIONS.ADMIN)
assert state.paired
- assert state.paired_clients == {"uuid": "public", "uuid2": "public"}
+ assert state.paired_clients == {CLIENT_UUID: "public", CLIENT2_UUID: "public"}
assert state.client_properties == {
- "uuid": {CLIENT_PROP_PERMS: 1},
- "uuid2": {CLIENT_PROP_PERMS: 1},
+ CLIENT_UUID: {CLIENT_PROP_PERMS: 1},
+ CLIENT2_UUID: {CLIENT_PROP_PERMS: 1},
}
# Removing the admin should leave the other admin
- state.remove_paired_client("uuid2")
+ state.remove_paired_client(CLIENT2_UUID)
assert state.paired
- assert state.paired_clients == {"uuid": "public"}
- assert state.client_properties == {"uuid": {CLIENT_PROP_PERMS: 1}}
- assert not state.is_admin("uuid2")
+ assert state.paired_clients == {CLIENT_UUID: "public"}
+ assert state.client_properties == {CLIENT_UUID: {CLIENT_PROP_PERMS: 1}}
+ assert not state.is_admin(CLIENT2_UUID)
diff --git a/tox.ini b/tox.ini
index 4e30cbe2..1fc0e166 100644
--- a/tox.ini
+++ b/tox.ini
@@ -10,6 +10,7 @@ python =
3.8: py38, mypy
3.9: py39, mypy
3.10: py310, mypy
+ 3.11: py310, mypy
[testenv]
deps =
@@ -39,7 +40,7 @@ deps =
commands =
make clean
sphinx-build -W -b html source {envtmpdir}/html
-whitelist_externals=
+allowlist_externals=
/usr/bin/make
make
@@ -47,6 +48,7 @@ whitelist_externals=
[testenv:lint]
basepython = {env:PYTHON3_PATH:python3}
deps =
+ -r{toxinidir}/requirements_all.txt
-r{toxinidir}/requirements_test.txt
commands =
flake8 pyhap tests --ignore=D10,D205,D4,E501,E126,E128,W504,W503,E203