Also periodically cache status values
ankohanse committed Apr 14, 2024
1 parent c2e0ac1 commit 4b535a1
Showing 5 changed files with 74 additions and 20 deletions.
7 changes: 4 additions & 3 deletions custom_components/dabpumps/api.py
@@ -118,7 +118,7 @@ async def async_login(self):
# still valid for another 10 seconds
await self._async_update_diagnostics(datetime.now(), "token reuse", None, None, token_payload)
return

# Make sure to have been logged out of previous sessions.
# DAB Pumps service does not handle multiple logins from same account very well
await self.async_logout()
@@ -199,7 +199,7 @@ async def async_login_dconnect_app(self):
# Use a fresh client to keep track of cookies during login and subsequent calls
client = httpx.AsyncClient(follow_redirects=True, timeout=120.0)

context = "login DConnect_app"
context = f"login DConnect_app"
verb = "POST"
url = DABPUMPS_SSO_URL + f"/auth/realms/dwt-group/protocol/openid-connect/token"
data = {
@@ -224,7 +224,7 @@
raise DabPumpsApiAuthError(error)

# Step 2: Validate the auth token against the DABPumps Api
context = "login DConnect_app validatetoken"
context = f"login DConnect_app validatetoken"
verb = "GET"
url = DABPUMPS_API_URL + f"/api/v1/token/validatetoken"
params = {
@@ -397,6 +397,7 @@ async def async_change_device_status(self, status, value):

async def async_fetch_strings(self, lang):
"""Get string translations"""

context = f"localization_{lang}"
verb = "GET"
url = DABPUMPS_API_URL + f"/resources/js/localization_{lang}.properties?format=JSON"
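For orientation, the surrounding code in this file performs a two-step login: a POST to the SSO token endpoint (`/auth/realms/dwt-group/protocol/openid-connect/token`) followed by a GET to `/api/v1/token/validatetoken` on the DAB Pumps API. A rough standalone sketch with httpx follows; the request payloads are truncated in the diff, so the field and parameter names below are assumptions rather than the integration's actual values.

```python
import httpx

async def login_dconnect_app(sso_url: str, api_url: str, username: str, password: str) -> str:
    """Illustrative two-step login: fetch an OpenID Connect token, then validate it."""
    async with httpx.AsyncClient(follow_redirects=True, timeout=120.0) as client:
        # Step 1: request an access token from the SSO realm (field names assumed)
        rsp = await client.post(
            sso_url + "/auth/realms/dwt-group/protocol/openid-connect/token",
            data={
                "grant_type": "password",     # assumed
                "client_id": "dconnect-app",  # assumed
                "username": username,
                "password": password,
            },
        )
        rsp.raise_for_status()
        token = rsp.json().get("access_token", "")

        # Step 2: validate the token against the DAB Pumps API (query parameter assumed)
        rsp = await client.get(api_url + "/api/v1/token/validatetoken", params={"token": token})
        rsp.raise_for_status()
        return token
```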
73 changes: 63 additions & 10 deletions custom_components/dabpumps/coordinator.py
@@ -5,7 +5,7 @@
import re

from collections import namedtuple
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone
from typing import Any

from homeassistant.components.diagnostics import REDACTED
@@ -298,7 +298,15 @@ async def _async_detect_data(self):
error = None
for retry in range(0, COORDINATOR_RETRY_ATTEMPTS):
try:
await self._api.async_login()
try:
await self._api.async_login()
except:
if len(self._device_map) > 0:
# Force retry in loop by raising original exception
raise
else:
# Ignore and use persisted cached data if this is the initial retrieve
pass

# Attempt to refresh installation details and devices when the cached one expires (once a day)
await self._async_detect_install_details()
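
The new try/except around `async_login()` encodes a simple policy: if login fails during a refresh (device data is already present), re-raise so the surrounding retry loop runs again; if it fails during the very first retrieve, swallow the error and fall through to the persisted cache. A minimal sketch of that policy with hypothetical names (not the coordinator's real members):

```python
RETRY_ATTEMPTS = 2  # stand-in for COORDINATOR_RETRY_ATTEMPTS

async def detect_data(api, device_map: dict, fetch_all) -> None:
    """Illustrative retry loop: prefer the live API, fall back to cache on first start."""
    error = None
    for attempt in range(RETRY_ATTEMPTS):
        try:
            try:
                await api.async_login()
            except Exception:
                if device_map:
                    # Refresh of existing data: re-raise so this attempt fails and the loop retries
                    raise
                # Initial retrieve: ignore the login failure and rely on persisted cache below
                pass

            # Stand-in for the subsequent detect/fetch steps, which fall back to cache internally
            await fetch_all()
            return
        except Exception as ex:
            error = ex
    if error:
        raise error
```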
@@ -447,15 +455,26 @@ async def _async_detect_device_statusses(self):
return

for device in self._device_map.values():

# First try to retrieve from API
context = f"statusses {device.serial}"
try:
data = await self._api.async_fetch_device_statusses(device)
await self._async_process_device_status_data(device, data)

# do not persist volatile data in the cache file
await self._async_update_cache(context, data)

except Exception as e:
# Force retry in calling function by raising original exception
raise e
if any(status.serial==device.serial for status in self._status_map.values()):
# Ignore problems if this is just a refresh
pass
else:
# Retry from persisted cache if this is the initial retrieve.
try:
data = await self._async_fetch_from_cache(context)
await self._async_process_device_status_data(device, data)
except Exception:
# Force retry in calling function by raising original exception
raise e

# If we reach this point, then all device statusses have been fetched/refreshed
self._status_map_ts = datetime.now()
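
Per device, the status fetch now distinguishes a refresh from the initial retrieve: an API error is ignored when a status for that serial is already known, and otherwise a read from the persisted cache is attempted before the original exception is re-raised. A compact sketch of that fallback, with hypothetical helper parameters standing in for the coordinator's methods:

```python
async def fetch_device_statuses(api, device, status_map: dict, cache_get, process) -> None:
    """Illustrative per-device fetch: live API first, persisted cache as fallback on first start."""
    context = f"statusses {device.serial}"
    try:
        data = await api.async_fetch_device_statusses(device)
        await process(device, data)
    except Exception as ex:
        if any(status.serial == device.serial for status in status_map.values()):
            # A previous value for this device exists; treat the failed refresh as transient
            return
        try:
            # Initial retrieve: fall back to the data persisted in the cache file
            data = await cache_get(context)
            await process(device, data)
        except Exception:
            # Cache also unusable; surface the original API error to the caller
            raise ex
```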
@@ -469,7 +488,7 @@ async def _async_detect_strings(self):
# Not yet expired
return

context = "localization_{self.language}"
context = f"localization_{self.language}"
try:
data = await self._api.async_fetch_strings(self.language)
await self._async_process_strings_data(data)
@@ -502,7 +521,7 @@ async def _async_detect_installations(self, ignore_exception=False):

# First try to retrieve from API.
# Make sure not to overwrite data in dabpumps.api_history file when an empty list is returned.
context = "installation list"
context = f"installation list"
try:
data = await self._api.async_fetch_install_list()
await self._async_process_install_list(data, ignore_empty=True)
@@ -716,11 +735,27 @@ async def _async_process_device_status_data(self, device, data):
status_map = {}
status = data.get('status') or "{}"
values = json.loads(status)

# determine whether the values are still valid.
# Expired values can happen when using fallback read from (outdated) cache
status_validity = int(data.get('status_validity', 0)) * 60
status_ts = data.get('statusts', '')
status_ts = datetime.fromisoformat(status_ts) if status_ts else datetime.min
ts_now = datetime.now(timezone.utc)
expired = (ts_now - status_ts).total_seconds() > status_validity
if expired:
_LOGGER.warning(f"Detected expired data; set all status values to unknown")

for item_key, item_val in values.items():
# the value 'h' is used when a property is not available/supported
if item_val=='h':
continue

# If the data is expired then set all values to unknown.
# This is used to be able to initialize the integration during startup, even if
# communication to DAB Pumps fails.
if expired:
item_val = None

# Item Entity ID is combination of device serial and each field unique name as internal sensor hash
# Item Unique ID is a more readable version
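
The expiry check converts `status_validity` from minutes to seconds and compares it against the age of the `statusts` timestamp; values older than that window are reported as unknown (None) rather than as stale readings. A small worked example of the computation, using made-up payload values:

```python
from datetime import datetime, timezone

# Hypothetical payload, purely for illustration
data = {
    "status_validity": "60",                  # minutes
    "statusts": "2024-04-13T08:00:00+00:00",  # last status timestamp, ISO 8601
}

status_validity = int(data.get("status_validity", 0)) * 60  # 3600 seconds
status_ts = datetime.fromisoformat(data["statusts"])         # timezone-aware
ts_now = datetime.now(timezone.utc)

expired = (ts_now - status_ts).total_seconds() > status_validity
print(expired)  # True whenever the snapshot is more than an hour old
```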
@@ -762,11 +797,27 @@ async def _async_update_cache(self, context, data):
# worker function
async def _async_worker(self, context, data):
if not self._store:
return None
return

# Retrieve cache file contents
store = await self._store.async_get_data() or {}
cache = store.get("cache", {})
cache[context] = { "ts": datetime.now() } | async_redact_data(data, DIAGNOSTICS_REDACT)

data_old = cache.get(context, {})

# We only update the cached contents once a day to prevent too many writes of unchanged data
ts_str = data_old.get("ts", "")
ts_old = datetime.fromisoformat(ts_str) if ts_str else datetime.min
ts_new = datetime.now()

if (ts_new - ts_old).total_seconds() < 86400-300: # 1 day minus 5 minutes
# Not expired yet
return

_LOGGER.debug(f"Update cache: {context}")

# Update and write new cache file contents
cache[context] = { "ts": ts_new } | data

store["cache"] = cache
await self._store.async_set_data(store)
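
The cache write path now keeps the previous entry's timestamp and skips the write unless that entry is roughly a day old (86400 seconds minus a 5-minute margin), bounding disk writes of mostly unchanged data to about once per day per context. A minimal standalone sketch of that guard (the real store serializes to a file; a plain dict is used here):

```python
from datetime import datetime

ONE_DAY_MINUS_MARGIN = 86400 - 300  # 1 day minus 5 minutes, in seconds

def update_cache_entry(cache: dict, context: str, data: dict) -> bool:
    """Write a timestamped entry, but at most once per day per context; return True if written."""
    entry_old = cache.get(context, {})
    ts_str = entry_old.get("ts", "")
    ts_old = datetime.fromisoformat(ts_str) if ts_str else datetime.min
    ts_new = datetime.now()

    if (ts_new - ts_old).total_seconds() < ONE_DAY_MINUS_MARGIN:
        return False  # written recently; skip to limit churn of unchanged data

    cache[context] = {"ts": ts_new.isoformat()} | data
    return True
```

On the next call with the same context, the stored ISO timestamp is parsed back and the guard short-circuits until the entry is about a day old.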
@@ -782,6 +833,8 @@ async def _async_fetch_from_cache(self, context):
if not self._store:
return {}

_LOGGER.debug(f"Fetch from cache: {context}")

store = await self._store.async_get_data() or {}
cache = store.get("cache", {})
data = cache.get(context, {})
4 changes: 2 additions & 2 deletions custom_components/dabpumps/number.py
@@ -127,14 +127,14 @@ def _update_attributes(self, status, is_create):
attr_precision = int(math.floor(math.log10(1.0 / self._params.weight)))
attr_min = round(float(self._params.min) * self._params.weight, attr_precision)
attr_max = round(float(self._params.max) * self._params.weight, attr_precision)
attr_val = round(float(status.val) * self._params.weight, attr_precision)
attr_val = round(float(status.val) * self._params.weight, attr_precision) if status.val!=None else None
attr_step = self._params.weight
else:
# Convert to int
attr_precision = 0
attr_min = int(self._params.min)
attr_max = int(self._params.max)
attr_val = int(status.val)
attr_val = int(status.val) if status.val!=None else None
attr_step = self.get_number_step()

# update creation-time only attributes
2 changes: 1 addition & 1 deletion custom_components/dabpumps/select.py
@@ -123,7 +123,7 @@ def _update_attributes(self, status, is_create):

# Process any changes
changed = False
attr_val = self._dict.get(status.val, status.val)
attr_val = self._dict.get(status.val, status.val) if status.val!=None else None

# update creation-time only attributes
if is_create:
8 changes: 4 additions & 4 deletions custom_components/dabpumps/sensor.py
@@ -124,18 +124,18 @@ def _update_attributes(self, status, is_create):
if self._params.weight and self._params.weight != 1 and self._params.weight != 0:
# Convert to float
attr_precision = int(math.floor(math.log10(1.0 / self._params.weight)))
attr_val = round(float(status.val) * self._params.weight, attr_precision)
attr_val = round(float(status.val) * self._params.weight, attr_precision) if status.val!=None else None
attr_unit = self.get_unit()
else:
# Convert to int
attr_precision = 0
attr_val = int(status.val)
attr_val = int(status.val) if status.val!=None else None
attr_unit = self.get_unit()

case 'enum':
# Lookup the dict string for the value and otherwise return the value itself
attr_precision = None
attr_val = self._get_string(self._params.values.get(status.val, status.val))
attr_val = self._get_string(self._params.values.get(status.val, status.val)) if status.val!=None else None
attr_unit = None

case 'label' | _:
Expand All @@ -144,7 +144,7 @@ def _update_attributes(self, status, is_create):

# Convert to string
attr_precision = None
attr_val = self._get_string(str(status.val))
attr_val = self._get_string(str(status.val)) if status.val!=None else None
attr_unit = None

# Process any changes
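The entity platforms (number, select, sensor) all gain the same guard: when `status.val` is None, for example after expired cached data was nulled out, conversions are skipped and the attribute is reported as unknown. The repeated `x if status.val!=None else None` expression could be factored into a tiny helper; a sketch of that idea (not part of this commit), where idiomatic Python would spell the test `is not None`:

```python
from typing import Callable, Optional, TypeVar

T = TypeVar("T")

def convert_or_none(val, convert: Callable[..., T]) -> Optional[T]:
    """Apply a conversion only when a value is present; otherwise report unknown (None)."""
    return convert(val) if val is not None else None

# Examples mirroring the guarded conversions in number.py and sensor.py
attr_val_float = convert_or_none("123", lambda v: round(float(v) * 0.1, 1))  # 12.3
attr_val_int = convert_or_none(None, int)                                    # None
```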
