Skip to content

Commit

Permalink
Merge pull request #82 from pymeasure/binary_payload
Browse files Browse the repository at this point in the history
Add binary payload handling to PyLECO
  • Loading branch information
BenediktBurger authored Jun 19, 2024
2 parents 30f9a97 + 6ba4c19 commit ca94ef5
Show file tree
Hide file tree
Showing 18 changed files with 598 additions and 78 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# CHANGELOG

## [unreleased]

### Added

* Add convenience functions for using additional frames for binary payload ([#82](https://github.com/pymeasure/pyleco/pull/82))


## [0.3.2] 2024-5-07

### Fixed
Expand Down
8 changes: 5 additions & 3 deletions pyleco/core/data_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

from __future__ import annotations
from json import JSONDecodeError
from typing import Any, Optional, Union
from typing import Any, Iterable, Optional, Union

from .serialization import deserialize_data, generate_conversation_id, serialize_data, MessageTypes

Expand All @@ -42,6 +42,7 @@ def __init__(self,
data: Optional[Union[bytes, str, Any]] = None,
conversation_id: Optional[bytes] = None,
message_type: Union[MessageTypes, int] = MessageTypes.NOT_DEFINED,
additional_payload: Optional[Iterable[bytes]] = None,
**kwargs) -> None:
super().__init__(**kwargs)
self.topic = topic.encode() if isinstance(topic, str) else topic
Expand All @@ -61,6 +62,8 @@ def __init__(self,
self.payload = []
else:
self.payload = [serialize_data(data)]
if additional_payload is not None:
self.payload.extend(additional_payload)

@classmethod
def from_frames(cls, topic: bytes, header: bytes, *payload: bytes):
Expand All @@ -71,8 +74,7 @@ def from_frames(cls, topic: bytes, header: bytes, *payload: bytes):
frames = socket.recv_multipart()
message = DataMessage.from_frames(*frames)
"""
message = cls(topic=topic, header=header)
message.payload = list(payload)
message = cls(topic=topic, header=header, additional_payload=payload)
return message

def to_frames(self) -> list[bytes]:
Expand Down
91 changes: 62 additions & 29 deletions pyleco/core/internal_protocols.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,43 +59,76 @@ def sign_out(self) -> None: ... # pragma: no cover

def send_message(self, message: Message) -> None: ... # pragma: no cover

def read_message(self, conversation_id: Optional[bytes], timeout: Optional[float] = None
) -> Message: ... # pragma: no cover
def read_message(
self, conversation_id: Optional[bytes], timeout: Optional[float] = None
) -> Message: ... # pragma: no cover

def ask_message(self, message: Message, timeout: Optional[float] = None
) -> Message: ... # pragma: no cover
def ask_message(
self, message: Message, timeout: Optional[float] = None
) -> Message: ... # pragma: no cover

def close(self) -> None: ... # pragma: no cover

# Utilities
def send(self,
receiver: Union[bytes, str],
conversation_id: Optional[bytes] = None,
data: Optional[Any] = None,
**kwargs) -> None:
def send(
self,
receiver: Union[bytes, str],
conversation_id: Optional[bytes] = None,
data: Optional[Any] = None,
**kwargs,
) -> None:
"""Send a message based on kwargs."""
self.send_message(message=Message(
receiver=receiver, conversation_id=conversation_id, data=data, **kwargs
))

def ask(self, receiver: Union[bytes, str], conversation_id: Optional[bytes] = None,
data: Optional[Any] = None,
timeout: Optional[float] = None,
**kwargs) -> Message:
self.send_message(
message=Message(receiver=receiver, conversation_id=conversation_id, data=data, **kwargs)
)

def ask(
self,
receiver: Union[bytes, str],
conversation_id: Optional[bytes] = None,
data: Optional[Any] = None,
timeout: Optional[float] = None,
**kwargs,
) -> Message:
"""Send a message based on kwargs and retrieve the response."""
return self.ask_message(message=Message(
receiver=receiver, conversation_id=conversation_id, data=data, **kwargs),
timeout=timeout)

def interpret_rpc_response(self, response_message: Message) -> Any:
return self.rpc_generator.get_result_from_response(response_message.payload[0])

def ask_rpc(self, receiver: Union[bytes, str], method: str, timeout: Optional[float] = None,
**kwargs) -> Any:
return self.ask_message(
message=Message(
receiver=receiver, conversation_id=conversation_id, data=data, **kwargs
),
timeout=timeout,
)

def interpret_rpc_response(
self, response_message: Message, extract_additional_payload: bool = False
) -> Union[Any, tuple[Any, list[bytes]]]:
"""Retrieve the return value of a RPC response and optionally the additional payload."""
result = self.rpc_generator.get_result_from_response(response_message.payload[0])
if extract_additional_payload:
return result, response_message.payload[1:]
else:
return result

def ask_rpc(
self,
receiver: Union[bytes, str],
method: str,
timeout: Optional[float] = None,
additional_payload: Optional[Iterable[bytes]] = None,
extract_additional_payload: bool = False,
**kwargs,
) -> Any:
"""Send a JSON-RPC request (with method \\**kwargs) and return the response value."""
string = self.rpc_generator.build_request_str(method=method, **kwargs)
response = self.ask(receiver=receiver, data=string, message_type=MessageTypes.JSON,
timeout=timeout)
return self.interpret_rpc_response(response)
response = self.ask(
receiver=receiver,
data=string,
message_type=MessageTypes.JSON,
additional_payload=additional_payload,
timeout=timeout,
)
return self.interpret_rpc_response(
response, extract_additional_payload=extract_additional_payload
)


class SubscriberProtocol(Protocol):
Expand Down
2 changes: 1 addition & 1 deletion pyleco/core/leco_protocols.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ def shut_down(self) -> None: ...


class CoordinatorProtocol(ComponentProtocol, Protocol):
"""A command protocol Coordinator"""
"""A command protocol Coordinator."""

def sign_in(self) -> None: ...

Expand Down
8 changes: 5 additions & 3 deletions pyleco/core/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

from __future__ import annotations
from json import JSONDecodeError
from typing import Any, Optional, Union
from typing import Any, Iterable, Optional, Union


from . import VERSION_B
Expand Down Expand Up @@ -64,6 +64,7 @@ def __init__(self,
conversation_id: Optional[bytes] = None,
message_id: Optional[bytes] = None,
message_type: Union[MessageTypes, int] = MessageTypes.NOT_DEFINED,
additional_payload: Optional[Iterable[bytes]] = None,
) -> None:
self.receiver = receiver.encode() if isinstance(receiver, str) else receiver
self.sender = sender.encode() if isinstance(sender, str) else sender
Expand All @@ -81,6 +82,8 @@ def __init__(self,
self.payload = []
else:
self.payload = [serialize_data(data)]
if additional_payload is not None:
self.payload.extend(additional_payload)

@classmethod
def from_frames(cls, version: bytes, receiver: bytes, sender: bytes, header: bytes,
Expand All @@ -92,9 +95,8 @@ def from_frames(cls, version: bytes, receiver: bytes, sender: bytes, header: byt
frames = socket.recv_multipart()
message = Message.from_frames(*frames)
"""
inst = cls(receiver, sender, header=header)
inst = cls(receiver, sender, header=header, additional_payload=payload)
inst.version = version
inst.payload = list(payload)
return inst

def to_frames(self) -> list[bytes]:
Expand Down
63 changes: 50 additions & 13 deletions pyleco/directors/director.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

from __future__ import annotations
import logging
from typing import Any, Optional, Sequence, Union
from typing import Any, Iterable, Optional, Sequence, Union

from ..core.internal_protocols import CommunicatorProtocol
from ..utils.communicator import Communicator
Expand Down Expand Up @@ -88,10 +88,9 @@ def __exit__(self, exc_type, exc_value, exc_traceback) -> None:
# Message handling
def ask_message(self, actor: Optional[Union[bytes, str]] = None,
data: Optional[Any] = None, **kwargs) -> Message:
cid0 = generate_conversation_id()
actor = self._actor_check(actor)
log.debug(f"Asking {actor!r} with message '{data}'.")
response = self.communicator.ask(actor, conversation_id=cid0, data=data, **kwargs)
response = self.communicator.ask(actor, data=data, **kwargs)
log.debug(f"Data '{response.data}' received.")
return response

Expand All @@ -113,10 +112,23 @@ def _prepare_call_action_params(self, args: tuple[Any, ...],
return params

# Remote control synced
def ask_rpc(self, method: str, actor: Optional[Union[bytes, str]] = None, **kwargs) -> Any:
def ask_rpc(
self,
method: str,
actor: Optional[Union[bytes, str]] = None,
additional_payload: Optional[Iterable[bytes]] = None,
extract_additional_payload: bool = False,
**kwargs,
) -> Any:
"""Remotely call the `method` procedure on the `actor` and return the return value."""
receiver = self._actor_check(actor)
return self.communicator.ask_rpc(receiver=receiver, method=method, **kwargs)
return self.communicator.ask_rpc(
receiver=receiver,
method=method,
additional_payload=additional_payload,
extract_additional_payload=extract_additional_payload,
**kwargs,
)

# Component
def get_rpc_capabilities(self, actor: Optional[Union[bytes, str]] = None) -> dict:
Expand Down Expand Up @@ -165,23 +177,48 @@ def call_action(self, action: str, *args, actor: Optional[Union[bytes, str]] = N
return self.ask_rpc("call_action", action=action, actor=actor, **params)

# Async methods: Just send, read later.
def send(self, actor: Optional[Union[bytes, str]] = None, data=None, **kwargs) -> bytes:
def send(
self,
actor: Optional[Union[bytes, str]] = None,
data=None,
additional_payload: Optional[Iterable[bytes]] = None,
**kwargs,
) -> bytes:
"""Send a request and return the conversation_id."""
actor = self._actor_check(actor)
cid0 = generate_conversation_id()
self.communicator.send(actor, conversation_id=cid0, data=data, **kwargs)
self.communicator.send(
actor, conversation_id=cid0, data=data, additional_payload=additional_payload, **kwargs
)
return cid0

def ask_rpc_async(self, method: str, actor: Optional[Union[bytes, str]] = None,
**kwargs) -> bytes:
def ask_rpc_async(
self,
method: str,
actor: Optional[Union[bytes, str]] = None,
additional_payload: Optional[Iterable[bytes]] = None,
**kwargs,
) -> bytes:
"""Send a rpc request, the response can be read later with :meth:`read_rpc_response`."""
string = self.generator.build_request_str(method=method, **kwargs)
return self.send(actor=actor, data=string, message_type=MessageTypes.JSON)

def read_rpc_response(self, conversation_id: Optional[bytes] = None, **kwargs) -> Any:
return self.send(
actor=actor,
data=string,
message_type=MessageTypes.JSON,
additional_payload=additional_payload,
)

def read_rpc_response(
self,
conversation_id: Optional[bytes] = None,
extract_additional_payload: bool = False,
**kwargs,
) -> Any:
"""Read the response value corresponding to a request with a certain `conversation_id`."""
response_message = self.communicator.read_message(conversation_id=conversation_id, **kwargs)
return self.communicator.interpret_rpc_response(response_message=response_message)
return self.communicator.interpret_rpc_response(
response_message=response_message, extract_additional_payload=extract_additional_payload
)

# Actor
def get_parameters_async(self, parameters: Union[str, Sequence[str]],
Expand Down
2 changes: 1 addition & 1 deletion pyleco/management/test_tasks/test_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from pyleco.actors.actor import Actor


class FakeInstrument:
class FakeInstrument: # pragma: no cover
_prop1 = 5

def __init__(self):
Expand Down
20 changes: 16 additions & 4 deletions pyleco/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
#

from __future__ import annotations
from typing import Any, Optional, Sequence, Union
from typing import Any, Iterable, Optional, Sequence, Union

from .core.message import Message
from .core.internal_protocols import CommunicatorProtocol
Expand Down Expand Up @@ -219,14 +219,26 @@ def __init__(self, remote_class, **kwargs):
super().__init__(**kwargs)
self.remote_class = remote_class

def ask_rpc(self, method: str, actor: Optional[Union[bytes, str]] = None, **kwargs) -> Any:
def ask_rpc(
self,
method: str,
actor: Optional[Union[bytes, str]] = None,
additional_payload: Optional[Iterable[bytes]] = None,
extract_additional_payload: bool = False,
**kwargs,
) -> Any:
assert hasattr(self.remote_class, method), f"Remote class does not have method '{method}'."
self.method = method
self.kwargs = kwargs
return self.return_value

def ask_rpc_async(self, method: str, actor: Optional[Union[bytes, str]] = None,
**kwargs) -> bytes:
def ask_rpc_async(
self,
method: str,
actor: Optional[Union[bytes, str]] = None,
additional_payload: Optional[Iterable[bytes]] = None,
**kwargs,
) -> bytes:
assert hasattr(self.remote_class, method), f"Remote class does not have method '{method}'."
self.method = method
self.kwargs = kwargs
Expand Down
Loading

0 comments on commit ca94ef5

Please sign in to comment.