Skip to content

Commit

Permalink
Add support for Number, Select and Switch entities
Browse files Browse the repository at this point in the history
  • Loading branch information
ankohanse committed Mar 1, 2024
1 parent 8d22671 commit 0fd6e2e
Show file tree
Hide file tree
Showing 13 changed files with 1,476 additions and 380 deletions.
6 changes: 6 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"python.analysis.extraPaths": [
"D:\\projects\\hass-core",
"D:\\projects\\hass-dependencies\\async-timeout"
]
}
6 changes: 5 additions & 1 deletion custom_components/dabpumps/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,11 @@

PLATFORMS: list[Platform] = [
Platform.SENSOR,
Platform.BINARY_SENSOR
Platform.BINARY_SENSOR,
Platform.NUMBER,
Platform.SELECT,
Platform.SWITCH,

]


Expand Down
219 changes: 155 additions & 64 deletions custom_components/dabpumps/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@

import asyncio
import hashlib
import httpx
import json
import jwt
import logging
import math
import re
import httpx
import time
import urllib.parse

from collections import namedtuple
from collections import defaultdict, namedtuple
from datetime import datetime, timedelta
from typing import Any

Expand All @@ -25,6 +28,8 @@
DOMAIN,
API,
DABPUMPS_API_URL,
DABPUMPS_API_HOST,
API_TOKEN_TIME_MIN,
SIMULATE_SUFFIX_ID,
DIAGNOSTICS_REDACT
)
Expand Down Expand Up @@ -75,22 +80,39 @@ def __init__(self, hass, username, password):
self._client = None

# calls history for diagnostics
self._hass = hass
self._history_store = Store[dict](hass, self._STORAGE_VERSION, self._STORAGE_KEY_HISTORY) if hass else None

# cleanup history store after each restart
self._hass.async_create_task(self._history_store.async_remove())


async def async_login(self):
if (self._client):
return True
# Step 0: do we still have a client with a non-expired auth token?
if self._client:
token = self._client.cookies.get("dabcsauthtoken", domain=DABPUMPS_API_HOST)
if token:
token_payload = jwt.decode(jwt=token, options={"verify_signature": False})

if token_payload.get("exp", 0) - time.time() > API_TOKEN_TIME_MIN:
# still valid for another 10 seconds
await self._async_update_diagnostics("token reuse", None, None)
return True

# Duplicate check to have been logged out of previous sessions.
# DAB Pumps service does not handle multiple logins from same account very well
await self.async_logout()

# Use a fresh client to keep track of cookies during login and subsequent calls
client = httpx.AsyncClient(follow_redirects=True, timeout=120.0)

# Step 1: get login url
url = DABPUMPS_API_URL
_LOGGER.debug(f"DAB Pumps retrieve login page via GET {url}")
response = await client.get(url)
request = client.build_request("GET", url)
response = await client.send(request)

await self._async_update_diagnostics("home", "GET", url, None, None, response)
await self._async_update_diagnostics("home", request, response)

if (not response.is_success):
error = f"Unable to connect, got response {response.status_code} while trying to reach {url}"
Expand All @@ -109,9 +131,10 @@ async def async_login(self):

# Step 2: Login
_LOGGER.debug(f"DAB Pumps login via POST {login_url}")
response = await client.post(login_url, data=login_data, headers=login_hdrs)

await self._async_update_diagnostics("login", "POST", login_url, login_hdrs, login_data, response)
request = client.build_request("POST", login_url, data=login_data, headers=login_hdrs)
response = await client.send(request)

await self._async_update_diagnostics("login", request, response)

if (not response.is_success):
error = f"Unable to login, got response {response.status_code}"
Expand All @@ -128,16 +151,19 @@ async def async_logout(self):
url = DABPUMPS_API_URL + '/logout'

_LOGGER.debug(f"DAB Pumps logout via GET {url}")
response = await self._client.get(url)

await self._async_update_diagnostics("logout", "GET", url, None, None, response)
request = self._client.build_request("GET", url)
response = await self._client.send(request)

