Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add data protocol via control protocol #91

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/pyleco_CI.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ jobs:
- name: Lint with ruff
uses: chartboost/ruff-action@v1
with:
version: 0.4.10 # ruff-action@v1 is broken in regard to ruff 0.5.0
args: --extend-select=E9,F63,F7,F82 --show-source
- uses: ammaraskar/sphinx-problem-matcher@master
- name: Generate docs
Expand Down
14 changes: 12 additions & 2 deletions pyleco/actors/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import zmq

from ..utils.message_handler import MessageHandler
from ..utils.data_publisher import DataPublisher
from ..utils.extended_data_publisher import ExtendedDataPublisher as DataPublisher
from ..utils.timers import RepeatingTimer


Expand Down Expand Up @@ -101,7 +101,9 @@ def __init__(
self.pipeL.connect(f"inproc://listenerPipe:{pipe_port}")

self.timer = RepeatingTimer(interval=periodic_reading, function=self.queue_readout)
self.publisher = DataPublisher(full_name=name, log=self.root_logger)
self.publisher = DataPublisher(
full_name=name, log=self.root_logger, send_message_method=self.send_message
)

if auto_connect:
self.connect(**auto_connect)
Expand All @@ -118,6 +120,8 @@ def register_rpc_methods(self) -> None:
self.register_rpc_method(self.set_polling_interval)
self.register_rpc_method(self.connect)
self.register_rpc_method(self.disconnect)
self.register_rpc_method(self.register_subscriber)
self.register_rpc_method(self.unregister_subscriber)

def register_device_method(self, method: Callable) -> None:
"""Make a device method available via RPC. The method name is prefixed with `device.`."""
Expand Down Expand Up @@ -271,3 +275,9 @@ def call_action(self, action: str, args: Optional[Sequence] = None,
for attr in path[:-1]:
obj = getattr(obj, attr)
return getattr(obj, path[-1])(*args, **kwargs)

def register_subscriber(self):
self.publisher.register_subscriber(self.current_message.sender)

def unregister_subscriber(self):
self.publisher.unregister_subscriber(self.current_message.sender)
74 changes: 74 additions & 0 deletions pyleco/utils/extended_data_publisher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
#
# This file is part of the PyLECO package.
#
# Copyright (c) 2023-2024 PyLECO Developers
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#

from __future__ import annotations
from typing import Any, Callable, Generator, Union

from ..core.message import Message, MessageTypes
from ..core.data_message import DataMessage
from ..json_utils.rpc_generator import RPCGenerator
from .data_publisher import DataPublisher

class ExtendedDataPublisher(DataPublisher):
"""A DataPublisher, which sends the data also via the control protocol."""

def __init__(
self, full_name: str, send_message_method: Callable[[Message], None], **kwargs
) -> None:
super().__init__(full_name, **kwargs)
self.send_control_message = send_message_method
self.subscribers: set[bytes] = set()
self.rpc_generator = RPCGenerator()

def register_subscriber(self, subscriber: Union[bytes, str]) -> None:
"""Register a subscriber, that it may receive data messages via command protocol."""
if isinstance(subscriber, str):
subscriber = subscriber.encode()
self.subscribers.add(subscriber)

def unregister_subscriber(self, subscriber: Union[bytes, str]) -> None:
"""Unregister a subscriber, that it may not receive data messages via command protocol."""
if isinstance(subscriber, str):
subscriber = subscriber.encode()
self.subscribers.discard(subscriber)

def convert_data_message_to_messages(
self, data_message: DataMessage, receivers: Union[set[Union[bytes, str]], set[bytes]],
) -> Generator[Message, Any, Any]:
cid = data_message.conversation_id
data = self.rpc_generator.build_request_str(method="set_subscription_message")
for receiver in receivers:
yield Message(
receiver=receiver,
data=data,
conversation_id=cid,
additional_payload=data_message.payload,
message_type=MessageTypes.JSON,
)

def send_message(self, message: DataMessage) -> None:
super().send_message(message)
for msg in self.convert_data_message_to_messages(message, self.subscribers):
# ideas: change to ask and check, whether it succeeded, otherwise remove subscriber
self.send_control_message(msg)
22 changes: 21 additions & 1 deletion pyleco/utils/extended_message_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from __future__ import annotations
import json
import pickle
from typing import Optional
from typing import Optional, Union

import zmq

Expand Down Expand Up @@ -59,6 +59,7 @@ def register_rpc_methods(self) -> None:
self.register_rpc_method(self.subscribe)
self.register_rpc_method(self.unsubscribe)
self.register_rpc_method(self.unsubscribe_all)
self.register_rpc_method(self.set_subscription_message)

def close(self) -> None:
self.subscriber.close(1)
Expand Down Expand Up @@ -126,3 +127,22 @@ def unsubscribe_all(self) -> None:
"""Unsubscribe from all subscriptions."""
while self._subscriptions:
self.unsubscribe_single(self._subscriptions.pop())

# methods for data protocol via control protocol
def subscribe_via_control(self, topic: Union[bytes, str]) -> None:
"""Subscribe to a topic via the control protocol."""
self.ask_rpc(receiver=topic, method="register_subscriber")

def unsubscribe_via_control(self, topic: Union[bytes, str]) -> None:
"""Unsubscribe to a topic via the control protocol."""
self.ask_rpc(receiver=topic, method="unregister_subscriber")

def set_subscription_message(self) -> None:
"""Set a subscription message as if it had been received via data protocol."""
msg = self.current_message
dm = DataMessage(
topic=msg.sender,
conversation_id=msg.conversation_id,
additional_payload=msg.payload[1:],
)
self.handle_subscription_message(dm)
25 changes: 25 additions & 0 deletions tests/acceptance_tests/test_director_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import pytest

from pyleco.core.message import MessageTypes
from pyleco.coordinators.coordinator import Coordinator
from pyleco.actors.actor import Actor
from pyleco.directors.director import Director
Expand Down Expand Up @@ -83,10 +84,14 @@ def binary_method_created(additional_payload: list[bytes]) -> tuple[None, list[b
"""Receive binary data and return it. Create binary method by registering it."""
return None, [additional_payload[0] * 2]

def publish():
actor.publisher.send_data("super content")

actor.register_rpc_method(binary_method_manually)
actor.register_binary_rpc_method(
binary_method_created, accept_binary_input=True, return_binary_output=True
)
actor.register_rpc_method(publish)
actor.connect()
actor.rpc.method()(actor.device.triple)
actor.register_device_method(actor.device.triple)
Expand Down Expand Up @@ -165,3 +170,23 @@ def test_binary_data_transfer_created(director: Director):
assert director.ask_rpc(
method="binary_method_created", additional_payload=[b"123"], extract_additional_payload=True
) == (None, [b"123123"])


def test_data_via_control_protocol(director: Director):
# act
director.ask_rpc("register_subscriber")
director.ask_rpc("publish")

msg = director.communicator.read_message(conversation_id=None)
director.communicator.send(
receiver=director.actor, # type: ignore
data={"jsonrpc": "2.0", "id": 1, "result": None},
conversation_id=msg.conversation_id,
message_type=MessageTypes.JSON,
)

# teardown
director.ask_rpc("unregister_subscriber")

assert msg.data == {"jsonrpc": "2.0", "id": 1, "method": "set_subscription_message"}
assert msg.payload[1:] == [b'super content']
117 changes: 117 additions & 0 deletions tests/utils/test_extended_data_publisher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
#
# This file is part of the PyLECO package.
#
# Copyright (c) 2023-2024 PyLECO Developers
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#

import pytest

from pyleco.test import FakeContext
from pyleco.core.message import Message, MessageTypes
from pyleco.core.data_message import DataMessage
from pyleco.utils.extended_data_publisher import ExtendedDataPublisher


CID = b"conversation_id;"
messages = [] # for tests

@pytest.fixture
def fake_send_message():
global messages
messages = []
def _fsm(message: Message):
global messages
messages.append(message)
return _fsm


@pytest.fixture
def publisher(fake_send_message) -> ExtendedDataPublisher:
publisher = ExtendedDataPublisher(
"fn", send_message_method=fake_send_message,
context=FakeContext(), # type: ignore
)
return publisher


@pytest.fixture
def data_message() -> DataMessage:
return DataMessage(
topic="topic", conversation_id=CID, data=b"0", additional_payload=[b"1", b"2"]
)


def test_register_subscribers(publisher: ExtendedDataPublisher):
# act
publisher.register_subscriber("abcdef")
assert b"abcdef" in publisher.subscribers

publisher.register_subscriber(b"ghi")
assert b"ghi" in publisher.subscribers


def test_unregister_subscribers(publisher: ExtendedDataPublisher):
# arrange
publisher.subscribers.add(b"abc")
publisher.subscribers.add(b"def")
# act
# str
publisher.unregister_subscriber("abc")
assert b"abc" not in publisher.subscribers
# bytes
publisher.unregister_subscriber(b"def")
assert b"def" not in publisher.subscribers
# assert that no error is raised at repeated unregistering
publisher.unregister_subscriber(b"def")


@pytest.mark.parametrize("receivers", (set(), {b"abc"}, {b"abc", b"def"}, {"string"}))
def test_convert(publisher: ExtendedDataPublisher, receivers, data_message: DataMessage):
msgs = publisher.convert_data_message_to_messages(data_message, receivers=receivers)
assert len(receivers) == len(list(msgs)), "The lengths of receivers and messages do not match."
for rec, msg in zip(receivers, msgs):
assert msg == Message(
receiver=rec,
data={"id": 1, "method": "set_subscription_message", "jsonrpc": "2.0"},
conversation_id=CID,
message_type=MessageTypes.JSON,
additional_payload=data_message.payload,
)


def test_send_message(publisher: ExtendedDataPublisher, data_message: DataMessage):
# arrange
publisher.register_subscriber("abc")
# act
publisher.send_message(data_message)
# assert that the data message is sent
assert publisher.socket._s == [data_message.to_frames()]
# assert that the control message is sent
global messages
assert messages == [
Message(
"abc",
data={"id": 1, "method": "set_subscription_message", "jsonrpc": "2.0"},
conversation_id=CID,
message_type=MessageTypes.JSON,
additional_payload=data_message.payload,
)
]
Loading