Skip to content

Commit

Permalink
Merge pull request #58 from pymeasure/fix-coordinator
Browse files Browse the repository at this point in the history
Fix coordinator
  • Loading branch information
BenediktBurger authored Feb 14, 2024
2 parents b981cd7 + 7eb55a5 commit ef56a34
Show file tree
Hide file tree
Showing 7 changed files with 201 additions and 41 deletions.
14 changes: 13 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
# CHANGELOG

## [0.2.2] - 2024-02-14

### Fixed

- Fix Communicator to distinguish correctly different json rpc messages ([#57](https://github.com/pymeasure/pyleco/issues/57))
- Fix MessageHandler not distinguish correctly batch requests ([#56](https://github.com/pymeasure/pyleco/issues/56))
- Bump setup-python action version to v5

**Full Changelog**: https://github.com/pymeasure/pyleco/compare/v0.2.1...v.0.2.2


## [0.2.1] - 2024-02-13

### Fixed
Expand Down Expand Up @@ -73,7 +84,8 @@ _Initial alpha version, complies with [LECO protocol alpha-0.0.1](https://github
@BenediktBurger, @bilderbuchi, @bklebel


[unreleased]: https://github.com/pymeasure/pyleco/compare/v0.2.1...HEAD
[unreleased]: https://github.com/pymeasure/pyleco/compare/v0.2.2...HEAD
[0.2.2]: https://github.com/pymeasure/pyleco/releases/tag/v0.2.2
[0.2.1]: https://github.com/pymeasure/pyleco/releases/tag/v0.2.1
[0.2.0]: https://github.com/pymeasure/pyleco/releases/tag/v0.2.0
[0.1.0]: https://github.com/pymeasure/pyleco/releases/tag/v0.1.0
Expand Down
53 changes: 34 additions & 19 deletions pyleco/coordinators/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from json import JSONDecodeError
import logging
from socket import gethostname
from typing import Optional, Union
from typing import Any, Optional, Union

from jsonrpcobjects.objects import ErrorResponse, Request, ParamsRequest
from openrpc import RPCServer
Expand All @@ -35,6 +35,7 @@
from ..core import COORDINATOR_PORT
from ..utils.coordinator_utils import Directory, ZmqNode, ZmqMultiSocket, MultiSocket
from ..core.message import Message, MessageTypes
from ..core.serialization import get_json_content_type, JsonContentTypes
from ..errors import CommunicationError
from ..errors import NODE_UNKNOWN, RECEIVER_UNKNOWN, generate_error_with_data
from ..utils.timers import RepeatingTimer
Expand All @@ -45,6 +46,7 @@
from pyleco.core import COORDINATOR_PORT
from pyleco.utils.coordinator_utils import Directory, ZmqNode, ZmqMultiSocket, MultiSocket
from pyleco.core.message import Message, MessageTypes
from pyleco.core.serialization import get_json_content_type, JsonContentTypes
from pyleco.errors import CommunicationError
from pyleco.errors import NODE_UNKNOWN, RECEIVER_UNKNOWN, generate_error_with_data
from pyleco.utils.timers import RepeatingTimer
Expand Down Expand Up @@ -318,33 +320,46 @@ def handle_commands(self, sender_identity: bytes, message: Message) -> None:
self.current_message = message
self.current_identity = sender_identity
if message.header_elements.message_type == MessageTypes.JSON:
self.handle_json_commands(message=message)
else:
log.error(
f"Message from {message.sender!r} of unknown type received: {message.payload[0]!r}")

def handle_json_commands(self, message: Message) -> None:
try:
data: Union[list[dict[str, Any]], dict[str, Any]] = message.data # type: ignore
except JSONDecodeError:
log.error(
f"Invalid JSON message from {message.sender!r} received: {message.payload[0]!r}")
return
json_type = get_json_content_type(data)
if JsonContentTypes.REQUEST in json_type:
try:
data = message.data
except JSONDecodeError:
log.error(f"Invalid JSON message from {message.sender!r} received: {message.payload[0]!r}") # noqa
return
if isinstance(data, dict):
if error := data.get("error"):
log.error(f"Error from {message.sender!r} received: {error}.")
return
elif data.get("result", False) is None:
return # acknowledgement == heartbeat
try:
self.handle_rpc_call(sender_identity=sender_identity, message=message)
self.handle_rpc_call(message=message)
except Exception as exc:
log.exception(f"Invalid JSON-RPC message from {message.sender!r} received: {data}",
exc_info=exc)
log.exception(
f"Invalid JSON-RPC message from {message.sender!r} received: {data}",
exc_info=exc)
elif JsonContentTypes.RESULT_RESPONSE == json_type:
if data.get("result", False) is not None: # type: ignore
log.info(f"Unexpeced result received: {data}")
elif JsonContentTypes.ERROR in json_type:
log.error(f"Error from {message.sender!r} received: {data}.")
elif JsonContentTypes.RESULT in json_type:
for element in data:
if element.get("result", False) is not None: # type: ignore
log.info(f"Unexpeced result received: {data}")
else:
log.error(
f"Message from {message.sender!r} of unknown type received: {message.payload[0]!r}")
f"Invalid JSON RPC message from {message.sender!r} received: {message.payload[0]!r}") # noqa

def handle_rpc_call(self, sender_identity: bytes, message: Message) -> None:
def handle_rpc_call(self, message: Message) -> None:
reply = self.rpc.process_request(message.payload[0])
sender_namespace = message.sender_elements.namespace
log.debug(f"Reply '{reply!r}' to {message.sender!r} at node {sender_namespace!r}.")
log.debug(f"Reply {reply!r} to {message.sender!r} at node {sender_namespace!r}.")
if sender_namespace == self.namespace or sender_namespace == b"":
self.send_main_sock_reply(
sender_identity=sender_identity,
sender_identity=self.current_identity,
original_message=message,
data=reply,
message_type=MessageTypes.JSON,
Expand Down
39 changes: 38 additions & 1 deletion pyleco/core/serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
# THE SOFTWARE.
#

from enum import IntEnum
from enum import IntEnum, IntFlag
import json
from typing import Any, Optional, NamedTuple, Union

Expand Down Expand Up @@ -68,6 +68,18 @@ class MessageTypes(IntEnum):
JSON = 1


class JsonContentTypes(IntFlag):
"""Type of the JSON content."""
INVALID = 0
REQUEST = 1
RESPONSE = 2
RESULT = 4
ERROR = 8
BATCH = 16
RESULT_RESPONSE = RESPONSE + RESULT
ERROR_RESPONSE = RESPONSE + ERROR


def create_header_frame(conversation_id: Optional[bytes] = None,
message_id: Optional[Union[bytes, int]] = 0,
message_type: Union[bytes, int, MessageTypes] = MessageTypes.NOT_DEFINED,
Expand Down Expand Up @@ -139,3 +151,28 @@ def deserialize_data(content: bytes) -> Any:
def generate_conversation_id() -> bytes:
"""Generate a conversation_id."""
return uuid7(as_type="bytes") # type: ignore


def _get_json_object_type(data: dict[str, Any]) -> JsonContentTypes:
if isinstance(data, dict):
if "method" in data.keys():
return JsonContentTypes.REQUEST
elif "result" in data.keys():
return JsonContentTypes.RESULT_RESPONSE
elif "error" in data.keys():
return JsonContentTypes.ERROR_RESPONSE
return JsonContentTypes.INVALID


def get_json_content_type(data: Any) -> JsonContentTypes:
if isinstance(data, list):
content = JsonContentTypes.BATCH if data else JsonContentTypes.INVALID
for element in data:
element_typ = _get_json_object_type(element)
if element_typ == JsonContentTypes.INVALID:
return JsonContentTypes.INVALID
else:
content |= element_typ
return content
else:
return _get_json_object_type(data)
12 changes: 6 additions & 6 deletions pyleco/utils/message_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from ..core.leco_protocols import ExtendedComponentProtocol
from ..core.message import Message, MessageTypes
from ..core.rpc_generator import RPCGenerator
from ..core.serialization import JsonContentTypes, get_json_content_type
from .base_communicator import BaseCommunicator
from .log_levels import PythonLogLevels
from .zmq_log_handler import ZmqLogHandler
Expand Down Expand Up @@ -223,16 +224,15 @@ def handle_message(self, message: Message) -> None:
def handle_json_message(self, message: Message) -> None:
try:
data: dict[str, Any] = message.data # type: ignore
keys = data.keys()
except (JSONDecodeError, AttributeError) as exc:
except (JSONDecodeError) as exc:
self.log.exception(f"Could not decode json message {message}", exc_info=exc)
return
if "method" in keys:
content = get_json_content_type(data)
if JsonContentTypes.REQUEST in content:
self.handle_json_request(message=message)
return
elif "error" in keys:
elif JsonContentTypes.ERROR in content:
self.handle_json_error(message=message)
elif "result" in keys:
elif JsonContentTypes.RESULT in content:
self.handle_json_result(message)
else:
self.log.error(f"Invalid JSON message received: {message}")
Expand Down
39 changes: 36 additions & 3 deletions tests/coordinators/test_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,8 +301,8 @@ def test_remote_heartbeat(coordinator: Coordinator, fake_counting, sender):

class Test_handle_commands:
class SpecialCoordinator(Coordinator):
def handle_rpc_call(self, sender_identity: bytes, message: Message) -> None:
self._rpc = sender_identity, message
def handle_rpc_call(self, message: Message) -> None:
self._rpc = message

@pytest.fixture
def coordinator_hc(self) -> Coordinator:
Expand All @@ -327,7 +327,7 @@ def test_store_identity(self, coordinator_hc: Coordinator):
))
def test_call_handle_rpc_call(self, coordinator_hc: Coordinator, identity, message):
coordinator_hc.handle_commands(identity, message)
assert coordinator_hc._rpc == (identity, message) # type: ignore
assert coordinator_hc._rpc == message # type: ignore

def test_log_error_response(self, coordinator_hc: Coordinator):
pass # TODO
Expand All @@ -340,6 +340,39 @@ def test_pass_at_null_result(self, coordinator_hc: Coordinator):
assert not hasattr(coordinator_hc, "_rpc")
# assert no error log entry. TODO

def test_log_at_non_null_result(self, coordinator_hc: Coordinator,
caplog: pytest.LogCaptureFixture):
caplog.set_level(10)
coordinator_hc.handle_commands(b"",
Message(b"",
message_type=MessageTypes.JSON,
data={"jsonrpc": "2.0", "result": 5}))
assert not hasattr(coordinator_hc, "_rpc")
# assert no error log entry. TODO
caplog.records[-1].msg.startswith("Unexpected result")

def test_pass_at_batch_of_null_results(self, coordinator_hc: Coordinator):
coordinator_hc.handle_commands(b"",
Message(b"",
message_type=MessageTypes.JSON,
data=[{"jsonrpc": "2.0", "result": None, "id": 1},
{"jsonrpc": "2.0", "result": None, "id": 2}]
))
assert not hasattr(coordinator_hc, "_rpc")
# assert no error log entry. TODO

def test_log_at_batch_of_non_null_results(self, coordinator_hc: Coordinator,
caplog: pytest.LogCaptureFixture):
caplog.set_level(10)
coordinator_hc.handle_commands(b"",
Message(b"",
message_type=MessageTypes.JSON,
data=[{"jsonrpc": "2.0", "result": None, "id": 1},
{"jsonrpc": "2.0", "result": 5, "id": 2}]
))
assert not hasattr(coordinator_hc, "_rpc")
caplog.records[-1].msg.startswith("Unexpected result")

@pytest.mark.parametrize("data", (
{"jsonrpc": "2.0", "no method": 7},
["jsonrpc", "2.0", "no method", 7], # not a dict
Expand Down
55 changes: 55 additions & 0 deletions tests/core/test_serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,13 @@
# THE SOFTWARE.
#

from typing import Any, Optional, Union

import pytest
from jsonrpcobjects.objects import Request

from pyleco.core import serialization
from pyleco.core.serialization import JsonContentTypes, get_json_content_type


class Test_create_header_frame:
Expand Down Expand Up @@ -103,3 +106,55 @@ def test_UUID_version(self, conversation_id):

def test_variant(self, conversation_id):
assert conversation_id[8] >> 6 == 0b10


def test_json_type_result_is_response():
assert JsonContentTypes.RESPONSE in JsonContentTypes.RESULT_RESPONSE
assert JsonContentTypes.RESULT in JsonContentTypes.RESULT_RESPONSE


def test_json_type_error_is_response():
assert JsonContentTypes.RESPONSE in JsonContentTypes.ERROR_RESPONSE
assert JsonContentTypes.ERROR in JsonContentTypes.ERROR_RESPONSE


# Methods for get_json_content_type
def create_request(method: str, params: Optional[Union[list, dict]] = None, id: int = 1
) -> dict[str, Any]:
return {"jsonrpc": "2.0", "id": id, "method": method, "params": params}


def create_result(result: Any, id: int = 1) -> dict[str, Any]:
return {"jsonrpc": "2.0", "result": result, "id": id}


def create_error(error_code: int, error_message: str, id: int = 1) -> dict[str, Any]:
return {"jsonrpc": "2.0", "id": id, "error": {"code": error_code, "message": error_message}}


class Test_get_json_content_type:
@pytest.mark.parametrize("data, type", (
(create_request("abc"), JsonContentTypes.REQUEST),
([create_request(method="abc")] * 2, JsonContentTypes.REQUEST | JsonContentTypes.BATCH),
(create_result(None), JsonContentTypes.RESULT_RESPONSE),
([create_result(None), create_result(5, 7)],
JsonContentTypes.RESULT_RESPONSE | JsonContentTypes.BATCH),
(create_error(89, "whatever"), JsonContentTypes.ERROR_RESPONSE),
([create_error(89, "xy")] * 2,
JsonContentTypes.ERROR_RESPONSE | JsonContentTypes.BATCH),
([create_result(4), create_error(32, "xy")], # batch of result and error
JsonContentTypes.RESULT_RESPONSE | JsonContentTypes.BATCH | JsonContentTypes.ERROR),
))
def test_data_is_valid_type(self, data, type):
assert get_json_content_type(data) == type

@pytest.mark.parametrize("data", (
{},
[],
[{}],
{"some": "thing"},
5.6,
"adsfasdf",
))
def test_invalid_data(self, data):
assert get_json_content_type(data) == JsonContentTypes.INVALID
30 changes: 19 additions & 11 deletions tests/utils/test_message_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -396,9 +396,9 @@ def test_read_and_handle_message(self, handler: MessageHandler,
def test_handle_not_signed_in_message(self, handler: MessageHandler):
handler.sign_in = MagicMock() # type: ignore
handler.socket._r = [Message(receiver="handler", sender="N1.COORDINATOR", # type: ignore
message_type=MessageTypes.JSON,
data=ErrorResponse(id=5, error=NOT_SIGNED_IN),
).to_frames()]
message_type=MessageTypes.JSON,
data=ErrorResponse(id=5, error=NOT_SIGNED_IN),
).to_frames()]
handler.read_and_handle_message()
assert handler.namespace is None
handler.sign_in.assert_called_once()
Expand All @@ -423,30 +423,38 @@ def test_handle_receiver_unknown_message(self, handler: MessageHandler):
def test_handle_ACK_does_not_change_Namespace(self, handler: MessageHandler):
"""Test that an ACK does not change the Namespace, if it is already set."""
handler.socket._r = [Message(b"N3.handler", b"N3.COORDINATOR", # type: ignore
message_type=MessageTypes.JSON,
data={"id": 3, "result": None, "jsonrpc": "2.0"}).to_frames()]
message_type=MessageTypes.JSON,
data={"id": 3, "result": None, "jsonrpc": "2.0"}).to_frames()]
handler.namespace = "N1"
handler.read_and_handle_message()
assert handler.namespace == "N1"

def test_handle_invalid_json_message(self, handler: MessageHandler,
caplog: pytest.LogCaptureFixture):
caplog: pytest.LogCaptureFixture):
"""An invalid message should not cause the message handler to crash."""
handler.socket._r = [Message(b"N3.handler", b"N3.COORDINATOR", # type: ignore
message_type=MessageTypes.JSON,
data={"without": "method..."}).to_frames()]
message_type=MessageTypes.JSON,
data={"without": "method..."}).to_frames()]
handler.read_and_handle_message()
assert caplog.records[-1].msg.startswith("Invalid JSON message")

def test_handle_corrupted_message(self, handler: MessageHandler,
caplog: pytest.LogCaptureFixture):
"""An invalid message should not cause the message handler to crash."""
handler.socket._r = [Message(b"N3.handler", b"N3.COORDINATOR", # type: ignore
message_type=MessageTypes.JSON,
data=[]).to_frames()]
message_type=MessageTypes.JSON,
data=[]).to_frames()]
handler.read_and_handle_message()
assert caplog.records[-1].msg.startswith("Could not decode")
assert caplog.records[-1].msg.startswith("Invalid JSON message")

def test_handle_undecodable_message(self, handler: MessageHandler,
caplog: pytest.LogCaptureFixture):
"""An invalid message should not cause the message handler to crash."""
message = Message(b"N3.handler", b"N3.COORDINATOR", message_type=MessageTypes.JSON)
message.payload = [b"()"]
handler.socket._r = [message.to_frames()] # type: ignore
handler.read_and_handle_message()
assert caplog.records[-1].msg.startswith("Could not decode")


def test_handle_unknown_message_type(handler: MessageHandler, caplog: pytest.LogCaptureFixture):
Expand Down

0 comments on commit ef56a34

Please sign in to comment.