await self._async_update_diagnostics("logout", request, response)

if (not response.is_success):
error = f"Unable to logout, got response {response.status_code} while trying to reach {url}"
# ignore and continue

# Forget our current session so we are forced to do a fresh login in a next retry
await self._client.aclose()
except:
pass # ignore any exceptions
finally:
self._client = None

Expand All @@ -147,12 +173,13 @@ async def async_logout(self):
async def async_fetch_installs(self):
# Get installation data
url = DABPUMPS_API_URL + '/api/v1/gui/installation/list?lang=en'

_LOGGER.debug(f"DAB Pumps retrieve installation info via GET {url}")
response = await self._client.get(url)
request = self._client.build_request("GET", url)
response = await self._client.send(request)

await self._async_update_diagnostics("installation list", request, response)

await self._async_update_diagnostics("installation list", "GET", url, None, None, response)

if (not response.is_success):
error = f"Unable retrieve installations, got response {response.status_code} while trying to reach {url}"
_LOGGER.debug(error) # logged as warning after last retry
Expand Down Expand Up @@ -180,9 +207,10 @@ async def async_fetch_strings(self, lang):
url = DABPUMPS_API_URL + f"/resources/js/localization_{lang}.properties?format=JSON"

_LOGGER.debug(f"DAB Pumps retrieve language info via GET {url}")
response = await self._client.get(url)
request = self._client.build_request("GET", url)
response = await self._client.send(request)

await self._async_update_diagnostics(f"localization_{lang}", "GET", url, None, None, response)
await self._async_update_diagnostics(f"localization_{lang}", request, response)

if (not response.is_success):
error = f"Unable retrieve language info, got response {response.status_code} while trying to reach {url}"
Expand All @@ -207,13 +235,15 @@ async def async_fetch_strings(self, lang):

# Fetch the statusses for a DAB Pumps device, which then constitues the Sensors
async def async_fetch_device_statusses(self, device):

url = DABPUMPS_API_URL + f"/dumstate/{device.serial.removesuffix(SIMULATE_SUFFIX_ID)}"

serial = device.serial.removesuffix(SIMULATE_SUFFIX_ID)
url = DABPUMPS_API_URL + f"/dumstate/{serial}"

_LOGGER.debug(f"DAB Pumps retrieve device statusses for '{device.name}' via GET {url}")
response = await self._client.get(url)
request = self._client.build_request("GET", url)
response = await self._client.send(request)

await self._async_update_diagnostics(f"statusses {device.serial.removesuffix(SIMULATE_SUFFIX_ID)}", "GET", url, None, None, response)
await self._async_update_diagnostics(f"statusses {serial}", request, response)

if (not response.is_success):
error = f"Unable retrieve device data for '{device.name}', got response {response.status_code} while trying to reach {url}"
Expand All @@ -235,43 +265,106 @@ async def async_fetch_device_statusses(self, device):
return json.loads(result.get('status', '{}'))


async def async_change_device_status(self, status, value):

serial = status.serial.removesuffix(SIMULATE_SUFFIX_ID)

url = DABPUMPS_API_URL + f"/dum/{serial}"
data = {'key': status.key, 'value': str(value) }
hdrs = {'Content-Type': 'application/json'}

_LOGGER.debug(f"DAB Pumps set device param for '{status.unique_id}' to '{value}' via POST {url}")
request = self._client.build_request("POST", url, json=data, headers=hdrs)
response = await self._client.send(request)

await self._async_update_diagnostics(f"set {serial}:{status.key}", request, response)

if (not response.is_success):
error = f"Unable to set status {status.unique_id} to '{value}', got response {response.status_code} while trying to reach {url}"
_LOGGER.debug(error) # logged as warning after last retry
raise DabPumpsApiError(error)

