Skip to content

Commit

Permalink
Merge pull request #81 from pymeasure/expand-test-coverage
Browse files Browse the repository at this point in the history
Expand test coverage
  • Loading branch information
BenediktBurger committed May 16, 2024
2 parents 96f5e98 + a88271a commit 4b3d071
Show file tree
Hide file tree
Showing 5 changed files with 223 additions and 32 deletions.
52 changes: 24 additions & 28 deletions pyleco/management/starter.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,6 @@
# THE SOFTWARE.
#

"""
The starter starts scripts (containing devices) and runs them.
For command line arguments, execute the module with `--help` parameter.
Tasks have to be PyQtObjects with name "Task" in a file called with the taskname
in the folder "tasks" or any other folder given with `directory`.
E.g. in "tasks/test1.py" for task "test1".
Created on Thu Dec 15 09:31:04 2022 by Benedikt Moneke
"""

from __future__ import annotations
from enum import IntFlag
from importlib import import_module, reload
Expand All @@ -59,8 +47,9 @@
modules: dict[str, Any] = {} # A dictionary of the task modules


def sanitize_tasks(tasks: Optional[Union[list[str], tuple[str, ...], str]]
) -> Union[tuple[str, ...], list[str]]:
def sanitize_tasks(
tasks: Optional[Union[list[str], tuple[str, ...], str]],
) -> Union[tuple[str, ...], list[str]]:
"""Ensure that the tasks are a list of tasks."""
if tasks is None:
return ()
Expand Down Expand Up @@ -108,8 +97,13 @@ class Starter(MessageHandler):
:param tasks: List of task names to execute on startup.
"""

def __init__(self, name: str = "starter", directory: Optional[str] = None,
tasks: Optional[list[str]] = None, **kwargs) -> None:
def __init__(
self,
name: str = "starter",
directory: Optional[str] = None,
tasks: Optional[list[str]] = None,
**kwargs,
) -> None:
super().__init__(name=name, **kwargs)
self.threads: dict[str, threading.Thread] = {} # List of threads
self.events: dict[str, threading.Event] = {} # Events to stop the threads.
Expand Down Expand Up @@ -189,9 +183,9 @@ def start_task(self, name: str) -> None:
return
self.events[name] = threading.Event()
try:
self.threads[name] = thread = threading.Thread(target=script.task,
args=(self.events[name],),
daemon=True)
self.threads[name] = thread = threading.Thread(
target=script.task, args=(self.events[name],), daemon=True
)
except Exception as exc:
log.exception(f"Creation of task '{name}' failed.", exc_info=exc)
return
Expand Down Expand Up @@ -272,10 +266,10 @@ def list_tasks(self) -> list[dict[str, str]]:
with open(f"{self.directory}/{name}", "r") as file:
# Search for the first line with triple quotes
i = 0
while not file.readline().strip() == '\"\"\"' and i < 10:
while not file.readline().strip() == '"""' and i < 10:
i += 1
tooltip = file.readline() # first line after line with triple quotes
tasks.append({'name': name.replace(".py", ""), 'tooltip': tooltip})
tasks.append({"name": name.replace(".py", ""), "tooltip": tooltip})
log.debug(f"Tasks found: {tasks}.")
return tasks

Expand All @@ -289,19 +283,21 @@ def check_installed_tasks(self) -> None:


def main() -> None:
parser.add_argument("tasks", nargs="*",
help="Tasks to execute at startup.")
parser.add_argument("-d", "--directory",
help="set the directory to search for tasks, do not add a trailing slash")
parser.add_argument("tasks", nargs="*", help="Tasks to execute at startup.")
parser.add_argument(
"-d",
"--directory",
help="set the directory to search for tasks, do not add a trailing slash",
)

gLog = logging.getLogger() # print all log entries!
if not gLog.handlers:
handler = logging.StreamHandler()
handler.setFormatter(StrFormatter)
gLog.addHandler(handler)
kwargs = parse_command_line_parameters(parser=parser,
parser_description="Start tasks as required.",
logger=gLog)
kwargs = parse_command_line_parameters(
parser=parser, parser_description="Start tasks as required.", logger=gLog
)

starter = Starter(log=gLog, **kwargs)
starter.listen()
Expand Down
26 changes: 26 additions & 0 deletions tests/directors/test_director.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,20 @@ def test_default_actor(self, director: Director):
assert director._actor_check("") == "actor"


def test_get_rpc_capabilities(director: Director):
data = {"name": "actor", "methods": []}
director.communicator._r = [ # type: ignore
Message("director", "actor", conversation_id=cid, message_type=MessageTypes.JSON, data={
"id": 1, "result": data, "jsonrpc": "2.0"
})]
result = director.get_rpc_capabilities()
assert director.communicator._s == [ # type: ignore
Message("actor", "director", conversation_id=cid, message_type=MessageTypes.JSON, data={
"id": 1, "method": "rpc.discover", "jsonrpc": "2.0"
})]
assert result == data


