|
| 1 | +"""Base entity platform for Pandora Car Alarm system component.""" |
| 2 | + |
1 | 3 | import asyncio
|
2 | 4 | import dataclasses
|
3 | 5 | import logging
|
|
23 | 25 | from homeassistant.config_entries import ConfigEntry
|
24 | 26 | from homeassistant.const import CONF_USERNAME, ATTR_DEVICE_ID
|
25 | 27 | from homeassistant.core import HomeAssistant, callback, Event
|
| 28 | +from homeassistant.exceptions import HomeAssistantError |
26 | 29 | from homeassistant.helpers.entity import EntityDescription, DeviceInfo, Entity
|
27 | 30 | from homeassistant.helpers.entity_platform import (
|
28 | 31 | AddEntitiesCallback,
|
|
38 | 41 | ATTR_COMMAND_ID,
|
39 | 42 | CONF_OFFLINE_AS_UNAVAILABLE,
|
40 | 43 | DEFAULT_WAITER_TIMEOUT,
|
| 44 | + EVENT_TYPE_COMMAND, |
41 | 45 | )
|
42 | 46 | from pandora_cas.device import PandoraOnlineDevice
|
43 | 47 | from pandora_cas.enums import PandoraDeviceTypes, CommandID, Features
|
|
47 | 51 |
|
48 | 52 | _LOGGER: Final = logging.getLogger(__name__)
|
49 | 53 |
|
| 54 | +DeviceValidator = Callable[[PandoraOnlineDevice], bool] |
| 55 | +CommandType = CommandID | int | Callable[[PandoraOnlineDevice], Awaitable] |
| 56 | +CommandOptions = CommandType | Sequence[tuple[DeviceValidator | None, CommandType]] |
| 57 | + |
| 58 | + |
| 59 | +# noinspection PyShadowingNames |
| 60 | +def check_for_device( |
| 61 | + has_features: Features | None = None, |
| 62 | + not_features: Features | None = None, |
| 63 | + has_device_type: str | tuple[str, ...] | None = None, |
| 64 | +) -> DeviceValidator: |
| 65 | + """ |
| 66 | + Creates a checker that checks something is suitable for device. |
| 67 | + :param has_features: Has specified features. |
| 68 | + :param not_features: |
| 69 | + :param has_device_type: |
| 70 | + :return: |
| 71 | + """ |
| 72 | + if has_features is None and not_features is None and has_device_type is None: |
| 73 | + raise ValueError( |
| 74 | + "You must specify either has_features or not_features or has_device_type" |
| 75 | + ) |
| 76 | + |
| 77 | + if isinstance(has_device_type, str): |
| 78 | + has_device_type = (has_device_type,) |
| 79 | + |
| 80 | + def _perform_check_on_device(device: PandoraOnlineDevice) -> bool: |
| 81 | + if has_device_type is not None: |
| 82 | + if device.type not in has_device_type: |
| 83 | + return False |
| 84 | + if not (has_features is None and not_features is None): |
| 85 | + if (features := device.features) is None: |
| 86 | + return False |
| 87 | + if not (has_features is None or has_features & features): |
| 88 | + return False |
| 89 | + if not (not_features is None or not (not_features & features)): |
| 90 | + return False |
| 91 | + return True |
| 92 | + |
| 93 | + return _perform_check_on_device |
| 94 | + |
| 95 | + |
| 96 | +def has_features(features: Features) -> DeviceValidator: |
| 97 | + """Shortcut for has_features argument.""" |
| 98 | + return check_for_device(has_features=features) |
| 99 | + |
| 100 | + |
| 101 | +def not_features(features: Features) -> DeviceValidator: |
| 102 | + """Shortcut for not_features argument.""" |
| 103 | + return check_for_device(not_features=features) |
| 104 | + |
| 105 | + |
| 106 | +def has_device_type(device_type: str | tuple[str, ...]) -> DeviceValidator: |
| 107 | + """Shortcut for has_device_type argument.""" |
| 108 | + return check_for_device(has_device_type=device_type) |
| 109 | + |
50 | 110 |
|
51 |
| -def parse_description_command_id(value: Any, device_type: str | None = None) -> int: |
| 111 | +def resolve_command_id( |
| 112 | + device: PandoraOnlineDevice, value: Any, raise_exceptions: bool = False |
| 113 | +) -> int | None: |
52 | 114 | """Retrieve command from definition."""
|
53 | 115 | if value is None:
|
54 |
| - raise NotImplementedError("command not defined") |
| 116 | + return None |
55 | 117 |
|
56 |
| - if isinstance(value, Mapping): |
57 |
| - try: |
58 |
| - value = value[device_type] |
59 |
| - except KeyError: |
60 |
| - if device_type is None: |
61 |
| - raise NotImplementedError("command not defined") |
| 118 | + if isinstance(value, list): |
| 119 | + for condition, command in value: |
| 120 | + if condition is None: |
| 121 | + return command |
62 | 122 | try:
|
63 |
| - value = value[None] |
64 |
| - except KeyError: |
65 |
| - raise NotImplementedError("command not defined") |
66 |
| - |
67 |
| - return value if callable(value) else int(value) |
| 123 | + if condition(device): |
| 124 | + return command |
| 125 | + except BaseException as exc: |
| 126 | + if raise_exceptions: |
| 127 | + raise |
| 128 | + _LOGGER.warning( |
| 129 | + f"Exception occurred while checking command for device {device}: {exc}", |
| 130 | + exc_info=exc, |
| 131 | + ) |
| 132 | + return None |
| 133 | + |
| 134 | + try: |
| 135 | + return value if callable(value) else int(value) |
| 136 | + except BaseException as exc: |
| 137 | + if raise_exceptions: |
| 138 | + raise |
| 139 | + _LOGGER.warning( |
| 140 | + f"Exception occurred while checking command for device {device}: {exc}", |
| 141 | + exc_info=exc, |
| 142 | + ) |
| 143 | + return None |
68 | 144 |
|
69 | 145 |
|
70 | 146 | async def async_platform_setup_entry(
|
@@ -139,10 +215,6 @@ def __post_init__(self):
|
139 | 215 | self.translation_key = self.key
|
140 | 216 |
|
141 | 217 |
|
142 |
| -CommandType = CommandID | int | Callable[[PandoraOnlineDevice], Awaitable] |
143 |
| -CommandOptions = CommandType | Mapping[str, CommandType] |
144 |
| - |
145 |
| - |
146 | 218 | class BasePandoraCASEntity(Entity):
|
147 | 219 | ENTITY_ID_FORMAT: ClassVar[str] = NotImplemented
|
148 | 220 |
|
@@ -316,14 +388,21 @@ def _process_command_response(self, event: Union[Event, datetime]) -> None:
|
316 | 388 | def _add_command_listener(self, command: CommandOptions | None) -> None:
|
317 | 389 | if command is None:
|
318 | 390 | return None
|
319 |
| - command_id = parse_description_command_id(command, self.pandora_device.type) |
| 391 | + |
| 392 | + command_id = resolve_command_id( |
| 393 | + self.pandora_device, |
| 394 | + command, |
| 395 | + raise_exceptions=False, |
| 396 | + ) |
| 397 | + |
320 | 398 | if not isinstance(command_id, int):
|
321 | 399 | return None
|
| 400 | + |
322 | 401 | if (listeners := self._command_listeners) is None:
|
323 | 402 | self._command_listeners = listeners = []
|
324 | 403 | listeners.append(
|
325 | 404 | self.hass.bus.async_listen(
|
326 |
| - event_type=f"{DOMAIN}_command", |
| 405 | + event_type=EVENT_TYPE_COMMAND, |
327 | 406 | listener=self._process_command_response,
|
328 | 407 | event_filter=callback(
|
329 | 408 | lambda x: int(x.data[ATTR_COMMAND_ID]) == command_id
|
@@ -464,15 +543,19 @@ async def run_binary_command(self, enable: bool) -> None:
|
464 | 543 | :param enable: Whether to run 'on' or 'off'.
|
465 | 544 | """
|
466 | 545 | # Determine command to run
|
467 |
| - command_id = parse_description_command_id( |
| 546 | + command_id = resolve_command_id( |
| 547 | + self.pandora_device, |
468 | 548 | (
|
469 | 549 | self.entity_description.command_on
|
470 | 550 | if enable or self.entity_description.command_off is None
|
471 | 551 | else self.entity_description.command_off
|
472 | 552 | ),
|
473 |
| - self.pandora_device.type, |
| 553 | + raise_exceptions=False, |
474 | 554 | )
|
475 | 555 |
|
| 556 | + if command_id is None: |
| 557 | + raise HomeAssistantError("could not determine command to run") |
| 558 | + |
476 | 559 | self._is_turning_on = enable
|
477 | 560 | self._is_turning_off = not enable
|
478 | 561 |
|
|
0 commit comments