Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add binary payload handling to PyLECO #82

Merged
merged 22 commits into from
Jun 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
6ec8873
Make communicator return binary objects.
BenediktBurger May 22, 2024
4a49863
Make MessageHandler capable for binary objects.
BenediktBurger May 22, 2024
055d0fa
Add additional_payload to Message.
BenediktBurger May 22, 2024
0db80a1
Refactor Director.ask_message
BenediktBurger May 22, 2024
df75c6a
Modify formatting with ruff
BenediktBurger May 27, 2024
7bc5c87
Add parameter for extract binary response
BenediktBurger May 27, 2024
baa5797
Add additional_payload option to ask_rpc
BenediktBurger May 27, 2024
0bbe03e
Add additional payload options to director
BenediktBurger May 27, 2024
17c9e2a
Add acceptance test for binary transfer.
BenediktBurger May 27, 2024
a32402a
Return all additional payload frames.
BenediktBurger May 27, 2024
ad66400
Update Director (and Fake) to changes.
BenediktBurger May 27, 2024
d970d39
Tiny changes.
BenediktBurger May 27, 2024
cb62581
Return either json value or json and binary
BenediktBurger May 29, 2024
d2d5ad8
Add a method to register binary methods.
BenediktBurger May 29, 2024
c53d71c
Modify docstring of binary method.
BenediktBurger Jun 1, 2024
6ba7917
Explicitly state whether to return binary values.
BenediktBurger Jun 4, 2024
453db24
State type of binary method in docstring.
BenediktBurger Jun 5, 2024
ef51d47
Improve documentation
BenediktBurger Jun 11, 2024
39ca4c5
Make data_message similar to message
BenediktBurger Jun 11, 2024
8e3d82e
Add binary sending to data publisher
BenediktBurger Jun 11, 2024
bbd1c53
Fix creation of binary method
BenediktBurger Jun 12, 2024
6ba4c19
Add changelog entry.
BenediktBurger Jun 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# CHANGELOG

## [unreleased]

### Added