def test_shutdown_actor(director: Director):
director.communicator._r = [ # type: ignore
Message("director", "actor", conversation_id=cid, message_type=MessageTypes.JSON, data={
Expand All @@ -89,6 +103,18 @@ def test_shutdown_actor(director: Director):
})]


def test_set_actor_log_level(director: Director):
director.communicator._r = [ # type: ignore
Message("director", "actor", conversation_id=cid, message_type=MessageTypes.JSON, data={
"id": 1, "result": None, "jsonrpc": "2.0"
})]
director.set_actor_log_level(30)
assert director.communicator._s == [ # type: ignore
Message("actor", "director", conversation_id=cid, message_type=MessageTypes.JSON, data={
"id": 1, "method": "set_log_level", "jsonrpc": "2.0", "params": {"level": "WARNING"}
})]


def test_read_rpc_response(director: Director):
director.communicator._r = [ # type: ignore
Message("director", "actor", conversation_id=cid, message_type=MessageTypes.JSON, data={
Expand Down
84 changes: 82 additions & 2 deletions tests/management/test_starter.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@
#

from __future__ import annotations
from unittest.mock import MagicMock
from unittest.mock import call, MagicMock

import pytest

from pyleco.test import FakeContext
from pyleco.management.starter import Starter, Status
from pyleco.management.starter import sanitize_tasks, Starter, Status
from pyleco.utils.events import SimpleEvent


@pytest.fixture
Expand All @@ -54,6 +55,14 @@ def join(self, timeout=None) -> None:
return


@pytest.mark.parametrize("tasks", (None, [], (), "string", ["abc", "def"]))
def test_sanitize_tasks(tasks):
sanitized = sanitize_tasks(tasks)
assert isinstance(sanitized, (tuple, list))
for t in sanitized:
assert isinstance(t, str)


def test_init(starter: Starter):
assert starter.started_tasks == {}
assert starter.threads == {}
Expand All @@ -73,6 +82,12 @@ def test_install_task(starter: Starter, pre: Status, post: Status):
assert starter.started_tasks["test"] == post


def test_install_tasks(starter: Starter):
starter.install_task = MagicMock() # type: ignore[method-assign]
starter.install_tasks(["a", "b"])
assert starter.install_task.call_args_list == [call("a"), call("b")]


@pytest.mark.parametrize("pre, post", (
(Status.RUNNING | Status.INSTALLED, Status.RUNNING),
(None, Status.STOPPED), # not yet in the dict
Expand All @@ -86,6 +101,12 @@ def test_uninstall_task(starter: Starter, pre: Status, post: Status):
assert starter.started_tasks.get("test") == post


def test_uninstall_tasks(starter: Starter):
starter.uninstall_task = MagicMock() # type: ignore[method-assign]
starter.uninstall_tasks(["a", "b"])
assert starter.uninstall_task.call_args_list == [call("a"), call("b")]


class Test_status_tasks:
@pytest.fixture
def status(self, starter: Starter) -> dict[str, Status]:
Expand Down Expand Up @@ -138,3 +159,62 @@ def starter_cit(self, starter: Starter) -> Starter:
def test_start_installed_but_not_running_task(self, starter_cit: Starter):
"""Test, that only the installed (and not running) task is started."""
starter_cit.start_task.assert_called_once_with("INR") # type: ignore[attr-defined]


class Test_start_task:
def test_already_started_task(self, starter: Starter):
# arrange
starter.started_tasks["t1"] = Status.STARTED
starter.threads["t1"] = FakeThread(alive=True) # type: ignore
starter.events["t1"] = SimpleEvent() # type: ignore
# act
starter.start_task("t1")
assert Status.RUNNING in Status(starter.started_tasks["t1"])


def test_start_tasks(starter: Starter):
starter.start_task = MagicMock() # type: ignore[method-assign]
starter.start_tasks(["a", "b"])
assert starter.start_task.call_args_list == [call("a"), call("b")]


class Test_stop_task:
def test_stop_not_existing_task(self, starter: Starter):
starter.stop_task("whatever")

def test_stop_existing_running_task(self, starter: Starter):
# arrange
starter.started_tasks["t1"] = Status.STARTED
starter.threads["t1"] = FakeThread(alive=True) # type: ignore
event = starter.events["t1"] = SimpleEvent() # type: ignore
# act
starter.stop_task("t1")
assert "t1" not in starter.threads
assert "t1" not in starter.started_tasks
assert event.is_set() is True

def test_stop_removed_task(self, starter: Starter):
# arrange
try:
del starter.threads["t1"]
except KeyError:
pass
starter.started_tasks["t1"] = Status.STARTED
# act
starter.stop_task("t1")
assert "t1" not in starter.threads
assert "t1" not in starter.started_tasks


def test_stop_tasks(starter: Starter):
starter.stop_task = MagicMock() # type: ignore[method-assign]
starter.stop_tasks(["a", "b"])
assert starter.stop_task.call_args_list == [call("a"), call("b")]


def test_restart_tasks(starter: Starter):
starter.start_task = MagicMock() # type: ignore[method-assign]
starter.stop_task = MagicMock() # type: ignore[method-assign]
starter.restart_tasks(["a", "b"])
assert starter.stop_task.call_args_list == [call("a"), call("b")]
assert starter.start_task.call_args_list == [call("a"), call("b")]
55 changes: 53 additions & 2 deletions tests/test_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,59 @@

import pytest

from pyleco.test import FakePoller, FakeSocket
from pyleco.test import FakeCommunicator, FakePoller, FakeSocket


@pytest.fixture
def poller():
def poller() -> FakePoller:
return FakePoller()


@pytest.fixture
def socket() -> FakeSocket:
return FakeSocket(1)


@pytest.fixture
def sub_socket() -> FakeSocket:
return FakeSocket(2)


def test_socket_unbind(socket: FakeSocket):
socket.bind("abc")
socket.unbind()
assert socket.addr is None


def test_socket_disconnect(socket: FakeSocket):
socket.connect("abc")
socket.disconnect()
assert socket.addr is None


@pytest.mark.parametrize("topic", ("string", b"bytes"))
def test_socket_subscribe(sub_socket: FakeSocket, topic):
sub_socket.subscribe(topic)
assert isinstance(sub_socket._subscriptions[-1], bytes)


def test_subscribe_fails_for_not_SUB(socket: FakeSocket):
with pytest.raises(ValueError):
socket.subscribe("abc")


@pytest.mark.parametrize("topic", ("topic", b"topic"))
def test_socket_unsubscribe(sub_socket: FakeSocket, topic):
sub_socket._subscriptions.append(b"topic")
sub_socket.unsubscribe(topic)
assert b"topic" not in sub_socket._subscriptions


def test_unsubscribe_fails_for_not_SUB(socket: FakeSocket):
with pytest.raises(ValueError):
socket.unsubscribe("abc")


class Test_FakePoller_unregister:
def test_no_error_at_missing(self, poller: FakePoller):
poller.unregister(FakeSocket(1))
Expand All @@ -42,3 +87,9 @@ def test_unregister_removes_socket(self, poller: FakePoller):
poller._sockets = [1, 2, socket, 4, 5] # type: ignore
poller.unregister(socket)
assert socket not in poller._sockets


def test_FakeCommunicator_sign_in():
fc = FakeCommunicator("")
fc.sign_in()
assert fc._signed_in is True
38 changes: 38 additions & 0 deletions tests/utils/test_extended_message_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
# THE SOFTWARE.
#

import json
import pickle
from unittest.mock import MagicMock

import pytest
Expand Down Expand Up @@ -52,6 +54,15 @@ def test_read_subscription_message_calls_handle(handler: ExtendedMessageHandler)
handler.handle_subscription_message.assert_called_once_with(message) # type: ignore


def test_read_subscription_message_calls_handle_legacy(handler: ExtendedMessageHandler):
message = DataMessage("", data="[]", message_type=234)
handler.handle_full_legacy_subscription_message = MagicMock() # type: ignore[method-assign]
handler.subscriber._r = [message.to_frames()] # type: ignore
handler.read_subscription_message()
# assert
handler.handle_full_legacy_subscription_message.assert_called_once_with(message) # type: ignore


def test_subscribe_single(handler: ExtendedMessageHandler):
handler.subscribe_single(b"topic")
assert handler.subscriber._subscriptions == [b"topic"] # type: ignore
Expand Down Expand Up @@ -91,3 +102,30 @@ def test_unsubscribe_all(handler: ExtendedMessageHandler):
handler._subscriptions = [b"topic1", b"topic2"]
handler.unsubscribe_all()
assert handler._subscriptions == []


class Test_handle_full_legacy_subscription_message:
@pytest.fixture
def handler_hfl(self, handler: ExtendedMessageHandler) -> ExtendedMessageHandler:
handler.handle_subscription_data = MagicMock() # type: ignore[method-assign]
return handler

def test_handle_pickled_message(self, handler_hfl: ExtendedMessageHandler):
data = ["some", "data", 5]
handler_hfl.handle_full_legacy_subscription_message(
DataMessage("topic", data=pickle.dumps(data), message_type=234)
)
handler_hfl.handle_subscription_data.assert_called_once_with({"topic": data}) # type: ignore

def test_handle_json_message(self, handler_hfl: ExtendedMessageHandler):
data = ["some", "data", 5]
handler_hfl.handle_full_legacy_subscription_message(
DataMessage("topic", data=json.dumps(data), message_type=235)
)
handler_hfl.handle_subscription_data.assert_called_once_with({"topic": data}) # type: ignore

def test_handle_unknown_message_type(self, handler_hfl: ExtendedMessageHandler):
with pytest.raises(ValueError):
handler_hfl.handle_full_legacy_subscription_message(
DataMessage("topic", data="", message_type=210)
)

0 comments on commit 4b3d071

Please sign in to comment.