result = response.json()
if (result['res'] != 'OK'):
if result['code'] in ['FORBIDDEN']:
error = f"Authentication failed: {result['res']} {result['code']} {result.get('msg','')}"
_LOGGER.debug(error) # logged as warning after last retry
raise DabPumpsApiAuthError(error)
else:
error = f"Unable retrieve device data, got response {result['res']} {result['code']} {result.get('message','')} while trying to reach {url}"
_LOGGER.debug(error) # logged as warning after last retry
raise DabPumpsApiError(error)

return True



async def async_get_diagnostics(self) -> dict[str, Any]:
data = await self._history_store.async_load() or {}
counter = data.get("counter", {})
history = data.get("history", [])
details = data.get("details", {})

return {
"username": self._username,
"password": self._password,
calls_total = sum([ n for key, n in counter.items() ]) or 1
calls_counter = { key: n for key, n in counter.items() }
calls_percent = { key: round(100.0 * n / calls_total, 2) for key, n in counter.items() }

"history": history,
"details": details,
return {
"config": {
"username": self._username,
"password": self._password,
},
"diagnostics": {
"counter": calls_counter,
"percent": calls_percent,
"history": history,
"details": details,
}
}


async def _async_update_diagnostics(self, context, method, url, headers, content, response):
if not self._history_store:
return

item = DabPumpsApiHistoryItem(context)
detail = DabPumpsApiHistoryDetail(context, method, url, headers, content, response)

# Persist this history in file instead of keeping in memory
data = await self._history_store.async_load() or {}
history = data.get("history", [])
details = data.get("details", {})

history.append( async_redact_data(item, DIAGNOSTICS_REDACT) )
if len(history) > 32:
history.pop(0)

details[context] = async_redact_data(detail, DIAGNOSTICS_REDACT)

data["history"] = history
data["details"] = details
await self._history_store.async_save(data)
async def _async_update_diagnostics(self, context, request, response):
# worker function
async def _async_worker(self, context, request, response):
item = DabPumpsApiHistoryItem(context)
detail = DabPumpsApiHistoryDetail(context, request, response) if request and response else None

# Persist this history in file instead of keeping in memory
data = await self._history_store.async_load() or {}
counter = data.get("counter", {})
history = data.get("history", [])
details = data.get("details", {})

if context in counter:
counter[context] += 1
else:
counter[context] = 1

history.append( async_redact_data(item, DIAGNOSTICS_REDACT) )
if len(history) > 32:
history.pop(0)

details[context] = async_redact_data(detail, DIAGNOSTICS_REDACT)

data["history"] = history
data["counter"] = counter
data["details"] = details
await self._history_store.async_save(data)

# Create the worker task to update diagnostics in the background,
# but do not let main loop wait for it to finish
self._hass.async_create_task(_async_worker(self, context, request, response))



class DabPumpsApiAuthError(Exception):
"""Exception to indicate authentication failure."""


class DabPumpsApiError(Exception):
"""Exception to indicate generic error failure."""


class DabPumpsApiHistoryItem(dict):
def __init__(self, context):
super().__init__({
Expand All @@ -281,13 +374,22 @@ def __init__(self, context):


class DabPumpsApiHistoryDetail(dict):
def __init__(self, context, method, url, headers, content, response):
def __init__(self, context, request, response):
req = {
"method": method,
"url": url,
"headers": headers,
"content": content,
"method": request.method,
"url": str(request.url),
"headers": request.headers,
}
if request.method == "POST":
content = str(request.content).lstrip('b').strip("'")

if request.headers.get('content-type','').startswith('application/json'):
req["json"] = json.loads(content)
elif request.headers.get('content-type','').startswith('application/x-www-form-urlencoded'):
req["data"] = dict(urllib.parse.parse_qsl(content))
else:
req["content"] = content

res = {
"status": response.status_code,
"reason": response.reason_phrase,
Expand All @@ -301,14 +403,3 @@ def __init__(self, context, method, url, headers, content, response):
"request": req,
"response": res,
})


class DabPumpsApiAuthError(Exception):
"""Exception to indicate authentication failure."""


class DabPumpsApiError(Exception):
"""Exception to indicate generic error failure."""



Loading

0 comments on commit 0fd6e2e

Please sign in to comment.