* Add convenience functions for using additional frames for binary payload ([#82](https://github.com/pymeasure/pyleco/pull/82))


## [0.3.2] 2024-5-07

### Fixed
Expand Down
8 changes: 5 additions & 3 deletions pyleco/core/data_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

from __future__ import annotations
from json import JSONDecodeError
from typing import Any, Optional, Union
from typing import Any, Iterable, Optional, Union

from .serialization import deserialize_data, generate_conversation_id, serialize_data, MessageTypes

Expand All @@ -42,6 +42,7 @@ def __init__(self,
data: Optional[Union[bytes, str, Any]] = None,
conversation_id: Optional[bytes] = None,
message_type: Union[MessageTypes, int] = MessageTypes.NOT_DEFINED,
additional_payload: Optional[Iterable[bytes]] = None,
**kwargs) -> None:
super().__init__(**kwargs)
self.topic = topic.encode() if isinstance(topic, str) else topic
Expand All @@ -61,6 +62,8 @@ def __init__(self,
self.payload = []
else:
self.payload = [serialize_data(data)]
if additional_payload is not None:
self.payload.extend(additional_payload)

@classmethod
def from_frames(cls, topic: bytes, header: bytes, *payload: bytes):
Expand All @@ -71,8 +74,7 @@ def from_frames(cls, topic: bytes, header: bytes, *payload: bytes):
frames = socket.recv_multipart()
message = DataMessage.from_frames(*frames)
"""
message = cls(topic=topic, header=header)
message.payload = list(payload)
message = cls(topic=topic, header=header, additional_payload=payload)
return message

def to_frames(self) -> list[bytes]:
Expand Down
91 changes: 62 additions & 29 deletions pyleco/core/internal_protocols.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,43 +59,76 @@ def sign_out(self) -> None: ... # pragma: no cover

def send_message(self, message: Message) -> None: ... # pragma: no cover

def read_message(self, conversation_id: Optional[bytes], timeout: Optional[float] = None
) -> Message: ... # pragma: no cover
def read_message(
self, conversation_id: Optional[bytes], timeout: Optional[float] = None
) -> Message: ... # pragma: no cover

def ask_message(self, message: Message, timeout: Optional[float] = None
) -> Message: ... # pragma: no cover
def ask_message(
self, message: Message, timeout: Optional[float] = None
) -> Message: ... # pragma: no cover

def close(self) -> None: ... # pragma: no cover

# Utilities
def send(self,
receiver: Union[bytes, str],
conversation_id: Optional[bytes] = None,
data: Optional[Any] = None,
**kwargs) -> None:
def send(
self,
receiver: Union[bytes, str],
conversation_id: Optional[bytes] = None,
data: Optional[Any] = None,
**kwargs,
) -> None:
"""Send a message based on kwargs."""
self.send_message(message=Message(
receiver=receiver, conversation_id=conversation_id, data=data, **kwargs
))

def ask(self, receiver: Union[bytes, str], conversation_id: Optional[bytes] = None,
data: Optional[Any] = None,
timeout: Optional[float] = None,
**kwargs) -> Message:
self.send_message(
message=Message(receiver=receiver, conversation_id=conversation_id, data=data, **kwargs)
)

def ask(
self,
receiver: Union[bytes, str],
conversation_id: Optional[bytes] = None,
data: Optional[Any] = None,
timeout: Optional[float] = None,
**kwargs,
) -> Message:
"""Send a message based on kwargs and retrieve the response."""
return self.ask_message(message=Message(
receiver=receiver, conversation_id=conversation_id, data=data, **kwargs),
timeout=timeout)

def interpret_rpc_response(self, response_message: Message) -> Any:
return self.rpc_generator.get_result_from_response(response_message.payload[0])

def ask_rpc(self, receiver: Union[bytes, str], method: str, timeout: Optional[float] = None,
**kwargs) -> Any:
return self.ask_message(
message=Message(
receiver=receiver, conversation_id=conversation_id, data=data, **kwargs
),
timeout=timeout,
)

def interpret_rpc_response(
self, response_message: Message, extract_additional_payload: bool = False
) -> Union[Any, tuple[Any, list[bytes]]]:
"""Retrieve the return value of a RPC response and optionally the additional payload."""
result = self.rpc_generator.get_result_from_response(response_message.payload[0])
if extract_additional_payload:
return result, response_message.payload[1:]
else:
return result

def ask_rpc(
self,
receiver: Union[bytes, str],
method: str,
timeout: Optional[float] = None,
additional_payload: Optional[Iterable[bytes]] = None,
extract_additional_payload: bool = False,
**kwargs,
) -> Any:
"""Send a JSON-RPC request (with method \\**kwargs) and return the response value."""
string = self.rpc_generator.build_request_str(method=method, **kwargs)
response = self.ask(receiver=receiver, data=string, message_type=MessageTypes.JSON,
timeout=timeout)
return self.interpret_rpc_response(response)
response = self.ask(
receiver=receiver,
data=string,
message_type=MessageTypes.JSON,
additional_payload=additional_payload,
timeout=timeout,
)
return self.interpret_rpc_response(
response, extract_additional_payload=extract_additional_payload
)


class SubscriberProtocol(Protocol):
Expand Down
2 changes: 1 addition & 1 deletion pyleco/core/leco_protocols.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ def shut_down(self) -> None: ...


class CoordinatorProtocol(ComponentProtocol, Protocol):
"""A command protocol Coordinator"""
"""A command protocol Coordinator."""

def sign_in(self) -> None: ...

Expand Down
8 changes: 5 additions & 3 deletions pyleco/core/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

from __future__ import annotations
from json import JSONDecodeError
from typing import Any, Optional, Union
from typing import Any, Iterable, Optional, Union


from . import VERSION_B
Expand Down Expand Up @@ -64,6 +64,7 @@ def __init__(self,
conversation_id: Optional[bytes] = None,
message_id: Optional[bytes] = None,
message_type: Union[MessageTypes, int] = MessageTypes.NOT_DEFINED,
additional_payload: Optional[Iterable[bytes]] = None,
) -> None:
self.receiver = receiver.encode() if isinstance(receiver, str) else receiver
self.sender = sender.encode() if isinstance(sender, str) else sender
Expand All @@ -81,6 +82,8 @@ def __init__(self,
self.payload = []
else:
self.payload = [serialize_data(data)]
if additional_payload is not None:
self.payload.extend(additional_payload)

@classmethod
def from_frames(cls, version: bytes, receiver: bytes, sender: bytes, header: bytes,
Expand All @@ -92,9 +95,8 @@ def from_frames(cls, version: bytes, receiver: bytes, sender: bytes, header: byt
frames = socket.recv_multipart()
message = Message.from_frames(*frames)
"""
inst = cls(receiver, sender, header=header)
inst = cls(receiver, sender, header=header, additional_payload=payload)
inst.version = version
inst.payload = list(payload)
return inst

def to_frames(self) -> list[bytes]:
Expand Down
63 changes: 50 additions & 13 deletions pyleco/directors/director.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

from __future__ import annotations
import logging
from typing import Any, Optional, Sequence, Union
from typing import Any, Iterable, Optional, Sequence, Union

from ..core.internal_protocols import CommunicatorProtocol
from ..utils.communicator import Communicator
Expand Down Expand Up @@ -88,10 +88,9 @@ def __exit__(self, exc_type, exc_value, exc_traceback) -> None:
# Message handling
def ask_message(self, actor: Optional[Union[bytes, str]] = None,
data: Optional[Any] = None, **kwargs) -> Message:
cid0 = generate_conversation_id()
actor = self._actor_check(actor)
log.debug(f"Asking {actor!r} with message '{data}'.")
response = self.communicator.ask(actor, conversation_id=cid0, data=data, **kwargs)
response = self.communicator.ask(actor, data=data, **kwargs)
log.debug(f"Data '{response.data}' received.")
return response

Expand All @@ -113,10 +112,23 @@ def _prepare_call_action_params(self, args: tuple[Any, ...],
return params

# Remote control synced
def ask_rpc(self, method: str, actor: Optional[Union[bytes, str]] = None, **kwargs) -> Any:
def ask_rpc(
self,
method: str,
actor: Optional[Union[bytes, str]] = None,
additional_payload: Optional[Iterable[bytes]] = None,
extract_additional_payload: bool = False,
**kwargs,
) -> Any:
"""Remotely call the `method` procedure on the `actor` and return the return value."""
receiver = self._actor_check(actor)
return self.communicator.ask_rpc(receiver=receiver, method=method, **kwargs)
return self.communicator.ask_rpc(
receiver=receiver,
method=method,
additional_payload=additional_payload,
extract_additional_payload=extract_additional_payload,
**kwargs,
)

# Component
def get_rpc_capabilities(self, actor: Optional[Union[bytes, str]] = None) -> dict:
Expand Down Expand Up @@ -165,23 +177,48 @@ def call_action(self, action: str, *args, actor: Optional[Union[bytes, str]] = N
return self.ask_rpc("call_action", action=action, actor=actor, **params)

# Async methods: Just send, read later.
def send(self, actor: Optional[Union[bytes, str]] = None, data=None, **kwargs) -> bytes:
def send(
self,
actor: Optional[Union[bytes, str]] = None,
data=None,
additional_payload: Optional[Iterable[bytes]] = None,
**kwargs,
) -> bytes:
"""Send a request and return the conversation_id."""
actor = self._actor_check(actor)
cid0 = generate_conversation_id()
self.communicator.send(actor, conversation_id=cid0, data=data, **kwargs)
self.communicator.send(
actor, conversation_id=cid0, data=data, additional_payload=additional_payload, **kwargs
)
return cid0

def ask_rpc_async(self, method: str, actor: Optional[Union[bytes, str]] = None,
**kwargs) -> bytes:
def ask_rpc_async(
self,
method: str,
actor: Optional[Union[bytes, str]] = None,
additional_payload: Optional[Iterable[bytes]] = None,
**kwargs,
) -> bytes:
"""Send a rpc request, the response can be read later with :meth:`read_rpc_response`."""
string = self.generator.build_request_str(method=method, **kwargs)
return self.send(actor=actor, data=string, message_type=MessageTypes.JSON)

def read_rpc_response(self, conversation_id: Optional[bytes] = None, **kwargs) -> Any:
return self.send(
actor=actor,
data=string,
message_type=MessageTypes.JSON,
additional_payload=additional_payload,
)

def read_rpc_response(
self,
conversation_id: Optional[bytes] = None,
extract_additional_payload: bool = False,
**kwargs,
) -> Any:
"""Read the response value corresponding to a request with a certain `conversation_id`."""
response_message = self.communicator.read_message(conversation_id=conversation_id, **kwargs)
return self.communicator.interpret_rpc_response(response_message=response_message)
return self.communicator.interpret_rpc_response(
response_message=response_message, extract_additional_payload=extract_additional_payload
)

# Actor
def get_parameters_async(self, parameters: Union[str, Sequence[str]],
Expand Down
2 changes: 1 addition & 1 deletion pyleco/management/test_tasks/test_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from pyleco.actors.actor import Actor


class FakeInstrument:
class FakeInstrument: # pragma: no cover
_prop1 = 5

def __init__(self):
Expand Down
20 changes: 16 additions & 4 deletions pyleco/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
#

from __future__ import annotations
from typing import Any, Optional, Sequence, Union
from typing import Any, Iterable, Optional, Sequence, Union

from .core.message import Message
from .core.internal_protocols import CommunicatorProtocol
Expand Down Expand Up @@ -219,14 +219,26 @@ def __init__(self, remote_class, **kwargs):
super().__init__(**kwargs)
self.remote_class = remote_class

def ask_rpc(self, method: str, actor: Optional[Union[bytes, str]] = None, **kwargs) -> Any:
def ask_rpc(
self,
method: str,
actor: Optional[Union[bytes, str]] = None,
additional_payload: Optional[Iterable[bytes]] = None,
extract_additional_payload: bool = False,
**kwargs,
) -> Any:
assert hasattr(self.remote_class, method), f"Remote class does not have method '{method}'."
self.method = method
self.kwargs = kwargs
return self.return_value

def ask_rpc_async(self, method: str, actor: Optional[Union[bytes, str]] = None,
**kwargs) -> bytes:
def ask_rpc_async(
self,
method: str,
actor: Optional[Union[bytes, str]] = None,
additional_payload: Optional[Iterable[bytes]] = None,
**kwargs,
) -> bytes:
assert hasattr(self.remote_class, method), f"Remote class does not have method '{method}'."
self.method = method
self.kwargs = kwargs
Expand Down
Loading
Loading