diff --git a/pyleco/management/data_logger.py b/pyleco/management/data_logger.py index 5eaf7cb8..794f228e 100644 --- a/pyleco/management/data_logger.py +++ b/pyleco/management/data_logger.py @@ -36,6 +36,7 @@ class StrEnum(str, Enum): # type: ignore pass import json import logging +from threading import Lock from typing import Any, Callable, Optional, Iterable try: @@ -98,10 +99,10 @@ class DataLogger(ExtendedMessageHandler): """ # TODO names - tmp: dict[str, list[Any]] = {} # contains all values since last datapoint - lists: dict[str, list[Any]] = {} # contains datapoints. - units: dict[str, Any] = {} # contains the units of the variables TODO TBD what the value is. - last_datapoint: dict[str, Any] = {} + tmp: dict[str, list[Any]] # contains all values since last datapoint + lists: dict[str, list[Any]] # contains datapoints. + units: dict[str, Any] # contains the units of the variables TODO TBD what the value is. + last_datapoint: dict[str, Any] last_save_name: str = "" # configuration variables @@ -117,6 +118,10 @@ def __init__(self, name: str = "DataLoggerN", directory: str = ".", **kwargs) -> self.directory = directory self.publisher = DataPublisher(full_name=name) self.valuing = average + # Initialize values + self.list_lock = Lock() + self.reset_data_storage() + self.units = {} # TODO add auto_save functionality? def register_rpc_methods(self) -> None: @@ -161,11 +166,12 @@ def handle_subscription_message(self, data_message: DataMessage) -> None: def handle_subscription_data(self, data: dict[str, Any]) -> None: """Store `data` dict in `tmp`""" - for key, value in data.items(): - try: - self.tmp[key].append(value) - except KeyError: - log.error(f"Got value for '{key}', but no list present.") + with self.list_lock: + for key, value in data.items(): + try: + self.tmp[key].append(value) + except KeyError: + log.debug("Got value for '%s', but no list present.", key) if self.trigger_type == TriggerTypes.VARIABLE and self.trigger_variable in data.keys(): self.make_datapoint() @@ -180,24 +186,29 @@ def make_datapoint(self) -> dict[str, Any]: def calculate_data(self) -> dict[str, Any]: """Calculate data for a data point and return the data point.""" datapoint = {} - if 'time' in self.lists.keys(): - now = datetime.datetime.now(datetime.timezone.utc) - today = datetime.datetime.combine(self.today, datetime.time(), - datetime.timezone.utc) - time = (now - today).total_seconds() - self.tmp['time'].append(time) - for variable, datalist in self.lists.items(): - value = datapoint[variable] = self.calculate_single_data(variable, self.tmp[variable]) - datalist.append(value) - for key in self.tmp.keys(): - self.tmp[key].clear() - return datapoint + with self.list_lock: + if 'time' in self.lists.keys(): + now = datetime.datetime.now(datetime.timezone.utc) + today = datetime.datetime.combine( + self.today, datetime.time(), datetime.timezone.utc + ) + time = (now - today).total_seconds() + self.tmp['time'].append(time) + for variable, datalist in self.lists.items(): + value = datapoint[variable] = self.calculate_single_data( + variable, self.tmp[variable] + ) + datalist.append(value) + for key in self.tmp.keys(): + self.tmp[key].clear() + return datapoint def calculate_single_data(self, variable: str, tmp: list): if tmp: value = self.valuing(tmp) elif self.value_repeating: try: + # no lock, as this method is called in in a locked environment! value = self.lists[variable][-1] except (KeyError, IndexError): # No last value present. value = nan @@ -236,14 +247,14 @@ def start_collecting(self, *, self.today = datetime.datetime.now(datetime.timezone.utc).date() self.trigger_type = trigger_type or self._last_trigger_type self._last_trigger_type = self.trigger_type - if self.trigger_type == TriggerTypes.TIMER: - self.start_timer_trigger() if trigger_timeout is not None: self.trigger_timeout = trigger_timeout if trigger_variable is not None: self.trigger_variable = trigger_variable if value_repeating is not None: self.value_repeating = value_repeating + if self.trigger_type == TriggerTypes.TIMER: + self.start_timer_trigger(timeout=self.trigger_timeout) self.set_valuing_mode(valuing_mode=valuing_mode) self.setup_variables(self.lists.keys() if variables is None else variables) self.units = units if units else {} @@ -267,18 +278,20 @@ def setup_variables(self, variables: Iterable[str]) -> None: else: # old style: topic is variable name subscriptions.add(variable) - self.lists[variable] = [] - self.tmp[variable] = [] + with self.list_lock: + self.lists[variable] = [] + self.tmp[variable] = [] self.subscribe(topics=subscriptions) def reset_data_storage(self) -> None: """Reset the data storage.""" - self.tmp = {} - self.lists = {} + with self.list_lock: + self.tmp = {} + self.lists = {} self.last_datapoint = {} - def start_timer_trigger(self) -> None: - self.timer = RepeatingTimer(self.trigger_timeout, self.make_datapoint) + def start_timer_trigger(self, timeout: float) -> None: + self.timer = RepeatingTimer(timeout, self.make_datapoint) self.timer.start() def set_valuing_mode(self, valuing_mode: Optional[ValuingModes]) -> None: @@ -313,8 +326,9 @@ def save_data(self, meta: Optional[dict] = None, suffix: str = "", header: str = # 'user': self.user_data, # user stored meta data }) try: - with open(f"{folder}/{file_name}.json", 'w') as file: - json.dump(obj=(header, self.lists, meta), fp=file) + with self.list_lock: + with open(f"{folder}/{file_name}.json", 'w') as file: + json.dump(obj=(header, self.lists, meta), fp=file) except TypeError as exc: log.exception("Some type error during saving occurred.", exc_info=exc) raise @@ -350,7 +364,8 @@ def get_configuration(self) -> dict[str, Any]: config['valuing_mode'] = vm.value config['value_repeating'] = self.value_repeating # Header and Variables. - config['variables'] = list(self.lists.keys()) + with self.list_lock: + config['variables'] = list(self.lists.keys()) config['units'] = self.units # config['autoSave'] = self.actionAutoSave.isChecked() return config @@ -365,8 +380,9 @@ def get_last_save_name(self) -> Union[str, None]: def get_list_length(self) -> int: """Return the length of the lists.""" - length = len(self.lists[list(self.lists.keys())[0]]) if self.lists else 0 - return length + with self.list_lock: + length = len(self.lists[list(self.lists.keys())[0]]) if self.lists else 0 + return length def main() -> None: diff --git a/tests/management/test_data_logger.py b/tests/management/test_data_logger.py index df3b2185..1f6af02f 100644 --- a/tests/management/test_data_logger.py +++ b/tests/management/test_data_logger.py @@ -81,9 +81,9 @@ def test_start_collecting_starts_timer(data_logger: DataLogger): # arrange data_logger.trigger_timeout = 1000 # act - data_logger.start_collecting(trigger_type=TriggerTypes.TIMER) + data_logger.start_collecting(trigger_type=TriggerTypes.TIMER, trigger_timeout=500) # assert - assert data_logger.timer.interval == 1000 + assert data_logger.timer.interval == 500 # cleanup data_logger.timer.cancel() @@ -91,8 +91,9 @@ def test_start_collecting_starts_timer(data_logger: DataLogger): def test_start_collecting_starts_timer_even_second_time(data_logger: DataLogger): """Even a second time, without explicit trigger type, the timer should be started.""" # arrange - data_logger.trigger_timeout = 1000 - data_logger.start_collecting(trigger_type=TriggerTypes.TIMER) # first time, to set type + data_logger.trigger_timeout = 500 + # first time, to set type + data_logger.start_collecting(trigger_type=TriggerTypes.TIMER, trigger_timeout=1000) data_logger.stop_collecting() assert not hasattr(data_logger, "timer") # no timer left # act @@ -211,6 +212,7 @@ def test_handle_subscription_data_triggers(data_logger: DataLogger): def test_handle_subscription_data_without_list(data_logger: DataLogger, caplog: pytest.LogCaptureFixture): + caplog.set_level(0) data_logger.handle_subscription_data({'not_present': 42}) assert caplog.messages == ["Got value for 'not_present', but no list present."] @@ -224,8 +226,7 @@ def test_set_publisher_name(data_logger: DataLogger): class Test_start_timer_trigger: @pytest.fixture def data_logger_stt(self, data_logger: DataLogger): - data_logger.trigger_timeout = 1000 - data_logger.start_timer_trigger() + data_logger.start_timer_trigger(1000) yield data_logger data_logger.timer.cancel() @@ -342,10 +343,23 @@ def saved_file(self, data_logger_sd: DataLogger): path = Path(data_logger_sd.directory) / data_logger_sd.last_save_name return path.with_suffix(".json").read_text() - @pytest.mark.xfail(True, reason="Not yet date recognition implemented.") def test_output(self, saved_file: str): - # TODO make date comparison work. - assert saved_file == """["", {"time": [], "test": [], "2": [], "N1.sender.var": []}, {"units": {}, "today": "2023-11-27", "file_name": "2023_11_27T15_07_06", "logger_name": "DataLoggerN", "configuration": {"trigger": "variable", "triggerVariable": "test", "trigger_variable": "test", "valuing_mode": "mean", "valueRepeat": false, "value_repeating": false, "variables": "time test 2 N1.sender.var"}}]""" # noqa + today_string = self.today.isoformat() + assert saved_file == "".join( + ( + """["", {"time": [], "test": [], "2": [], "N1.sender.var": []}, """, + '''{"units": {}, "today": "''', + today_string, + '''", "file_name": "''', + self.file_name, + """", "logger_name": "DataLoggerN", """, + """"configuration": {"trigger_type": "variable", "trigger_timeout": 10, """, + """"trigger_variable": "test", "valuing_mode": "average", """, + """"value_repeating": false, """, + """"variables": ["time", "test", "2", "N1.sender.var"], """, + """"units": {}}}]""", + ) + ) def test_json_content(self, saved_file: str): today_string = self.today.isoformat()