diff --git a/custom_components/dabpumps/api.py b/custom_components/dabpumps/api.py index a2d4006..43eb640 100644 --- a/custom_components/dabpumps/api.py +++ b/custom_components/dabpumps/api.py @@ -1,29 +1,24 @@ """api.py: DabPumps API for DAB Pumps integration.""" +import aiohttp import asyncio -import hashlib -import httpx -import json + import jwt import logging -import math import re import time -import urllib.parse -from collections import defaultdict, namedtuple -from datetime import datetime, timedelta +from collections import namedtuple +from datetime import datetime from typing import Any +from yarl import URL from homeassistant.components.diagnostics import REDACTED from homeassistant.components.diagnostics.util import async_redact_data -from homeassistant.components.sensor import SensorStateClass from homeassistant.core import HomeAssistant -from homeassistant.exceptions import IntegrationError -from homeassistant.helpers.httpx_client import create_async_httpx_client +from homeassistant.helpers.aiohttp_client import async_create_clientsession from homeassistant.helpers.storage import Store -from httpx import RequestError, TimeoutException from .const import ( DOMAIN, @@ -34,7 +29,6 @@ DABPUMPS_API_TOKEN_COOKIE, DABPUMPS_API_TOKEN_TIME_MIN, API_LOGIN, - API_CLIENT_TIMEOUT, SIMULATE_SUFFIX_ID, DIAGNOSTICS_REDACT, ) @@ -89,8 +83,8 @@ def __init__(self, hass, username, password, use_history_store=True): # Client to keep track of cookies during login and subsequent calls # We keep the same client for the whole life of the api instance. - self._client = create_async_httpx_client(self._hass, follow_redirects=True, timeout=API_CLIENT_TIMEOUT) - + self._client:aiohttp.ClientSession = async_create_clientsession(self._hass) + if use_history_store: # maintain calls history for diagnostics during normal operations self._history_key = username.lower() @@ -107,10 +101,10 @@ def __init__(self, hass, username, password, use_history_store=True): async def async_login(self): """Login to DAB Pumps by trying each of the possible login methods""" - # Step 0: do we still have a client with a non-expired auth token? - token = self._client.cookies.get(DABPUMPS_API_TOKEN_COOKIE, domain=DABPUMPS_API_DOMAIN) - if token: - token_payload = jwt.decode(jwt=token, options={"verify_signature": False}) + # Step 0: do we still have a cookie with a non-expired auth token? + cookie = self._client.cookie_jar.filter_cookies(URL(DABPUMPS_API_URL)).get(DABPUMPS_API_TOKEN_COOKIE, None) + if cookie: + token_payload = jwt.decode(jwt=cookie.value, options={"verify_signature": False}) if token_payload.get("exp", 0) - time.time() > DABPUMPS_API_TOKEN_TIME_MIN: # still valid for another 10 seconds @@ -146,6 +140,7 @@ async def async_login(self): # if we reached this point then a login method succeeded # keep using this client and its cookies and remember which method had success + _LOGGER.debug(f"DAB Pumps login succeeded using method {method}") self._login_method = method return @@ -165,104 +160,119 @@ async def async_login_dablive_app(self, isDabLive=1): # Step 1: get authorization token context = f"login DabLive_app (isDabLive={isDabLive})" - verb = "POST" - url = DABPUMPS_API_URL + f"/auth/token" - params = { - 'isDabLive': isDabLive, # required param, though actual value seems to be completely ignored - } - data = { - 'username': self._username, - 'password': self._password - } - hdrs = { - 'Content-Type': 'application/x-www-form-urlencoded' + request = { + "method": "POST", + "url": DABPUMPS_API_URL + f"/auth/token", + "params": { + 'isDabLive': isDabLive, # required param, though actual value seems to be completely ignored + }, + "headers": { + 'Content-Type': 'application/x-www-form-urlencoded', + }, + "data": { + 'username': self._username, + 'password': self._password, + }, } - _LOGGER.debug(f"DAB Pumps login for '{self._username}' via {verb} {url}") - result = await self._async_send_request(context, verb, url, params=params, data=data, hdrs=hdrs) + _LOGGER.debug(f"DAB Pumps login for '{self._username}' via {request["method"]} {request["url"]}") + result = await self._async_send_request(context, request) token = result.get('access_token') or "" if not token: - error = f"No access token found in response from {url}" + error = f"No access token found in response from {request["url"]}" _LOGGER.debug(error) # logged as warning after last retry raise DabPumpsApiAuthError(error) # if we reach this point then the token was OK # Store returned access-token as cookie so it will automatically be passed in next calls - self._client.cookies.set(name=DABPUMPS_API_TOKEN_COOKIE, value=token, domain=DABPUMPS_API_DOMAIN, path='/') - + self._client.cookie_jar.update_cookies( { DABPUMPS_API_TOKEN_COOKIE: token }, URL(DABPUMPS_API_URL) ) + async def async_login_dconnect_app(self): """Login to DAB Pumps via the method as used by the DConnect app""" # Step 1: get authorization token context = f"login DConnect_app" - verb = "POST" - url = DABPUMPS_SSO_URL + f"/auth/realms/dwt-group/protocol/openid-connect/token" - data = { - 'client_id': 'DWT-Dconnect-Mobile', - 'client_secret': 'ce2713d8-4974-4e0c-a92e-8b942dffd561', - 'scope': 'openid', - 'grant_type': 'password', - 'username': self._username, - 'password': self._password - } - hdrs = { - 'Content-Type': 'application/x-www-form-urlencoded' + request = { + "method": "POST", + "url": DABPUMPS_SSO_URL + f"/auth/realms/dwt-group/protocol/openid-connect/token", + "headers": { + 'Content-Type': 'application/x-www-form-urlencoded' + }, + "data": { + 'client_id': 'DWT-Dconnect-Mobile', + 'client_secret': 'ce2713d8-4974-4e0c-a92e-8b942dffd561', + 'scope': 'openid', + 'grant_type': 'password', + 'username': self._username, + 'password': self._password + }, } - _LOGGER.debug(f"DAB Pumps login for '{self._username}' via {verb} {url}") - result = await self._async_send_request(context, verb, url, data=data, hdrs=hdrs) + _LOGGER.debug(f"DAB Pumps login for '{self._username}' via {request["method"]} {request["url"]}") + result = await self._async_send_request(context, request) token = result.get('access_token') or "" if not token: - error = f"No access token found in response from {url}" + error = f"No access token found in response from {request["url"]}" _LOGGER.debug(error) # logged as warning after last retry raise DabPumpsApiAuthError(error) # Step 2: Validate the auth token against the DABPumps Api context = f"login DConnect_app validatetoken" - verb = "GET" - url = DABPUMPS_API_URL + f"/api/v1/token/validatetoken" - params = { - 'email': self._username, - 'token': token, + request = { + "method": "GET", + "url": DABPUMPS_API_URL + f"/api/v1/token/validatetoken", + "params": { + 'email': self._username, + 'token': token, + } } - _LOGGER.debug(f"DAB Pumps validate token via {verb} {url}") - result = await self._async_send_request(context, verb, url, params=params) + _LOGGER.debug(f"DAB Pumps validate token via {request["method"]} {request["url"]}") + result = await self._async_send_request(context, request) # if we reach this point then the token was OK # Store returned access-token as cookie so it will automatically be passed in next calls - self._client.cookies.set(name=DABPUMPS_API_TOKEN_COOKIE, value=token, domain=DABPUMPS_API_DOMAIN, path='/') - + self._client.cookie_jar.update_cookies( { DABPUMPS_API_TOKEN_COOKIE: token }, URL(DABPUMPS_API_URL) ) + async def async_login_dconnect_web(self): """Login to DAB Pumps via the method as used by the DConnect website""" # Step 1: get login url context = f"login DConnect_web home" - verb = "GET" - url = DABPUMPS_API_URL + request = { + "method": "GET", + "url": DABPUMPS_API_URL, + } - _LOGGER.debug(f"DAB Pumps retrieve login page via GET {url}") - text = await self._async_send_request(context, verb, url) + _LOGGER.debug(f"DAB Pumps retrieve login page via GET {request["url"]}") + text = await self._async_send_request(context, request) match = re.search(r'action\s?=\s?\"(.*?)\"', text, re.MULTILINE) if not match: - error = f"Unexpected response while retrieving login url from {url}: {text}" + error = f"Unexpected response while retrieving login url from {request["url"]}: {text}" _LOGGER.debug(error) # logged as warning after last retry raise DabPumpsApiAuthError(error) - login_url = match.group(1).replace('&', '&') - login_data = {'username': self._username, 'password': self._password } - login_hdrs = {'Content-Type': 'application/x-www-form-urlencoded'} - # Step 2: Login - _LOGGER.debug(f"DAB Pumps login for '{self._username}' via POST {login_url}") context = f"login DConnect_web login" - verb = "POST" - await self._async_send_request(context, verb, login_url, data=login_data, hdrs=login_hdrs) + request = { + "method": "POST", + "url": match.group(1).replace('&', '&'), + "headers": { + 'Content-Type': 'application/x-www-form-urlencoded' + }, + "data": { + 'username': self._username, + 'password': self._password + }, + } + + _LOGGER.debug(f"DAB Pumps login for '{self._username}' via {request["method"]} {request["url"]}") + await self._async_send_request(context, request) # if we reach this point without exceptions then login was successfull # client access_token is already set by the last call @@ -271,10 +281,10 @@ async def async_login_dconnect_web(self): async def async_logout(self): """Logout from DAB Pumps""" - # Home Assistant will issue a warning when calling aclose() on the async httpx client. + # Home Assistant will issue a warning when calling aclose() on the async aiohttp client. # Instead of closing we will simply forget all cookies. The result is that on a next # request, the client will act like it is a new one. - self._client.cookies.clear() + self._client.cookie_jar.clear() async def async_fetch_install_list(self): @@ -282,11 +292,13 @@ async def async_fetch_install_list(self): timestamp = datetime.now() context = f"installation list" - verb = "GET" - url = DABPUMPS_API_URL + '/api/v1/installation' + request = { + "method": "GET", + "url": DABPUMPS_API_URL + '/api/v1/installation', + } - _LOGGER.debug(f"DAB Pumps retrieve installation list for '{self._username}' via {verb} {url}") - (result, request, response) = await self._async_send_request_ex(context, verb, url, diagnostics=False) + _LOGGER.debug(f"DAB Pumps retrieve installation list for '{self._username}' via {request["method"]} {request["url"]}") + (result, request, response) = await self._async_send_request_ex(context, request, diagnostics=False) # only update diagnostics if we actually received data # this data is then used as fallback for async_fetch_install_details @@ -303,12 +315,13 @@ async def async_fetch_install_details(self, install_id): install_id_org = install_id.removesuffix(SIMULATE_SUFFIX_ID) context = f"installation {install_id}" - verb = "GET" - url = DABPUMPS_API_URL + f"/api/v1/installation/{install_id_org}" + request = { + "method": "GET", + "url": DABPUMPS_API_URL + f"/api/v1/installation/{install_id_org}", + } - _LOGGER.debug(f"DAB Pumps retrieve installation details via {verb} {url}") - result = await self._async_send_request(context, verb, url) - + _LOGGER.debug(f"DAB Pumps retrieve installation details via {request["method"]} {request["url"]}") + result = await self._async_send_request(context, request) return result @@ -325,10 +338,9 @@ async def async_fallback_install_details(self, install_id): data = await self._history_store.async_get_data() or {} installation_list = data.get("details", {}).get(context, {}) - installations = installation_list.get("response", {}).get("json", {}).get("values", []) + installations = installation_list.get("rsp", {}).get("json", {}).get("values", []) installation = next( (install for install in installations if install.get("installation_id", "") == install_id_org), {}) - return installation @@ -338,13 +350,14 @@ async def async_fetch_device_config(self, device): config_id = device.config_id context = f"configuration {config_id}" - verb = "GET" - url = DABPUMPS_API_URL + f"/api/v1/configuration/{config_id}" - # or DABPUMPS_API_URL + f"/api/v1/configure/paramsDefinition?version=0&doc={config_name}" - - _LOGGER.debug(f"DAB Pumps retrieve device config for '{device.name}' via {verb} {url}") - result = await self._async_send_request(context, verb, url) + request = { + "method": "GET", + "url": DABPUMPS_API_URL + f"/api/v1/configuration/{config_id}", + # or DABPUMPS_API_URL + f"/api/v1/configure/paramsDefinition?version=0&doc={config_name}", + } + _LOGGER.debug(f"DAB Pumps retrieve device config for '{device.name}' via {request["method"]} {request["url"]}") + result = await self._async_send_request(context, request) return result @@ -354,13 +367,14 @@ async def async_fetch_device_statusses(self, device): serial = device.serial.removesuffix(SIMULATE_SUFFIX_ID) context = f"statusses {serial}" - verb = "GET" - url = DABPUMPS_API_URL + f"/dumstate/{serial}" - # or DABPUMPS_API_URL + f"/api/v1/dum/{serial}/state" - - _LOGGER.debug(f"DAB Pumps retrieve device statusses for '{device.name}' via {verb} {url}") - result = await self._async_send_request(context, verb, url) + request = { + "method": "GET", + "url": DABPUMPS_API_URL + f"/dumstate/{serial}", + # or DABPUMPS_API_URL + f"/api/v1/dum/{serial}/state", + } + _LOGGER.debug(f"DAB Pumps retrieve device statusses for '{device.name}' via {request["method"]} {request["url"]}") + result = await self._async_send_request(context, request) return result @@ -370,13 +384,20 @@ async def async_change_device_status(self, status, value): serial = status.serial.removesuffix(SIMULATE_SUFFIX_ID) context = f"set {serial}:{status.key}" - verb = "POST" - url = DABPUMPS_API_URL + f"/dum/{serial}" - json = {'key': status.key, 'value': str(value) } - hdrs = {'Content-Type': 'application/json'} + request = { + "method": "POST", + "url": DABPUMPS_API_URL + f"/dum/{serial}", + "headers": { + 'Content-Type': 'application/json' + }, + "json": { + 'key': status.key, + 'value': str(value) + }, + } - _LOGGER.debug(f"DAB Pumps set device param for '{status.unique_id}' to '{value}' via {verb} {url}") - result = await self._async_send_request(context, verb, url, json=json, hdrs=hdrs) + _LOGGER.debug(f"DAB Pumps set device param for '{status.unique_id}' to '{value}' via {request["method"]} {request["url"]}") + result = await self._async_send_request(context, request) # If no exception was thrown then the operation was successfull return True @@ -386,62 +407,82 @@ async def async_fetch_strings(self, lang): """Get string translations""" context = f"localization_{lang}" - verb = "GET" - url = DABPUMPS_API_URL + f"/resources/js/localization_{lang}.properties?format=JSON" + request = { + "method": "GET", + "url": DABPUMPS_API_URL + f"/resources/js/localization_{lang}.properties?format=JSON", + } - _LOGGER.debug(f"DAB Pumps retrieve language info via {verb} {url}") - result = await self._async_send_request(context, verb, url) - + _LOGGER.debug(f"DAB Pumps retrieve language info via {request["method"]} {request["url"]}") + result = await self._async_send_request(context, request) return result - async def _async_send_request(self, context, verb, url, params=None, data=None, json=None, hdrs=None): + async def _async_send_request(self, context, request): """GET or POST a request for JSON data""" - (data, _, _) = await self._async_send_request_ex(context, verb, url, params=params, data=data, json=json, hdrs=hdrs, diagnostics=True) + (data, _, _) = await self._async_send_request_ex(context, request, diagnostics=True) return data - async def _async_send_request_ex(self, context, verb, url, params=None, data=None, json=None, hdrs=None, diagnostics=True): + async def _async_send_request_ex(self, context, request, diagnostics=True): """ GET or POST a request for JSON data. Also returns the request and response performed """ + # Perform the http request timestamp = datetime.now() - request = self._client.build_request(verb, url, params=params, data=data, json=json, headers=hdrs) - response = await self._client.send(request) - - # Save the diagnostics if requested - if diagnostics: - await self._async_update_diagnostics(timestamp, context, request, response) - - # Check response - if not response.is_success: - error = f"Unable to perform request, got response {response.status_code} {response.reason_phrase} while trying to reach {url}" - _LOGGER.debug(error) # logged as warning after last retry - raise DabPumpsApiError(error) - - if not response.headers.get('content-type','').startswith('application/json'): - return (response.text, request, response) - - result = response.json() - - # if the result structure contains a 'res' value, then check it - res = result.get('res', None) - if res and res != 'OK': - # BAD RESPONSE: { "res": "ERROR", "code": "FORBIDDEN", "msg": "Forbidden operation", "where": "ROUTE RULE" } - code = result.get('code', '') - msg = result.get('msg', '') - - if code in ['FORBIDDEN']: - error = f"Authorization failed: {res} {code} {msg}" - _LOGGER.debug(error) # logged as warning after last retry - raise DabPumpsApiRightsError(error) + async with self._client.request( + method = request["method"], + url = request["url"], + params = request.get("params", None), + data = request.get("data", None), + json = request.get("json", None), + headers = request.get("headers", None), + ) as rsp: + + # Remember actual requests and response params, used for diagnostics + request["headers"] = rsp.request_info.headers + response = { + "status": f"{rsp.status} {rsp.reason}", + "headers": rsp.headers, + "elapsed": (datetime.now() - timestamp).total_seconds(), + } + if rsp.ok and rsp.headers.get('content-type','').startswith('application/json'): + json = response["json"] = await rsp.json() + text = None else: - error = f"Unable to perform request, got response {res} {code} {msg} while trying to reach {url}" + text = response["text"] = await rsp.text() + json = None + + # Save the diagnostics if requested + if diagnostics: + await self._async_update_diagnostics(timestamp, context, request, response) + + # Check response + if not rsp.ok: + error = f"Unable to perform request, got response {response["status"]} while trying to reach {request["url"]}" _LOGGER.debug(error) # logged as warning after last retry raise DabPumpsApiError(error) - - return (result, request, response) + + if text is not None: + return (text, request, response) + + # if the result structure contains a 'res' value, then check it + res = json.get('res', None) + if res and res != 'OK': + # BAD RESPONSE: { "res": "ERROR", "code": "FORBIDDEN", "msg": "Forbidden operation", "where": "ROUTE RULE" } + code = json.get('code', '') + msg = json.get('msg', '') + + if code in ['FORBIDDEN']: + error = f"Authorization failed: {res} {code} {msg}" + _LOGGER.debug(error) # logged as warning after last retry + raise DabPumpsApiRightsError(error) + else: + error = f"Unable to perform request, got response {res} {code} {msg} while trying to reach {request["url"]}" + _LOGGER.debug(error) # logged as warning after last retry + raise DabPumpsApiError(error) + + return (json, request, response) async def async_get_diagnostics(self) -> dict[str, Any]: @@ -474,7 +515,7 @@ async def async_get_diagnostics(self) -> dict[str, Any]: } - async def _async_update_diagnostics(self, timestamp, context, request, response, token=None): + async def _async_update_diagnostics(self, timestamp, context: str, request: dict|None, response: dict|None, token: dict|None = None): # worker function async def _async_worker(self, timestamp, context, request, response, token): item = DabPumpsApiHistoryItem(timestamp, context, request, response, token) @@ -589,28 +630,22 @@ async def async_set_data(self, data_self): class DabPumpsApiHistoryItem(dict): - def __init__(self, timestamp, context, request, response, token): + def __init__(self, timestamp, context: str , request: dict|None, response: dict|None, token: dict|None): item = { "ts": timestamp, "op": context, } - # If possible, add a summary of the response status_code and json res and code + # If possible, add a summary of the response status and json res and code if response: rsp = [] - rsp.append(f"{response.status_code} {response.reason_phrase}") + rsp.append(response["status"]) - if response.is_success and response.headers.get('content-type','').startswith('application/json'): - json = response.json() - - if res := json.get('res', ''): - rsp.append(f"res={res}") - if code := json.get('code', ''): - rsp.append(f"code={code}") - if msg := json.get('msg', ''): - rsp.append(f"msg={msg}") - if details := json.get('details', ''): - rsp.append(f"details={details}") + if json := response.get("json", None): + if res := json.get('res', ''): rsp.append(f"res={res}") + if code := json.get('code', ''): rsp.append(f"code={code}") + if msg := json.get('msg', ''): rsp.append(f"msg={msg}") + if details := json.get('details', ''): rsp.append(f"details={details}") item["rsp"] = ', '.join(rsp) @@ -619,42 +654,15 @@ def __init__(self, timestamp, context, request, response, token): class DabPumpsApiHistoryDetail(dict): - def __init__(self, timestamp, context, request, response, token): + def __init__(self, timestamp, context: str, request: dict|None, response: dict|None, token: dict|None): item = { "ts": timestamp, } if request: - req = { - "method": request.method, - "url": str(request.url), - "headers": request.headers, - } - - if request.method == "POST": - content = str(request.content).lstrip('b').strip("'") - - if request.headers.get('content-type','').startswith('application/json'): - req["json"] = json.loads(content) - elif request.headers.get('content-type','').startswith('application/x-www-form-urlencoded'): - req["data"] = dict(urllib.parse.parse_qsl(content)) - else: - req["content"] = content - - item["req"] = req - + item["req"] = request if response: - res = { - "status": f"{response.status_code} {response.reason_phrase}", - "headers": response.headers, - "elapsed": response.elapsed.total_seconds(), - } - - if response.is_success and response.headers.get('content-type','').startswith('application/json'): - res['json'] = response.json() - - item["res"] = res - + item["rsp"] = response if token: item["token"] = token diff --git a/custom_components/dabpumps/const.py b/custom_components/dabpumps/const.py index 82ce9fa..4d76b64 100644 --- a/custom_components/dabpumps/const.py +++ b/custom_components/dabpumps/const.py @@ -97,6 +97,7 @@ COORDINATOR_RETRY_ATTEMPTS = 10 COORDINATOR_RETRY_DELAY = 5 # seconds +COORDINATOR_TIMEOUT = 120 # seconds API_LOGIN = types.SimpleNamespace() API_LOGIN.DABLIVE_APP_0 = 'DabLive_app_0' @@ -104,7 +105,6 @@ API_LOGIN.DCONNECT_APP = 'DConnect_app' API_LOGIN.DCONNECT_WEB = 'DConnect_web' -API_CLIENT_TIMEOUT = 120.0 # Debug: set this constant to True to simulate a configuration with multiple installations for one DAB account SIMULATE_MULTI_INSTALL = False