Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix data_logger timer #70

Merged
merged 4 commits into from
Apr 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 50 additions & 34 deletions pyleco/management/data_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class StrEnum(str, Enum): # type: ignore
pass
import json
import logging
from threading import Lock
from typing import Any, Callable, Optional, Iterable

try:
Expand Down Expand Up @@ -98,10 +99,10 @@ class DataLogger(ExtendedMessageHandler):
"""

# TODO names
tmp: dict[str, list[Any]] = {} # contains all values since last datapoint
lists: dict[str, list[Any]] = {} # contains datapoints.
units: dict[str, Any] = {} # contains the units of the variables TODO TBD what the value is.
last_datapoint: dict[str, Any] = {}
tmp: dict[str, list[Any]] # contains all values since last datapoint
lists: dict[str, list[Any]] # contains datapoints.
units: dict[str, Any] # contains the units of the variables TODO TBD what the value is.
last_datapoint: dict[str, Any]
last_save_name: str = ""

# configuration variables
Expand All @@ -117,6 +118,10 @@ def __init__(self, name: str = "DataLoggerN", directory: str = ".", **kwargs) ->
self.directory = directory
self.publisher = DataPublisher(full_name=name)
self.valuing = average
# Initialize values
self.list_lock = Lock()
self.reset_data_storage()
self.units = {}
# TODO add auto_save functionality?

def register_rpc_methods(self) -> None:
Expand Down Expand Up @@ -161,11 +166,12 @@ def handle_subscription_message(self, data_message: DataMessage) -> None:

def handle_subscription_data(self, data: dict[str, Any]) -> None:
"""Store `data` dict in `tmp`"""
for key, value in data.items():
try:
self.tmp[key].append(value)
except KeyError:
log.error(f"Got value for '{key}', but no list present.")
with self.list_lock:
for key, value in data.items():
try:
self.tmp[key].append(value)
except KeyError:
log.debug("Got value for '%s', but no list present.", key)
if self.trigger_type == TriggerTypes.VARIABLE and self.trigger_variable in data.keys():
self.make_datapoint()

Expand All @@ -180,24 +186,29 @@ def make_datapoint(self) -> dict[str, Any]:
def calculate_data(self) -> dict[str, Any]:
"""Calculate data for a data point and return the data point."""
datapoint = {}
if 'time' in self.lists.keys():
now = datetime.datetime.now(datetime.timezone.utc)
today = datetime.datetime.combine(self.today, datetime.time(),
datetime.timezone.utc)
time = (now - today).total_seconds()
self.tmp['time'].append(time)
for variable, datalist in self.lists.items():
value = datapoint[variable] = self.calculate_single_data(variable, self.tmp[variable])
datalist.append(value)
for key in self.tmp.keys():
self.tmp[key].clear()
return datapoint
with self.list_lock:
if 'time' in self.lists.keys():
now = datetime.datetime.now(datetime.timezone.utc)
today = datetime.datetime.combine(
self.today, datetime.time(), datetime.timezone.utc
)
time = (now - today).total_seconds()
self.tmp['time'].append(time)
for variable, datalist in self.lists.items():
value = datapoint[variable] = self.calculate_single_data(
variable, self.tmp[variable]
)
datalist.append(value)
for key in self.tmp.keys():
self.tmp[key].clear()
return datapoint

def calculate_single_data(self, variable: str, tmp: list):
if tmp:
value = self.valuing(tmp)
elif self.value_repeating:
try:
# no lock, as this method is called in in a locked environment!
value = self.lists[variable][-1]
except (KeyError, IndexError): # No last value present.
value = nan
Expand Down Expand Up @@ -236,14 +247,14 @@ def start_collecting(self, *,
self.today = datetime.datetime.now(datetime.timezone.utc).date()
self.trigger_type = trigger_type or self._last_trigger_type
self._last_trigger_type = self.trigger_type
if self.trigger_type == TriggerTypes.TIMER:
self.start_timer_trigger()
if trigger_timeout is not None:
self.trigger_timeout = trigger_timeout
if trigger_variable is not None:
self.trigger_variable = trigger_variable
if value_repeating is not None:
self.value_repeating = value_repeating
if self.trigger_type == TriggerTypes.TIMER:
self.start_timer_trigger(timeout=self.trigger_timeout)
self.set_valuing_mode(valuing_mode=valuing_mode)
self.setup_variables(self.lists.keys() if variables is None else variables)
self.units = units if units else {}
Expand All @@ -267,18 +278,20 @@ def setup_variables(self, variables: Iterable[str]) -> None:
else:
# old style: topic is variable name
subscriptions.add(variable)
self.lists[variable] = []
self.tmp[variable] = []
with self.list_lock:
self.lists[variable] = []
self.tmp[variable] = []
self.subscribe(topics=subscriptions)

def reset_data_storage(self) -> None:
"""Reset the data storage."""
self.tmp = {}
self.lists = {}
with self.list_lock:
self.tmp = {}
self.lists = {}
self.last_datapoint = {}

def start_timer_trigger(self) -> None:
self.timer = RepeatingTimer(self.trigger_timeout, self.make_datapoint)
def start_timer_trigger(self, timeout: float) -> None:
self.timer = RepeatingTimer(timeout, self.make_datapoint)
self.timer.start()

def set_valuing_mode(self, valuing_mode: Optional[ValuingModes]) -> None:
Expand Down Expand Up @@ -313,8 +326,9 @@ def save_data(self, meta: Optional[dict] = None, suffix: str = "", header: str =
# 'user': self.user_data, # user stored meta data
})
try:
with open(f"{folder}/{file_name}.json", 'w') as file:
json.dump(obj=(header, self.lists, meta), fp=file)
with self.list_lock:
with open(f"{folder}/{file_name}.json", 'w') as file:
json.dump(obj=(header, self.lists, meta), fp=file)
except TypeError as exc:
log.exception("Some type error during saving occurred.", exc_info=exc)
raise
Expand Down Expand Up @@ -350,7 +364,8 @@ def get_configuration(self) -> dict[str, Any]:
config['valuing_mode'] = vm.value
config['value_repeating'] = self.value_repeating
# Header and Variables.
config['variables'] = list(self.lists.keys())
with self.list_lock:
config['variables'] = list(self.lists.keys())
config['units'] = self.units
# config['autoSave'] = self.actionAutoSave.isChecked()
return config
Expand All @@ -365,8 +380,9 @@ def get_last_save_name(self) -> Union[str, None]:

def get_list_length(self) -> int:
"""Return the length of the lists."""
length = len(self.lists[list(self.lists.keys())[0]]) if self.lists else 0
return length
with self.list_lock:
length = len(self.lists[list(self.lists.keys())[0]]) if self.lists else 0
return length


def main() -> None:
Expand Down
32 changes: 23 additions & 9 deletions tests/management/test_data_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,18 +81,19 @@ def test_start_collecting_starts_timer(data_logger: DataLogger):
# arrange
data_logger.trigger_timeout = 1000
# act
data_logger.start_collecting(trigger_type=TriggerTypes.TIMER)
data_logger.start_collecting(trigger_type=TriggerTypes.TIMER, trigger_timeout=500)
# assert
assert data_logger.timer.interval == 1000
assert data_logger.timer.interval == 500
# cleanup
data_logger.timer.cancel()


def test_start_collecting_starts_timer_even_second_time(data_logger: DataLogger):
"""Even a second time, without explicit trigger type, the timer should be started."""
# arrange
data_logger.trigger_timeout = 1000
data_logger.start_collecting(trigger_type=TriggerTypes.TIMER) # first time, to set type
data_logger.trigger_timeout = 500
# first time, to set type
data_logger.start_collecting(trigger_type=TriggerTypes.TIMER, trigger_timeout=1000)
data_logger.stop_collecting()
assert not hasattr(data_logger, "timer") # no timer left
# act
Expand Down Expand Up @@ -211,6 +212,7 @@ def test_handle_subscription_data_triggers(data_logger: DataLogger):

def test_handle_subscription_data_without_list(data_logger: DataLogger,
caplog: pytest.LogCaptureFixture):
caplog.set_level(0)
data_logger.handle_subscription_data({'not_present': 42})
assert caplog.messages == ["Got value for 'not_present', but no list present."]

Expand All @@ -224,8 +226,7 @@ def test_set_publisher_name(data_logger: DataLogger):
class Test_start_timer_trigger:
@pytest.fixture
def data_logger_stt(self, data_logger: DataLogger):
data_logger.trigger_timeout = 1000
data_logger.start_timer_trigger()
data_logger.start_timer_trigger(1000)
yield data_logger
data_logger.timer.cancel()

Expand Down Expand Up @@ -342,10 +343,23 @@ def saved_file(self, data_logger_sd: DataLogger):
path = Path(data_logger_sd.directory) / data_logger_sd.last_save_name
return path.with_suffix(".json").read_text()

@pytest.mark.xfail(True, reason="Not yet date recognition implemented.")
def test_output(self, saved_file: str):
# TODO make date comparison work.
assert saved_file == """["", {"time": [], "test": [], "2": [], "N1.sender.var": []}, {"units": {}, "today": "2023-11-27", "file_name": "2023_11_27T15_07_06", "logger_name": "DataLoggerN", "configuration": {"trigger": "variable", "triggerVariable": "test", "trigger_variable": "test", "valuing_mode": "mean", "valueRepeat": false, "value_repeating": false, "variables": "time test 2 N1.sender.var"}}]""" # noqa
today_string = self.today.isoformat()
assert saved_file == "".join(
(
"""["", {"time": [], "test": [], "2": [], "N1.sender.var": []}, """,
'''{"units": {}, "today": "''',
today_string,
'''", "file_name": "''',
self.file_name,
"""", "logger_name": "DataLoggerN", """,
""""configuration": {"trigger_type": "variable", "trigger_timeout": 10, """,
""""trigger_variable": "test", "valuing_mode": "average", """,
""""value_repeating": false, """,
""""variables": ["time", "test", "2", "N1.sender.var"], """,
""""units": {}}}]""",
)
)

def test_json_content(self, saved_file: str):
today_string = self.today.isoformat()
Expand Down
Loading