Merge pull request #70 from pymeasure/fix-data-logger
Fix data_logger timer
BenediktBurger committed Apr 12, 2024
2 parents 0ce18cf + 5238d1c commit a4ee43b
Showing 2 changed files with 73 additions and 43 deletions.
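In short, this commit changes two things in the data logger: start_collecting now stores the new trigger_timeout before it starts the timer (and hands the timeout to start_timer_trigger explicitly), and access to the shared tmp/lists dictionaries is serialized with a threading.Lock. The condensed sketch below (a hypothetical class with names taken from the diff that follows, not the real DataLogger) illustrates the ordering bug behind the first point:

from enum import Enum


class TriggerTypes(str, Enum):  # stand-in for the enum in data_logger.py; values assumed
    TIMER = "timer"
    VARIABLE = "variable"


class OrderingSketch:
    trigger_timeout: float = 1000  # value left over from an earlier run

    def start_timer_trigger(self, timeout: float) -> None:
        print(f"timer interval: {timeout}")

    def start_collecting(self, *, trigger_type=None, trigger_timeout=None) -> None:
        # Before this commit, the timer was started at this point and read the
        # *old* self.trigger_timeout (1000), because the new argument had not
        # been stored yet.
        if trigger_timeout is not None:
            self.trigger_timeout = trigger_timeout
        if trigger_type == TriggerTypes.TIMER:
            # After the fix, the freshly configured timeout is handed over explicitly.
            self.start_timer_trigger(timeout=self.trigger_timeout)


OrderingSketch().start_collecting(trigger_type=TriggerTypes.TIMER, trigger_timeout=500)
# prints "timer interval: 500"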
84 changes: 50 additions & 34 deletions pyleco/management/data_logger.py
@@ -36,6 +36,7 @@ class StrEnum(str, Enum): # type: ignore
pass
import json
import logging
from threading import Lock
from typing import Any, Callable, Optional, Iterable

try:
@@ -98,10 +99,10 @@ class DataLogger(ExtendedMessageHandler):
"""

# TODO names
tmp: dict[str, list[Any]] = {} # contains all values since last datapoint
lists: dict[str, list[Any]] = {} # contains datapoints.
units: dict[str, Any] = {} # contains the units of the variables TODO TBD what the value is.
last_datapoint: dict[str, Any] = {}
tmp: dict[str, list[Any]] # contains all values since last datapoint
lists: dict[str, list[Any]] # contains datapoints.
units: dict[str, Any] # contains the units of the variables TODO TBD what the value is.
last_datapoint: dict[str, Any]
last_save_name: str = ""

# configuration variables
@@ -117,6 +118,10 @@ def __init__(self, name: str = "DataLoggerN", directory: str = ".", **kwargs) ->
self.directory = directory
self.publisher = DataPublisher(full_name=name)
self.valuing = average
# Initialize values
self.list_lock = Lock()
self.reset_data_storage()
self.units = {}
# TODO add auto_save functionality?

def register_rpc_methods(self) -> None:
@@ -161,11 +166,12 @@ def handle_subscription_message(self, data_message: DataMessage) -> None:

def handle_subscription_data(self, data: dict[str, Any]) -> None:
"""Store `data` dict in `tmp`"""
for key, value in data.items():
try:
self.tmp[key].append(value)
except KeyError:
log.error(f"Got value for '{key}', but no list present.")
with self.list_lock:
for key, value in data.items():
try:
self.tmp[key].append(value)
except KeyError:
log.debug("Got value for '%s', but no list present.", key)
if self.trigger_type == TriggerTypes.VARIABLE and self.trigger_variable in data.keys():
self.make_datapoint()

@@ -180,24 +186,29 @@ def make_datapoint(self) -> dict[str, Any]:
def calculate_data(self) -> dict[str, Any]:
"""Calculate data for a data point and return the data point."""
datapoint = {}
if 'time' in self.lists.keys():
now = datetime.datetime.now(datetime.timezone.utc)
today = datetime.datetime.combine(self.today, datetime.time(),
datetime.timezone.utc)
time = (now - today).total_seconds()
self.tmp['time'].append(time)
for variable, datalist in self.lists.items():
value = datapoint[variable] = self.calculate_single_data(variable, self.tmp[variable])
datalist.append(value)
for key in self.tmp.keys():
self.tmp[key].clear()
return datapoint
with self.list_lock:
if 'time' in self.lists.keys():
now = datetime.datetime.now(datetime.timezone.utc)
today = datetime.datetime.combine(
self.today, datetime.time(), datetime.timezone.utc
)
time = (now - today).total_seconds()
self.tmp['time'].append(time)
for variable, datalist in self.lists.items():
value = datapoint[variable] = self.calculate_single_data(
variable, self.tmp[variable]
)
datalist.append(value)
for key in self.tmp.keys():
self.tmp[key].clear()
return datapoint

def calculate_single_data(self, variable: str, tmp: list):
if tmp:
value = self.valuing(tmp)
elif self.value_repeating:
try:
# no lock, as this method is called in a locked environment!
value = self.lists[variable][-1]
except (KeyError, IndexError): # No last value present.
value = nan
@@ -236,14 +247,14 @@ def start_collecting(self, *,
self.today = datetime.datetime.now(datetime.timezone.utc).date()
self.trigger_type = trigger_type or self._last_trigger_type
self._last_trigger_type = self.trigger_type
if self.trigger_type == TriggerTypes.TIMER:
self.start_timer_trigger()
if trigger_timeout is not None:
self.trigger_timeout = trigger_timeout
if trigger_variable is not None:
self.trigger_variable = trigger_variable
if value_repeating is not None:
self.value_repeating = value_repeating
if self.trigger_type == TriggerTypes.TIMER:
self.start_timer_trigger(timeout=self.trigger_timeout)
self.set_valuing_mode(valuing_mode=valuing_mode)
self.setup_variables(self.lists.keys() if variables is None else variables)
self.units = units if units else {}
@@ -267,18 +278,20 @@ def setup_variables(self, variables: Iterable[str]) -> None:
else:
# old style: topic is variable name
subscriptions.add(variable)
self.lists[variable] = []
self.tmp[variable] = []
with self.list_lock:
self.lists[variable] = []
self.tmp[variable] = []
self.subscribe(topics=subscriptions)

def reset_data_storage(self) -> None:
"""Reset the data storage."""
self.tmp = {}
self.lists = {}
with self.list_lock:
self.tmp = {}
self.lists = {}
self.last_datapoint = {}

def start_timer_trigger(self) -> None:
self.timer = RepeatingTimer(self.trigger_timeout, self.make_datapoint)
def start_timer_trigger(self, timeout: float) -> None:
self.timer = RepeatingTimer(timeout, self.make_datapoint)
self.timer.start()

def set_valuing_mode(self, valuing_mode: Optional[ValuingModes]) -> None:
@@ -313,8 +326,9 @@ def save_data(self, meta: Optional[dict] = None, suffix: str = "", header: str =
# 'user': self.user_data, # user stored meta data
})
try:
with open(f"{folder}/{file_name}.json", 'w') as file:
json.dump(obj=(header, self.lists, meta), fp=file)
with self.list_lock:
with open(f"{folder}/{file_name}.json", 'w') as file:
json.dump(obj=(header, self.lists, meta), fp=file)
except TypeError as exc:
log.exception("Some type error during saving occurred.", exc_info=exc)
raise
@@ -350,7 +364,8 @@ def get_configuration(self) -> dict[str, Any]:
config['valuing_mode'] = vm.value
config['value_repeating'] = self.value_repeating
# Header and Variables.
config['variables'] = list(self.lists.keys())
with self.list_lock:
config['variables'] = list(self.lists.keys())
config['units'] = self.units
# config['autoSave'] = self.actionAutoSave.isChecked()
return config
@@ -365,8 +380,9 @@ def get_last_save_name(self) -> Union[str, None]:

def get_list_length(self) -> int:
"""Return the length of the lists."""
length = len(self.lists[list(self.lists.keys())[0]]) if self.lists else 0
return length
with self.list_lock:
length = len(self.lists[list(self.lists.keys())[0]]) if self.lists else 0
return length


def main() -> None:
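The second theme of the diff above is the new list_lock: incoming subscription values are appended to tmp from the message handling thread, while the RepeatingTimer calls make_datapoint from its own thread to read and clear tmp and extend lists, so every access to those dictionaries is now wrapped in `with self.list_lock:`. A minimal sketch of that pattern, reduced to the two competing operations (hypothetical Storage class and demo data, not the real DataLogger):

from threading import Lock, Thread


class Storage:
    """Hypothetical reduction of the DataLogger's shared state."""

    def __init__(self) -> None:
        self.list_lock = Lock()
        self.tmp: dict[str, list[float]] = {"temperature": []}
        self.lists: dict[str, list[float]] = {"temperature": []}

    def handle_subscription_data(self, data: dict[str, float]) -> None:
        # runs in the message handling thread
        with self.list_lock:
            for key, value in data.items():
                if key in self.tmp:
                    self.tmp[key].append(value)

    def make_datapoint(self) -> None:
        # runs in the timer thread; without the lock it could clear `tmp`
        # while handle_subscription_data is still appending to it
        with self.list_lock:
            for variable, datalist in self.lists.items():
                values = self.tmp[variable]
                datalist.append(sum(values) / len(values) if values else float("nan"))
                values.clear()


storage = Storage()
writer = Thread(
    target=lambda: [storage.handle_subscription_data({"temperature": 21.5}) for _ in range(1000)]
)
writer.start()
storage.make_datapoint()
writer.join()
print(storage.lists)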
32 changes: 23 additions & 9 deletions tests/management/test_data_logger.py
@@ -81,18 +81,19 @@ def test_start_collecting_starts_timer(data_logger: DataLogger):
# arrange
data_logger.trigger_timeout = 1000
# act
data_logger.start_collecting(trigger_type=TriggerTypes.TIMER)
data_logger.start_collecting(trigger_type=TriggerTypes.TIMER, trigger_timeout=500)
# assert
assert data_logger.timer.interval == 1000
assert data_logger.timer.interval == 500
# cleanup
data_logger.timer.cancel()


def test_start_collecting_starts_timer_even_second_time(data_logger: DataLogger):
"""Even a second time, without explicit trigger type, the timer should be started."""
# arrange
data_logger.trigger_timeout = 1000
data_logger.start_collecting(trigger_type=TriggerTypes.TIMER) # first time, to set type
data_logger.trigger_timeout = 500
# first time, to set type
data_logger.start_collecting(trigger_type=TriggerTypes.TIMER, trigger_timeout=1000)
data_logger.stop_collecting()
assert not hasattr(data_logger, "timer") # no timer left
# act
@@ -211,6 +212,7 @@ def test_handle_subscription_data_triggers(data_logger: DataLogger):

def test_handle_subscription_data_without_list(data_logger: DataLogger,
caplog: pytest.LogCaptureFixture):
caplog.set_level(0)
data_logger.handle_subscription_data({'not_present': 42})
assert caplog.messages == ["Got value for 'not_present', but no list present."]

@@ -224,8 +226,7 @@ def test_set_publisher_name(data_logger: DataLogger):
class Test_start_timer_trigger:
@pytest.fixture
def data_logger_stt(self, data_logger: DataLogger):
data_logger.trigger_timeout = 1000
data_logger.start_timer_trigger()
data_logger.start_timer_trigger(1000)
yield data_logger
data_logger.timer.cancel()

@@ -342,10 +343,23 @@ def saved_file(self, data_logger_sd: DataLogger):
path = Path(data_logger_sd.directory) / data_logger_sd.last_save_name
return path.with_suffix(".json").read_text()

@pytest.mark.xfail(True, reason="Not yet date recognition implemented.")
def test_output(self, saved_file: str):
# TODO make date comparison work.
assert saved_file == """["", {"time": [], "test": [], "2": [], "N1.sender.var": []}, {"units": {}, "today": "2023-11-27", "file_name": "2023_11_27T15_07_06", "logger_name": "DataLoggerN", "configuration": {"trigger": "variable", "triggerVariable": "test", "trigger_variable": "test", "valuing_mode": "mean", "valueRepeat": false, "value_repeating": false, "variables": "time test 2 N1.sender.var"}}]""" # noqa
today_string = self.today.isoformat()
assert saved_file == "".join(
(
"""["", {"time": [], "test": [], "2": [], "N1.sender.var": []}, """,
'''{"units": {}, "today": "''',
today_string,
'''", "file_name": "''',
self.file_name,
"""", "logger_name": "DataLoggerN", """,
""""configuration": {"trigger_type": "variable", "trigger_timeout": 10, """,
""""trigger_variable": "test", "valuing_mode": "average", """,
""""value_repeating": false, """,
""""variables": ["time", "test", "2", "N1.sender.var"], """,
""""units": {}}}]""",
)
)

def test_json_content(self, saved_file: str):
today_string = self.today.isoformat()
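For the user-facing effect, compare the updated timer tests: the trigger_timeout passed to start_collecting is now what determines the timer interval. A hypothetical usage sketch along the lines of those tests (it assumes TriggerTypes is importable from the same module and that the usual pyleco messaging infrastructure is available, which the test suite replaces with fakes):

from pyleco.management.data_logger import DataLogger, TriggerTypes

logger = DataLogger(name="DataLoggerN", directory=".")
logger.trigger_timeout = 1000  # stale value from an earlier configuration
logger.start_collecting(
    trigger_type=TriggerTypes.TIMER,
    trigger_timeout=500,  # this value now configures the timer interval
    variables=["time", "test"],
)
assert logger.timer.interval == 500
logger.stop_collecting()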
