Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework rpc server #102

Merged
merged 5 commits into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 28 additions & 2 deletions pyleco/json_utils/json_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@
from __future__ import annotations
from dataclasses import asdict, dataclass
import json
from typing import Any, Optional, Union

from typing import Any, List, Optional, TypeVar, Union

ErrorType = Union["DataError", "Error"]
NotificationType = Union["Notification", "ParamsNotification"]
Expand Down Expand Up @@ -130,3 +129,30 @@ def model_dump(self) -> dict[str, Any]:
pre_dict = asdict(self)
pre_dict["error"] = asdict(self.error)
return pre_dict

"""
Batch Handling.
Not included in jsonrpc2-objects, but defined by JSONRPC 2.0
"""
BatchType = TypeVar("BatchType", RequestType, ResponseType)


class BatchObject(List[BatchType]):
"""A batch of JSONRPC message objects.
It works like a list of appropriate message objects and offers the possibility to dump
this batch object to a plain python object or to JSON.
"""
# Parent class is typing.List, as Python<3.9 does not like list[BatchType]
# Not defined by jsonrpc2-objects

def model_dump(self) -> list[dict[str, Any]]:
return [obj.model_dump() for obj in self]

def model_dump_json(self) -> str:
return json.dumps(self.model_dump(), separators=(",", ":"))


RequestBatch = BatchObject[RequestType]
ResponseBatch = BatchObject[ResponseType]
58 changes: 27 additions & 31 deletions pyleco/json_utils/rpc_server_definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from typing import Any, Callable, Optional, Union

from .errors import INTERNAL_ERROR, SERVER_ERROR, INVALID_REQUEST
from .json_objects import ResultResponse, ErrorResponse, DataError
from .json_objects import ResultResponse, ErrorResponse, DataError, ResponseType, ResponseBatch


