Skip to content

Add data protocol via control protocol #91

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions pyleco/actors/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import zmq

from ..utils.message_handler import MessageHandler
from ..utils.data_publisher import DataPublisher
from ..utils.extended_data_publisher import ExtendedDataPublisher as DataPublisher
from ..utils.timers import RepeatingTimer


Expand Down Expand Up @@ -101,7 +101,9 @@ def __init__(
self.pipeL.connect(f"inproc://listenerPipe:{pipe_port}")

self.timer = RepeatingTimer(interval=periodic_reading, function=self.queue_readout)
self.publisher = DataPublisher(full_name=name, log=self.root_logger)
self.publisher = DataPublisher(
full_name=name, log=self.root_logger, send_message_method=self.send_message
)

if auto_connect:
self.connect(**auto_connect)
Expand All @@ -118,6 +120,8 @@ def register_rpc_methods(self) -> None:
self.register_rpc_method(self.set_polling_interval)
self.register_rpc_method(self.connect)
self.register_rpc_method(self.disconnect)
self.register_rpc_method(self.register_subscriber)
self.register_rpc_method(self.unregister_subscriber)

def register_device_method(self, method: Callable) -> None:
"""Make a device method available via RPC. The method name is prefixed with `device.`."""
Expand Down Expand Up @@ -271,3 +275,9 @@ def call_action(self, action: str, args: Optional[Sequence] = None,
for attr in path[:-1]:
obj = getattr(obj, attr)
return getattr(obj, path[-1])(*args, **kwargs)

def register_subscriber(self):
self.publisher.register_subscriber(self.current_message.sender)

def unregister_subscriber(self):
self.publisher.unregister_subscriber(self.current_message.sender)
2 changes: 2 additions & 0 deletions pyleco/json_utils/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@
class JSONRPCError(Exception):
"""Base error that all JSON RPC exceptions extend."""

rpc_error: ErrorType

def __init__(self, error: ErrorType) -> None:
msg = f"{error.code}: {error.message}"
self.rpc_error = error
Expand Down
105 changes: 105 additions & 0 deletions pyleco/utils/extended_data_publisher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
#
# This file is part of the PyLECO package.
#
# Copyright (c) 2023-2024 PyLECO Developers
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#

from __future__ import annotations
from json import JSONDecodeError
from typing import Any, Callable, Generator, Union

from ..json_utils.errors import JSONRPCError, NODE_UNKNOWN, RECEIVER_UNKNOWN
from ..json_utils.json_objects import Notification
from ..core.message import Message, MessageTypes
from ..core.data_message import DataMessage
from ..json_utils.rpc_generator import RPCGenerator
from .data_publisher import DataPublisher

class ExtendedDataPublisher(DataPublisher):
"""A DataPublisher, which sends the data also via the control protocol."""

def __init__(
self, full_name: str, send_message_method: Callable[[Message], None], **kwargs
) -> None:
super().__init__(full_name, **kwargs)
self.send_control_message = send_message_method
self.subscribers: set[bytes] = set()
self.rpc_generator = RPCGenerator()

def register_subscriber(self, subscriber: Union[bytes, str]) -> None:
"""Register a subscriber, that it may receive data messages via command protocol."""
if isinstance(subscriber, str):
subscriber = subscriber.encode()
self.subscribers.add(subscriber)

def unregister_subscriber(self, subscriber: Union[bytes, str]) -> None:
"""Unregister a subscriber, that it may not receive data messages via command protocol."""
if isinstance(subscriber, str):
subscriber = subscriber.encode()
self.subscribers.discard(subscriber)

def convert_data_message_to_messages(
self, data_message: DataMessage, receivers: Union[set[Union[bytes, str]], set[bytes]],
) -> Generator[Message, Any, Any]:
cid = data_message.conversation_id
for receiver in receivers:
yield Message(
receiver=receiver,
data=Notification("add_subscription_message"),
conversation_id=cid,
additional_payload=data_message.payload,
message_type=MessageTypes.JSON,
)

def send_message(self, message: DataMessage) -> None:
super().send_message(message)
for msg in self.convert_data_message_to_messages(message, self.subscribers):
# ideas: change to ask and check, whether it succeeded, otherwise remove subscriber
self.send_control_message(msg)

# TODO should unregister subscribers to which a message could not be forwarded
def handle_json_error(self, message: Message) -> None:
"""Unregister unavailable subscribers.

Call this method from the message handler, for example.
"""
try:
data: dict[str, Any] = message.data # type: ignore
except JSONDecodeError as exc:
self.log.exception(f"Could not decode json message {message}", exc_info=exc)
return
try:
self.rpc_generator.get_result_from_response(data)
except JSONRPCError as exc:
error_code = exc.rpc_error.code
try:
error_data = exc.rpc_error.data # type: ignore
except AttributeError:
return
if error_code == RECEIVER_UNKNOWN:
self.unregister_subscriber(error_data)
if error_code == NODE_UNKNOWN:
if isinstance(error_data, str):
error_data = error_data.encode()
for subscriber in self.subscribers:
if subscriber.startswith(error_data):
self.unregister_subscriber(subscriber)

22 changes: 21 additions & 1 deletion pyleco/utils/extended_message_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from __future__ import annotations
import json
import pickle
from typing import Optional
from typing import Optional, Union

import zmq

Expand Down Expand Up @@ -59,6 +59,7 @@ def register_rpc_methods(self) -> None:
self.register_rpc_method(self.subscribe)
self.register_rpc_method(self.unsubscribe)
self.register_rpc_method(self.unsubscribe_all)
self.register_rpc_method(self.add_subscription_message)

def close(self) -> None:
self.subscriber.close(1)
Expand Down Expand Up @@ -126,3 +127,22 @@ def unsubscribe_all(self) -> None:
"""Unsubscribe from all subscriptions."""
while self._subscriptions:
self.unsubscribe_single(self._subscriptions.pop())

# methods for data protocol via control protocol
def subscribe_via_control(self, topic: Union[bytes, str]) -> None:
"""Subscribe to a topic via the control protocol."""
self.ask_rpc(receiver=topic, method="register_subscriber")

def unsubscribe_via_control(self, topic: Union[bytes, str]) -> None:
"""Unsubscribe to a topic via the control protocol."""
self.ask_rpc(receiver=topic, method="unregister_subscriber")

def add_subscription_message(self) -> None:
"""Set a subscription message as if it had been received via data protocol."""
msg = self.current_message
dm = DataMessage(
topic=msg.sender,
conversation_id=msg.conversation_id,
additional_payload=msg.payload[1:],
)
self.handle_subscription_message(dm)
2 changes: 1 addition & 1 deletion pyleco/utils/message_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ def namespace(self) -> Union[str, None]:
return self._namespace

@namespace.setter
def namespace(self, value: Union[str, None]) -> None:
def namespace(self, value: Union[str, None]) -> None: # type: ignore
self._namespace = value
full_name = self.name if value is None else ".".join((value, self.name))
self.set_full_name(full_name=full_name)
Expand Down
18 changes: 18 additions & 0 deletions tests/acceptance_tests/test_director_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,14 @@ def binary_method_created(additional_payload: list[bytes]) -> tuple[None, list[b
"""Receive binary data and return it. Create binary method by registering it."""
return None, [additional_payload[0] * 2]

def publish():
actor.publisher.send_data("super content")

actor.register_rpc_method(binary_method_manually)
actor.register_binary_rpc_method(
binary_method_created, accept_binary_input=True, return_binary_output=True
)
actor.register_rpc_method(publish)
actor.connect()
actor.rpc.method()(actor.device.triple)
actor.register_device_method(actor.device.triple)
Expand Down Expand Up @@ -165,3 +169,17 @@ def test_binary_data_transfer_created(director: Director):
assert director.ask_rpc(
method="binary_method_created", additional_payload=[b"123"], extract_additional_payload=True
) == (None, [b"123123"])


def test_data_via_control_protocol(director: Director):
# act
director.ask_rpc("register_subscriber")
director.ask_rpc("publish")

msg = director.communicator.read_message(conversation_id=None)

# teardown
director.ask_rpc("unregister_subscriber")

assert msg.data == {"jsonrpc": "2.0", "method": "add_subscription_message"}
assert msg.payload[1:] == [b'super content']
118 changes: 118 additions & 0 deletions tests/utils/test_extended_data_publisher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
#
# This file is part of the PyLECO package.
#
# Copyright (c) 2023-2024 PyLECO Developers
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#

import pytest

from pyleco.test import FakeContext
from pyleco.core.message import Message, MessageTypes
from pyleco.core.data_message import DataMessage
from pyleco.json_utils.json_objects import Notification
from pyleco.utils.extended_data_publisher import ExtendedDataPublisher


CID = b"conversation_id;"
messages = [] # for tests

@pytest.fixture
def fake_send_message():
global messages
messages = []
def _fsm(message: Message):
global messages
messages.append(message)
return _fsm


@pytest.fixture
def publisher(fake_send_message) -> ExtendedDataPublisher:
publisher = ExtendedDataPublisher(
"fn", send_message_method=fake_send_message,
context=FakeContext(), # type: ignore
)
return publisher


@pytest.fixture
def data_message() -> DataMessage:
return DataMessage(
topic="topic", conversation_id=CID, data=b"0", additional_payload=[b"1", b"2"]
)


def test_register_subscribers(publisher: ExtendedDataPublisher):
# act
publisher.register_subscriber("abcdef")
assert b"abcdef" in publisher.subscribers

publisher.register_subscriber(b"ghi")
assert b"ghi" in publisher.subscribers


def test_unregister_subscribers(publisher: ExtendedDataPublisher):
# arrange
publisher.subscribers.add(b"abc")
publisher.subscribers.add(b"def")
# act
# str
publisher.unregister_subscriber("abc")
assert b"abc" not in publisher.subscribers
# bytes
publisher.unregister_subscriber(b"def")
assert b"def" not in publisher.subscribers
# assert that no error is raised at repeated unregistering
publisher.unregister_subscriber(b"def")


@pytest.mark.parametrize("receivers", (set(), {b"abc"}, {b"abc", b"def"}, {"string"}))
def test_convert(publisher: ExtendedDataPublisher, receivers, data_message: DataMessage):
msgs = list(publisher.convert_data_message_to_messages(data_message, receivers=receivers))
assert len(msgs) == len(receivers)
for rec, msg in zip(receivers, msgs):
assert msg == Message(
receiver=rec,
data=Notification(method="add_subscription_message"),
conversation_id=CID,
message_type=MessageTypes.JSON,
additional_payload=data_message.payload,
)


def test_send_message(publisher: ExtendedDataPublisher, data_message: DataMessage):
# arrange
publisher.register_subscriber("abc")
# act
publisher.send_message(data_message)
# assert that the data message is sent
assert publisher.socket._s == [data_message.to_frames()]
# assert that the control message is sent
global messages
assert messages == [
Message(
"abc",
data=Notification(method="add_subscription_message"),
conversation_id=CID,
message_type=MessageTypes.JSON,
additional_payload=data_message.payload,
)
]
Loading
Loading