Skip to content

Commit

Permalink
Add lock for list access.
Browse files Browse the repository at this point in the history
  • Loading branch information
BenediktBurger committed Apr 10, 2024
1 parent b169d49 commit 5238d1c
Showing 1 changed file with 39 additions and 26 deletions.
65 changes: 39 additions & 26 deletions pyleco/management/data_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class StrEnum(str, Enum): # type: ignore
pass
import json
import logging
from threading import Lock
from typing import Any, Callable, Optional, Iterable

try:
Expand Down Expand Up @@ -118,6 +119,7 @@ def __init__(self, name: str = "DataLoggerN", directory: str = ".", **kwargs) ->
self.publisher = DataPublisher(full_name=name)
self.valuing = average
# Initialize values
self.list_lock = Lock()
self.reset_data_storage()
self.units = {}
# TODO add auto_save functionality?
Expand Down Expand Up @@ -164,11 +166,12 @@ def handle_subscription_message(self, data_message: DataMessage) -> None:

def handle_subscription_data(self, data: dict[str, Any]) -> None:
"""Store `data` dict in `tmp`"""
for key, value in data.items():
try:
self.tmp[key].append(value)
except KeyError:
log.debug("Got value for '%s', but no list present.", key)
with self.list_lock:
for key, value in data.items():
try:
self.tmp[key].append(value)
except KeyError:
log.debug("Got value for '%s', but no list present.", key)
if self.trigger_type == TriggerTypes.VARIABLE and self.trigger_variable in data.keys():
self.make_datapoint()

Expand All @@ -183,24 +186,29 @@ def make_datapoint(self) -> dict[str, Any]:
def calculate_data(self) -> dict[str, Any]:
"""Calculate data for a data point and return the data point."""
datapoint = {}
if 'time' in self.lists.keys():
now = datetime.datetime.now(datetime.timezone.utc)
today = datetime.datetime.combine(self.today, datetime.time(),
datetime.timezone.utc)
time = (now - today).total_seconds()
self.tmp['time'].append(time)
for variable, datalist in self.lists.items():
value = datapoint[variable] = self.calculate_single_data(variable, self.tmp[variable])
datalist.append(value)
for key in self.tmp.keys():
self.tmp[key].clear()
return datapoint
with self.list_lock:
if 'time' in self.lists.keys():
now = datetime.datetime.now(datetime.timezone.utc)
today = datetime.datetime.combine(
self.today, datetime.time(), datetime.timezone.utc
)
time = (now - today).total_seconds()
self.tmp['time'].append(time)
for variable, datalist in self.lists.items():
value = datapoint[variable] = self.calculate_single_data(
variable, self.tmp[variable]
)
datalist.append(value)
for key in self.tmp.keys():
self.tmp[key].clear()
return datapoint

def calculate_single_data(self, variable: str, tmp: list):
if tmp:
value = self.valuing(tmp)
elif self.value_repeating:
try:
# no lock, as this method is called in in a locked environment!
value = self.lists[variable][-1]
except (KeyError, IndexError): # No last value present.
value = nan
Expand Down Expand Up @@ -270,14 +278,16 @@ def setup_variables(self, variables: Iterable[str]) -> None:
else:
# old style: topic is variable name
subscriptions.add(variable)
self.lists[variable] = []
self.tmp[variable] = []
with self.list_lock:
self.lists[variable] = []
self.tmp[variable] = []
self.subscribe(topics=subscriptions)

def reset_data_storage(self) -> None:
"""Reset the data storage."""
self.tmp = {}
self.lists = {}
with self.list_lock:
self.tmp = {}
self.lists = {}
self.last_datapoint = {}

def start_timer_trigger(self, timeout: float) -> None:
Expand Down Expand Up @@ -316,8 +326,9 @@ def save_data(self, meta: Optional[dict] = None, suffix: str = "", header: str =
# 'user': self.user_data, # user stored meta data
})
try:
with open(f"{folder}/{file_name}.json", 'w') as file:
json.dump(obj=(header, self.lists, meta), fp=file)
with self.list_lock:
with open(f"{folder}/{file_name}.json", 'w') as file:
json.dump(obj=(header, self.lists, meta), fp=file)
except TypeError as exc:
log.exception("Some type error during saving occurred.", exc_info=exc)
raise
Expand Down Expand Up @@ -353,7 +364,8 @@ def get_configuration(self) -> dict[str, Any]:
config['valuing_mode'] = vm.value
config['value_repeating'] = self.value_repeating
# Header and Variables.
config['variables'] = list(self.lists.keys())
with self.list_lock:
config['variables'] = list(self.lists.keys())
config['units'] = self.units
# config['autoSave'] = self.actionAutoSave.isChecked()
return config
Expand All @@ -368,8 +380,9 @@ def get_last_save_name(self) -> Union[str, None]:

def get_list_length(self) -> int:
"""Return the length of the lists."""
length = len(self.lists[list(self.lists.keys())[0]]) if self.lists else 0
return length
with self.list_lock:
length = len(self.lists[list(self.lists.keys())[0]]) if self.lists else 0
return length


def main() -> None:
Expand Down

0 comments on commit 5238d1c

Please sign in to comment.