Skip to content

Commit

Permalink
Add binary sending to data publisher
Browse files Browse the repository at this point in the history
  • Loading branch information
BenediktBurger committed Jun 11, 2024
1 parent 39ca4c5 commit 8e3d82e
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 18 deletions.
43 changes: 25 additions & 18 deletions pyleco/utils/data_publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from __future__ import annotations
import logging
import pickle
from typing import Any, Optional, Union
from typing import Any, Iterable, Optional, Union

import zmq

Expand All @@ -49,13 +49,15 @@ class DataPublisher:

full_name: str

def __init__(self,
full_name: str,
host: str = "localhost",
port: int = PROXY_RECEIVING_PORT,
log: Optional[logging.Logger] = None,
context: Optional[zmq.Context] = None,
**kwargs) -> None:
def __init__(
self,
full_name: str,
host: str = "localhost",
port: int = PROXY_RECEIVING_PORT,
log: Optional[logging.Logger] = None,
context: Optional[zmq.Context] = None,
**kwargs,
) -> None:
if log is None:
self.log = logging.getLogger(f"{__name__}.Publisher")
else:
Expand Down Expand Up @@ -87,17 +89,22 @@ def send_message(self, message: DataMessage) -> None:
"""Send a data protocol message."""
self.socket.send_multipart(message.to_frames())

def send_data(self, data: Any,
topic: Optional[Union[bytes, str]] = None,
conversation_id: Optional[bytes] = None,
message_type: Union[MessageTypes, int] = MessageTypes.NOT_DEFINED,
) -> None:
def send_data(
self,
data: Any,
topic: Optional[Union[bytes, str]] = None,
conversation_id: Optional[bytes] = None,
message_type: Union[MessageTypes, int] = MessageTypes.NOT_DEFINED,
additional_payload: Optional[Iterable[bytes]] = None,
) -> None:
"""Send the `data` via the data protocol."""
message = DataMessage(topic=topic or self.full_name,
data=data,
conversation_id=conversation_id,
message_type=message_type
)
message = DataMessage(
topic=topic or self.full_name,
data=data,
conversation_id=conversation_id,
message_type=message_type,
additional_payload=additional_payload,
)
self.send_message(message)

def send_legacy(self, data: dict[str, Any]) -> None:
Expand Down
7 changes: 7 additions & 0 deletions tests/utils/test_data_publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,13 @@ def test_call_publisher_sends(publisher: DataPublisher):
assert message.payload[0] == b"data"


def test_send_data(publisher: DataPublisher):
publisher.send_data(
data=b"data", topic=b"topic", conversation_id=b"cid", additional_payload=[b"1"]
)
assert publisher.socket._s == [[b"topic", b"cid\x00", b"data", b"1"]]


def test_send_message(publisher: DataPublisher):
message = DataMessage.from_frames(b"topic", b"header", b"data")
publisher.send_message(message=message)
Expand Down

0 comments on commit 8e3d82e

Please sign in to comment.