log = logging.getLogger(__name__)
Expand All @@ -50,15 +50,8 @@ def __init__(
self.method(name="rpc.discover")(self.discover)

def method(self, name: Optional[str] = None, **kwargs) -> Callable[[Callable], None]:
if name is None:

def method_registrar(method: Callable) -> None:
return self._register_method(name=method.__name__, method=method)
else:

def method_registrar(method: Callable) -> None:
return self._register_method(name=name, method=method)

def method_registrar(method: Callable) -> None:
return self._register_method(name=name or method.__name__, method=method)
return method_registrar

def _register_method(self, name: str, method: Callable) -> None:
Expand All @@ -67,31 +60,34 @@ def _register_method(self, name: str, method: Callable) -> None:
def process_request(self, data: Union[bytes, str]) -> Optional[str]:
try:
json_data = json.loads(data)
if isinstance(json_data, list):
results = []
for element in json_data:
result = self._process_single_request(element)
if result is not None:
results.append(result.model_dump())
if results:
return json.dumps(results, separators=(",", ":"))
else:
return None
elif isinstance(json_data, dict):
result = self._process_single_request(json_data)
if result:
return result.model_dump_json()
else:
return None
else:
return ErrorResponse(
id=None,
error=DataError.from_error(INVALID_REQUEST, json_data),
).model_dump_json()
result = self.process_request_object(json_data=json_data)
return result.model_dump_json() if result else None
except Exception as exc:
log.exception(f"{type(exc).__name__}:", exc_info=exc)
return ErrorResponse(id=None, error=INTERNAL_ERROR).model_dump_json()

def process_request_object(
self, json_data: object
) -> Optional[Union[ResponseType, ResponseBatch]]:
result: Optional[Union[ResponseType, ResponseBatch]]
if isinstance(json_data, list):
result = ResponseBatch()
for element in json_data:
result_element = self._process_single_request(element)
if result_element is not None:
result.append(result_element)
elif isinstance(json_data, dict):
result = self._process_single_request(json_data)
else:
result = ErrorResponse(
id=None,
error=DataError.from_error(INVALID_REQUEST, json_data),
)
if result:
return result
else:
return None

def _process_single_request(
self, request: dict[str, Any]
) -> Union[ResultResponse, ErrorResponse, None]:
Expand Down
38 changes: 38 additions & 0 deletions tests/json_utils/test_json_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
# THE SOFTWARE.
#

import pytest

from pyleco.json_utils import json_objects


Expand Down Expand Up @@ -83,3 +85,39 @@ def test_generate_data_error_from_error():
assert data_error.code == error.code
assert data_error.message == error.message
assert data_error.data == "data"


class Test_BatchObject:
element = json_objects.Request(5, "start")

@pytest.fixture
def batch_obj(self):
return json_objects.BatchObject([self.element])

def test_init_with_value(self):
obj = json_objects.BatchObject([self.element])
assert obj == [self.element]

def test_init_with_values(self):
obj = json_objects.BatchObject([self.element, self.element])
assert obj == [self.element, self.element]

def test_bool_value_with_element(self):
obj = json_objects.BatchObject([self.element])
assert bool(obj) is True

def test_bool_value_without_element(self):
obj = json_objects.BatchObject()
assert bool(obj) is False

def test_append(self, batch_obj: json_objects.BatchObject):
el2 = json_objects.Request(5, "start")
batch_obj.append(el2)
assert batch_obj[-1] == el2

def test_model_dump(self, batch_obj: json_objects.BatchObject):
assert batch_obj.model_dump() == [self.element.model_dump()]

def test_model_dump_json(self, batch_obj: json_objects.BatchObject):
result = '[{"id":5,"method":"start","jsonrpc":"2.0"}]'
assert batch_obj.model_dump_json() == result
136 changes: 129 additions & 7 deletions tests/json_utils/test_rpc_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,27 @@
#

import json
import logging

import pytest

from pyleco.json_utils.rpc_generator import RPCGenerator
from pyleco.json_utils.rpc_server_definition import RPCServer
from pyleco.json_utils.json_objects import Request, ParamsRequest, ResultResponse
from pyleco.json_utils.errors import ServerError, InvalidRequest, INVALID_REQUEST, SERVER_ERROR
from pyleco.json_utils.json_objects import (
Request,
ParamsRequest,
ResultResponse,
ErrorResponse,
DataError,
ResponseBatch,
)
from pyleco.json_utils.errors import (
ServerError,
InvalidRequest,
INVALID_REQUEST,
SERVER_ERROR,
INTERNAL_ERROR,
)

try:
# Load openrpc server for comparison, if available.
Expand All @@ -45,13 +59,14 @@ def rpc_generator() -> RPCGenerator:
return RPCGenerator()


def side_effect_method(arg=None):
args = None
def side_effect_method(arg=None) -> int:
global args
args = (arg,)
return 5


def fail():
def fail() -> None:
"""Fail always.
This method fails always.
Expand All @@ -60,13 +75,13 @@ def fail():
raise NotImplementedError


def simple():
def simple() -> int:
"""A method without parameters."""
return 7


def obligatory_parameter(arg1: float):
"""Needs an argument"""
def obligatory_parameter(arg1: float) -> float:
"""Needs an argument."""
return arg1 * 2


Expand All @@ -81,6 +96,18 @@ def rpc_server(request) -> RPCServer:
return rpc_server


@pytest.fixture
def rpc_server_local() -> RPCServer:
"""Create an instance of PyLECO's RPC Server"""
rpc_server = RPCServer()
rpc_server.method(name="sem")(side_effect_method)
rpc_server.method()(side_effect_method)
rpc_server.method()(fail)
rpc_server.method()(simple)
rpc_server.method()(obligatory_parameter)
return rpc_server


def test_success(rpc_generator: RPCGenerator, rpc_server: RPCServer):
request = rpc_generator.build_request_str(method="sem", arg=3)
response = rpc_server.process_request(request)
Expand Down Expand Up @@ -181,3 +208,98 @@ def test_rpc_discover_not_listed(self, methods: list):
for m in methods:
if m.get("name") == "rpc.discover":
raise AssertionError("rpc.discover is listed as a method!")


# tests regarding the local implementation of the RPC Server
def test_process_single_notification(rpc_server_local: RPCServer):
result = rpc_server_local._process_single_request(
{"jsonrpc": "2.0", "method": "simple"}
)
assert result is None


class Test_process_request:
def test_log_exception(self, rpc_server_local: RPCServer, caplog: pytest.LogCaptureFixture):
rpc_server_local.process_request(b"\xff")
records = caplog.record_tuples
assert records[-1] == (
"pyleco.json_utils.rpc_server_definition",
logging.ERROR,
"UnicodeDecodeError:",
)

def test_exception_response(self, rpc_server_local: RPCServer):
result = rpc_server_local.process_request(b"\xff")
assert result == ErrorResponse(id=None, error=INTERNAL_ERROR).model_dump_json()

def test_invalid_request(self, rpc_server_local: RPCServer):
result = rpc_server_local.process_request(b"7")
assert (
result
== ErrorResponse(
id=None, error=DataError.from_error(INVALID_REQUEST, 7)
).model_dump_json()
)

def test_batch_entry_notification(self, rpc_server_local: RPCServer):
"""A notification (request without id) shall not return anything."""
requests = [
{"jsonrpc": "2.0", "method": "simple"},
{"jsonrpc": "2.0", "method": "simple", "id": 4},
]
result = json.loads(rpc_server_local.process_request(json.dumps(requests))) # type: ignore
assert result == [{"jsonrpc": "2.0", "result": 7, "id": 4}]

def test_batch_of_notifications(self, rpc_server_local: RPCServer):
"""A notification (request without id) shall not return anything."""
requests = [
{"jsonrpc": "2.0", "method": "simple"},
{"jsonrpc": "2.0", "method": "simple"},
]
result = rpc_server_local.process_request(json.dumps(requests))
assert result is None

def test_notification(self, rpc_server_local: RPCServer):
"""A notification (request without id) shall not return anything."""
requests = [
{"jsonrpc": "2.0", "method": "simple"},
]
result = rpc_server_local.process_request(json.dumps(requests))
assert result is None


class Test_process_request_object:
def test_invalid_request(self, rpc_server_local: RPCServer):
result = rpc_server_local.process_request_object(7)
assert (
result
== ErrorResponse(
id=None, error=DataError.from_error(INVALID_REQUEST, 7)
)
)

def test_batch_entry_notification(self, rpc_server_local: RPCServer):
"""A notification (request without id) shall not return anything."""
requests = [
{"jsonrpc": "2.0", "method": "simple"},
{"jsonrpc": "2.0", "method": "simple", "id": 4},
]
result = rpc_server_local.process_request_object(requests)
assert result == ResponseBatch([ResultResponse(4, 7)])

def test_batch_of_notifications(self, rpc_server_local: RPCServer):
"""A notification (request without id) shall not return anything."""
requests = [
{"jsonrpc": "2.0", "method": "simple"},
{"jsonrpc": "2.0", "method": "simple"},
]
result = rpc_server_local.process_request_object(requests)
assert result is None

def test_notification(self, rpc_server_local: RPCServer):
"""A notification (request without id) shall not return anything."""
requests = [
{"jsonrpc": "2.0", "method": "simple"},
]
result = rpc_server_local.process_request_object(requests)
assert result is None