From 1724467145eea690623480e373c41db8baa8ec5a Mon Sep 17 00:00:00 2001 From: Benedikt Burger <67148916+BenediktBurger@users.noreply.github.com> Date: Fri, 5 Jul 2024 18:35:07 +0200 Subject: [PATCH 1/5] Add extended data publisher --- pyleco/utils/extended_data_publisher.py | 68 +++++++++++++++ tests/utils/test_extended_data_publisher.py | 97 +++++++++++++++++++++ 2 files changed, 165 insertions(+) create mode 100644 pyleco/utils/extended_data_publisher.py create mode 100644 tests/utils/test_extended_data_publisher.py diff --git a/pyleco/utils/extended_data_publisher.py b/pyleco/utils/extended_data_publisher.py new file mode 100644 index 00000000..172689cf --- /dev/null +++ b/pyleco/utils/extended_data_publisher.py @@ -0,0 +1,68 @@ +# +# This file is part of the PyLECO package. +# +# Copyright (c) 2023-2024 PyLECO Developers +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. +# + +from __future__ import annotations +from typing import Any, Callable, Generator, Union + +from ..core.message import Message, MessageTypes +from ..core.data_message import DataMessage +from ..json_utils.rpc_generator import RPCGenerator +from .data_publisher import DataPublisher + +class ExtendedDataPublisher(DataPublisher): + """A DataPublisher, which sends the data also via the control protocol.""" + + def __init__( + self, full_name: str, send_message_method: Callable[[Message], None], **kwargs + ) -> None: + super().__init__(full_name, **kwargs) + self.send_control_message = send_message_method + self.subscribers: set[bytes] = set() + self.rpc_generator = RPCGenerator() + + def register_subscriber(self, subscriber: Union[bytes, str]) -> None: + """Register a subscriber, that it may receive messages via data_protocol.""" + if isinstance(subscriber, str): + subscriber = subscriber.encode() + self.subscribers.add(subscriber) + + def convert_data_message_to_messages( + self, data_message: DataMessage, receivers: set[Union[bytes, str]], + ) -> Generator[Message, Any, Any]: + cid = data_message.conversation_id + data = self.rpc_generator.build_request_str(method="set_subscription_message") + for receiver in receivers: + yield Message( + receiver=receiver, + data=data, + conversation_id=cid, + additional_payload=data_message.payload, + message_type=MessageTypes.JSON, + ) + + def send_message(self, message: DataMessage) -> None: + super().send_message(message) + for msg in self.convert_data_message_to_messages(message, self.subscribers): + # ideas: change to ask and check, whether it succeeded, otherwise remove subscriber + self.send_control_message(msg) diff --git a/tests/utils/test_extended_data_publisher.py b/tests/utils/test_extended_data_publisher.py new file mode 100644 index 00000000..7f28aff1 --- /dev/null +++ b/tests/utils/test_extended_data_publisher.py @@ -0,0 +1,97 @@ +# +# This file is part of the PyLECO package. +# +# Copyright (c) 2023-2024 PyLECO Developers +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. +# + +import pytest + +from pyleco.test import FakeContext +from pyleco.utils.extended_data_publisher import ExtendedDataPublisher, Message, DataMessage, MessageTypes + + +CID = b"conversation_id;" + + +@pytest.fixture +def fake_send_message(): + global messages + messages = [] + def _fsm(message: Message): + global messages + messages.append(message) + return _fsm + + +@pytest.fixture +def publisher(fake_send_message) -> ExtendedDataPublisher: + publisher = ExtendedDataPublisher( + "fn", send_message_method=fake_send_message, + context=FakeContext(), # type: ignore + ) + return publisher + + +@pytest.fixture +def data_message() -> DataMessage: + return DataMessage( + topic="topic", conversation_id=CID, data=b"0", additional_payload=[b"1", b"2"] + ) + + +def test_register_subscribers(publisher: ExtendedDataPublisher): + # act + publisher.register_subscriber("abcdef") + assert b"abcdef" in publisher.subscribers + + publisher.register_subscriber(b"ghi") + assert b"ghi" in publisher.subscribers + + +@pytest.mark.parametrize("receivers", (set(), {b"abc"}, {b"abc", b"def"}, {"string"})) +def test_convert(publisher: ExtendedDataPublisher, receivers, data_message: DataMessage): + msgs = publisher.convert_data_message_to_messages(data_message, receivers=receivers) + for rec, msg in zip(receivers, msgs, strict=True): + assert msg == Message( + receiver=rec, + data={"id": 1, "method": "set_subscription_message", "jsonrpc": "2.0"}, + conversation_id=CID, + message_type=MessageTypes.JSON, + additional_payload=data_message.payload, + ) + + +def test_send_message(publisher: ExtendedDataPublisher, data_message: DataMessage): + # arrange + publisher.register_subscriber("abc") + # act + publisher.send_message(data_message) + assert publisher.socket._s == [data_message.to_frames()] + global messages + assert messages == [ + Message( + "abc", + data={"id": 1, "method": "set_subscription_message", "jsonrpc": "2.0"}, + conversation_id=CID, + message_type=MessageTypes.JSON, + additional_payload=data_message.payload, + ) + ] From 48cea41b738458db961b3c61f311bfd050a69a3c Mon Sep 17 00:00:00 2001 From: Benedikt Burger <67148916+BenediktBurger@users.noreply.github.com> Date: Fri, 5 Jul 2024 19:04:21 +0200 Subject: [PATCH 2/5] Add unregister to data publisher. --- pyleco/utils/extended_data_publisher.py | 10 ++++++++-- tests/utils/test_extended_data_publisher.py | 19 ++++++++++++++++++- 2 files changed, 26 insertions(+), 3 deletions(-) diff --git a/pyleco/utils/extended_data_publisher.py b/pyleco/utils/extended_data_publisher.py index 172689cf..aa601149 100644 --- a/pyleco/utils/extended_data_publisher.py +++ b/pyleco/utils/extended_data_publisher.py @@ -42,13 +42,19 @@ def __init__( self.rpc_generator = RPCGenerator() def register_subscriber(self, subscriber: Union[bytes, str]) -> None: - """Register a subscriber, that it may receive messages via data_protocol.""" + """Register a subscriber, that it may receive data messages via command protocol.""" if isinstance(subscriber, str): subscriber = subscriber.encode() self.subscribers.add(subscriber) + def unregister_subscriber(self, subscriber: Union[bytes, str]) -> None: + """Unregister a subscriber, that it may not receive data messages via command protocol.""" + if isinstance(subscriber, str): + subscriber = subscriber.encode() + self.subscribers.discard(subscriber) + def convert_data_message_to_messages( - self, data_message: DataMessage, receivers: set[Union[bytes, str]], + self, data_message: DataMessage, receivers: Union[set[Union[bytes, str]], set[bytes]], ) -> Generator[Message, Any, Any]: cid = data_message.conversation_id data = self.rpc_generator.build_request_str(method="set_subscription_message") diff --git a/tests/utils/test_extended_data_publisher.py b/tests/utils/test_extended_data_publisher.py index 7f28aff1..7110def9 100644 --- a/tests/utils/test_extended_data_publisher.py +++ b/tests/utils/test_extended_data_publisher.py @@ -25,7 +25,9 @@ import pytest from pyleco.test import FakeContext -from pyleco.utils.extended_data_publisher import ExtendedDataPublisher, Message, DataMessage, MessageTypes +from pyleco.core.message import Message, MessageTypes +from pyleco.core.data_message import DataMessage +from pyleco.utils.extended_data_publisher import ExtendedDataPublisher CID = b"conversation_id;" @@ -66,6 +68,21 @@ def test_register_subscribers(publisher: ExtendedDataPublisher): assert b"ghi" in publisher.subscribers +def test_unregister_subscribers(publisher: ExtendedDataPublisher): + # arrange + publisher.subscribers.add(b"abc") + publisher.subscribers.add(b"def") + # act + # str + publisher.unregister_subscriber("abc") + assert b"abc" not in publisher.subscribers + # bytes + publisher.unregister_subscriber(b"def") + assert b"def" not in publisher.subscribers + # assert that no error is raised at repeated unregistering + publisher.unregister_subscriber(b"def") + + @pytest.mark.parametrize("receivers", (set(), {b"abc"}, {b"abc", b"def"}, {"string"})) def test_convert(publisher: ExtendedDataPublisher, receivers, data_message: DataMessage): msgs = publisher.convert_data_message_to_messages(data_message, receivers=receivers) From d5664e9baac0d30f6985daff888f499314b6da85 Mon Sep 17 00:00:00 2001 From: Benedikt Burger <67148916+BenediktBurger@users.noreply.github.com> Date: Tue, 9 Jul 2024 15:20:31 +0200 Subject: [PATCH 3/5] Make extended message handler subscribe via remote protocol --- pyleco/utils/extended_message_handler.py | 22 +++++- tests/utils/test_extended_data_publisher.py | 2 + tests/utils/test_extended_message_handler.py | 77 ++++++++++++++++++++ 3 files changed, 100 insertions(+), 1 deletion(-) diff --git a/pyleco/utils/extended_message_handler.py b/pyleco/utils/extended_message_handler.py index 02817d16..08142233 100644 --- a/pyleco/utils/extended_message_handler.py +++ b/pyleco/utils/extended_message_handler.py @@ -25,7 +25,7 @@ from __future__ import annotations import json import pickle -from typing import Optional +from typing import Optional, Union import zmq @@ -59,6 +59,7 @@ def register_rpc_methods(self) -> None: self.register_rpc_method(self.subscribe) self.register_rpc_method(self.unsubscribe) self.register_rpc_method(self.unsubscribe_all) + self.register_rpc_method(self.set_subscription_message) def close(self) -> None: self.subscriber.close(1) @@ -126,3 +127,22 @@ def unsubscribe_all(self) -> None: """Unsubscribe from all subscriptions.""" while self._subscriptions: self.unsubscribe_single(self._subscriptions.pop()) + + # methods for data protocol via control protocol + def subscribe_via_control(self, topic: Union[bytes, str]) -> None: + """Subscribe to a topic via the control protocol.""" + self.ask_rpc(receiver=topic, method="register_subscriber") + + def unsubscribe_via_control(self, topic: Union[bytes, str]) -> None: + """Unsubscribe to a topic via the control protocol.""" + self.ask_rpc(receiver=topic, method="unregister_subscriber") + + def set_subscription_message(self) -> None: + """Set a subscription message as if it had been received via data protocol.""" + msg = self.current_message + dm = DataMessage( + topic=msg.sender, + conversation_id=msg.conversation_id, + additional_payload=msg.payload[1:], + ) + self.handle_subscription_message(dm) diff --git a/tests/utils/test_extended_data_publisher.py b/tests/utils/test_extended_data_publisher.py index 7110def9..fd9c88b7 100644 --- a/tests/utils/test_extended_data_publisher.py +++ b/tests/utils/test_extended_data_publisher.py @@ -101,7 +101,9 @@ def test_send_message(publisher: ExtendedDataPublisher, data_message: DataMessag publisher.register_subscriber("abc") # act publisher.send_message(data_message) + # assert that the data message is sent assert publisher.socket._s == [data_message.to_frames()] + # assert that the control message is sent global messages assert messages == [ Message( diff --git a/tests/utils/test_extended_message_handler.py b/tests/utils/test_extended_message_handler.py index c1db857d..793a93f8 100644 --- a/tests/utils/test_extended_message_handler.py +++ b/tests/utils/test_extended_message_handler.py @@ -29,11 +29,15 @@ import pytest from pyleco.core.data_message import DataMessage +from pyleco.core.message import Message, MessageTypes from pyleco.test import FakeContext, FakeSocket from pyleco.utils.events import SimpleEvent from pyleco.utils.extended_message_handler import ExtendedMessageHandler +CID = b"conversation_id;" + + @pytest.fixture def handler(): handler = ExtendedMessageHandler(name="handler", @@ -46,6 +50,13 @@ def handler(): return handler +@pytest.fixture() +def fake_cid_generation(monkeypatch: pytest.MonkeyPatch) -> None: + def fake_generate_cid() -> bytes: + return CID + monkeypatch.setattr("pyleco.core.serialization.generate_conversation_id", fake_generate_cid) + + def test_read_subscription_message_calls_handle(handler: ExtendedMessageHandler): message = DataMessage("", data="[]") handler.subscriber._r = [message.to_frames()] # type: ignore @@ -129,3 +140,69 @@ def test_handle_unknown_message_type(self, handler_hfl: ExtendedMessageHandler): handler_hfl.handle_full_legacy_subscription_message( DataMessage("topic", data="", message_type=210) ) + + +def test_subscribe_via_command(handler: ExtendedMessageHandler, fake_cid_generation): + handler.socket._r = [ # type: ignore + Message( + "handler", + "topic", + {"jsonrpc": "2.0", "id": 1, "result": None}, + message_type=MessageTypes.JSON, + conversation_id=CID, + ).to_frames() + ] + handler.subscribe_via_control("topic") + assert Message.from_frames(*handler.socket._s[0]) == Message( # type: ignore + "topic", + "N1.handler", + {"jsonrpc": "2.0", "id": 1, "method": "register_subscriber"}, + message_type=MessageTypes.JSON, + conversation_id=CID, + ) + + +def test_unsubscribe_via_command(handler: ExtendedMessageHandler, fake_cid_generation): + handler.socket._r = [ # type: ignore + Message( + "handler", + "topic", + {"jsonrpc": "2.0", "id": 1, "result": None}, + message_type=MessageTypes.JSON, + conversation_id=CID, + ).to_frames() + ] + handler.unsubscribe_via_control("topic") + assert Message.from_frames(*handler.socket._s[0]) == Message( # type: ignore + "topic", + "N1.handler", + {"jsonrpc": "2.0", "id": 1, "method": "unregister_subscriber"}, + message_type=MessageTypes.JSON, + conversation_id=CID, + ) + + +@pytest.fixture +def data_message() -> DataMessage: + return DataMessage( + topic="topic", conversation_id=CID, data=b"0", additional_payload=[b"1", b"2"] + ) + + +def test_set_subscription_message(handler: ExtendedMessageHandler, data_message: DataMessage): + handler.current_message = Message( + "abc", + sender="topic", + data={"id": 1, "method": "set_subscription_message", "jsonrpc": "2.0"}, + conversation_id=CID, + message_type=MessageTypes.JSON, + additional_payload=data_message.payload, + ) + def store_data(data_message): + global _data + _data = data_message + + handler.handle_subscription_message = store_data # type: ignore + # act + handler.set_subscription_message() + assert _data == data_message From 63d7fc0cb6a5998a269b33ce19852f13a2c13011 Mon Sep 17 00:00:00 2001 From: Benedikt Burger <67148916+BenediktBurger@users.noreply.github.com> Date: Tue, 9 Jul 2024 15:20:51 +0200 Subject: [PATCH 4/5] The actor offers publishing via control protocol (+test) --- pyleco/actors/actor.py | 14 +++++++++-- tests/acceptance_tests/test_director_actor.py | 25 +++++++++++++++++++ 2 files changed, 37 insertions(+), 2 deletions(-) diff --git a/pyleco/actors/actor.py b/pyleco/actors/actor.py index 046706e5..45aaf771 100644 --- a/pyleco/actors/actor.py +++ b/pyleco/actors/actor.py @@ -29,7 +29,7 @@ import zmq from ..utils.message_handler import MessageHandler -from ..utils.data_publisher import DataPublisher +from ..utils.extended_data_publisher import ExtendedDataPublisher as DataPublisher from ..utils.timers import RepeatingTimer @@ -101,7 +101,9 @@ def __init__( self.pipeL.connect(f"inproc://listenerPipe:{pipe_port}") self.timer = RepeatingTimer(interval=periodic_reading, function=self.queue_readout) - self.publisher = DataPublisher(full_name=name, log=self.root_logger) + self.publisher = DataPublisher( + full_name=name, log=self.root_logger, send_message_method=self.send_message + ) if auto_connect: self.connect(**auto_connect) @@ -118,6 +120,8 @@ def register_rpc_methods(self) -> None: self.register_rpc_method(self.set_polling_interval) self.register_rpc_method(self.connect) self.register_rpc_method(self.disconnect) + self.register_rpc_method(self.register_subscriber) + self.register_rpc_method(self.unregister_subscriber) def register_device_method(self, method: Callable) -> None: """Make a device method available via RPC. The method name is prefixed with `device.`.""" @@ -271,3 +275,9 @@ def call_action(self, action: str, args: Optional[Sequence] = None, for attr in path[:-1]: obj = getattr(obj, attr) return getattr(obj, path[-1])(*args, **kwargs) + + def register_subscriber(self): + self.publisher.register_subscriber(self.current_message.sender) + + def unregister_subscriber(self): + self.publisher.unregister_subscriber(self.current_message.sender) diff --git a/tests/acceptance_tests/test_director_actor.py b/tests/acceptance_tests/test_director_actor.py index 94c39315..7836071c 100644 --- a/tests/acceptance_tests/test_director_actor.py +++ b/tests/acceptance_tests/test_director_actor.py @@ -29,6 +29,7 @@ import pytest +from pyleco.core.message import MessageTypes from pyleco.coordinators.coordinator import Coordinator from pyleco.actors.actor import Actor from pyleco.directors.director import Director @@ -83,10 +84,14 @@ def binary_method_created(additional_payload: list[bytes]) -> tuple[None, list[b """Receive binary data and return it. Create binary method by registering it.""" return None, [additional_payload[0] * 2] + def publish(): + actor.publisher.send_data("super content") + actor.register_rpc_method(binary_method_manually) actor.register_binary_rpc_method( binary_method_created, accept_binary_input=True, return_binary_output=True ) + actor.register_rpc_method(publish) actor.connect() actor.rpc.method()(actor.device.triple) actor.register_device_method(actor.device.triple) @@ -165,3 +170,23 @@ def test_binary_data_transfer_created(director: Director): assert director.ask_rpc( method="binary_method_created", additional_payload=[b"123"], extract_additional_payload=True ) == (None, [b"123123"]) + + +def test_data_via_control_protocol(director: Director): + # act + director.ask_rpc("register_subscriber") + director.ask_rpc("publish") + + msg = director.communicator.read_message() + director.communicator.send( + receiver=director.actor, # type: ignore + data={"jsonrpc": "2.0", "id": 1, "result": None}, + conversation_id=msg.conversation_id, + message_type=MessageTypes.JSON, + ) + + # teardown + director.ask_rpc("unregister_subscriber") + + assert msg.data == {"jsonrpc": "2.0", "id": 1, "method": "set_subscription_message"} + assert msg.payload[1:] == [b'super content'] From 20a3001ea8e90f938feed4ec51f1290f30ac3d2c Mon Sep 17 00:00:00 2001 From: Benedikt Burger <67148916+BenediktBurger@users.noreply.github.com> Date: Tue, 9 Jul 2024 15:32:51 +0200 Subject: [PATCH 5/5] Fix linting and tests. --- .github/workflows/pyleco_CI.yml | 1 + tests/acceptance_tests/test_director_actor.py | 2 +- tests/utils/test_extended_data_publisher.py | 5 +++-- tests/utils/test_extended_message_handler.py | 2 +- 4 files changed, 6 insertions(+), 4 deletions(-) diff --git a/.github/workflows/pyleco_CI.yml b/.github/workflows/pyleco_CI.yml index 92ef0033..f94b21b3 100644 --- a/.github/workflows/pyleco_CI.yml +++ b/.github/workflows/pyleco_CI.yml @@ -29,6 +29,7 @@ jobs: - name: Lint with ruff uses: chartboost/ruff-action@v1 with: + version: 0.4.10 # ruff-action@v1 is broken in regard to ruff 0.5.0 args: --extend-select=E9,F63,F7,F82 --show-source - uses: ammaraskar/sphinx-problem-matcher@master - name: Generate docs diff --git a/tests/acceptance_tests/test_director_actor.py b/tests/acceptance_tests/test_director_actor.py index 7836071c..74ea7900 100644 --- a/tests/acceptance_tests/test_director_actor.py +++ b/tests/acceptance_tests/test_director_actor.py @@ -177,7 +177,7 @@ def test_data_via_control_protocol(director: Director): director.ask_rpc("register_subscriber") director.ask_rpc("publish") - msg = director.communicator.read_message() + msg = director.communicator.read_message(conversation_id=None) director.communicator.send( receiver=director.actor, # type: ignore data={"jsonrpc": "2.0", "id": 1, "result": None}, diff --git a/tests/utils/test_extended_data_publisher.py b/tests/utils/test_extended_data_publisher.py index fd9c88b7..61c7cc9d 100644 --- a/tests/utils/test_extended_data_publisher.py +++ b/tests/utils/test_extended_data_publisher.py @@ -31,7 +31,7 @@ CID = b"conversation_id;" - +messages = [] # for tests @pytest.fixture def fake_send_message(): @@ -86,7 +86,8 @@ def test_unregister_subscribers(publisher: ExtendedDataPublisher): @pytest.mark.parametrize("receivers", (set(), {b"abc"}, {b"abc", b"def"}, {"string"})) def test_convert(publisher: ExtendedDataPublisher, receivers, data_message: DataMessage): msgs = publisher.convert_data_message_to_messages(data_message, receivers=receivers) - for rec, msg in zip(receivers, msgs, strict=True): + assert len(receivers) == len(list(msgs)), "The lengths of receivers and messages do not match." + for rec, msg in zip(receivers, msgs): assert msg == Message( receiver=rec, data={"id": 1, "method": "set_subscription_message", "jsonrpc": "2.0"}, diff --git a/tests/utils/test_extended_message_handler.py b/tests/utils/test_extended_message_handler.py index 793a93f8..291f883d 100644 --- a/tests/utils/test_extended_message_handler.py +++ b/tests/utils/test_extended_message_handler.py @@ -36,7 +36,7 @@ CID = b"conversation_id;" - +_data = None # for temporary storage @pytest.fixture def handler():