diff --git a/micro_benchmarks/conftest.py b/micro_benchmarks/conftest.py
index 6cb08303..de5d8d5d 100644
--- a/micro_benchmarks/conftest.py
+++ b/micro_benchmarks/conftest.py
@@ -2,9 +2,28 @@
 
 import os
 
+import pytest
 
-def pytest_report_header() -> list[str]:
+PAYLOAD_SIZE_LEVEL = "payload_size_level"
+
+
+def pytest_addoption(parser: pytest.Parser) -> None:
+    group = parser.getgroup("benchmark")
+    group.addoption(
+        "--payload-size-level",
+        dest=PAYLOAD_SIZE_LEVEL,
+        type=lambda x: tuple(map(int, x.split(","))),
+        default=(1, 10),
+        help="a list of message size levels to use (in kilobytes)",
+    )
+
+
+def pytest_report_header(config: pytest.Config) -> list[str]:
+    headers: list[str] = []
     addopts: str = os.environ.get("PYTEST_ADDOPTS", "")
-    if not addopts:
-        return []
-    return [f"PYTEST_ADDOPTS: {addopts}"]
+    if addopts:
+        headers.append(f"PYTEST_ADDOPTS: {addopts}")
+    headers.append(
+        f"Payload size levels: {', '.join(f'{payload_size}kb' for payload_size in config.getoption(PAYLOAD_SIZE_LEVEL))}"
+    )
+    return headers
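Reviewer note: the new option is parsed by the lambda above into a tuple of ints, so
`--payload-size-level=1,10,100` becomes `(1, 10, 100)` via `config.getoption(PAYLOAD_SIZE_LEVEL)`.
A quick smoke test (sketch, not part of the patch; it reuses the `pytest -c pytest-benchmark.ini
... micro_benchmarks` invocation from the tox.ini change at the end of this patch, and levels
other than 1 and 10 assume a matching `data_{N}kb.json` file exists under
`micro_benchmarks/serializers/samples/`):

    pytest -c pytest-benchmark.ini --payload-size-level=1,10,100 micro_benchmarks
    # the report header then shows: Payload size levels: 1kb, 10kb, 100kb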
diff --git a/micro_benchmarks/serializers/bench_cbor.py b/micro_benchmarks/serializers/bench_cbor.py
index d404b0b1..f7688c9b 100644
--- a/micro_benchmarks/serializers/bench_cbor.py
+++ b/micro_benchmarks/serializers/bench_cbor.py
@@ -2,16 +2,20 @@
 
 from __future__ import annotations
 
+from collections import deque
 from typing import TYPE_CHECKING, Any
 
 from easynetwork.serializers.cbor import CBORSerializer
 
 import pytest
 
+from .groups import SerializerGroup
+
 if TYPE_CHECKING:
     from pytest_benchmark.fixture import BenchmarkFixture
 
 
+@pytest.mark.benchmark(group=SerializerGroup.JSON_SERIALIZE)
 def bench_CBORSerializer_serialize(
     benchmark: BenchmarkFixture,
     json_object: Any,
@@ -21,6 +25,7 @@ def bench_CBORSerializer_serialize(
     benchmark(serializer.serialize, json_object)
 
 
+@pytest.mark.benchmark(group=SerializerGroup.JSON_DESERIALIZE)
 def bench_CBORSerializer_deserialize(
     benchmark: BenchmarkFixture,
     json_object: Any,
@@ -33,15 +38,17 @@ def bench_CBORSerializer_deserialize(
     assert result == json_object
 
 
+@pytest.mark.benchmark(group=SerializerGroup.JSON_INCREMENTAL_SERIALIZE)
 def bench_CBORSerializer_incremental_serialize(
     benchmark: BenchmarkFixture,
     json_object: Any,
 ) -> None:
     serializer = CBORSerializer()
 
-    benchmark(lambda: b"".join(serializer.incremental_serialize(json_object)))
+    benchmark(lambda: deque(serializer.incremental_serialize(json_object)))
 
 
+@pytest.mark.benchmark(group=SerializerGroup.JSON_INCREMENTAL_DESERIALIZE)
 @pytest.mark.parametrize("buffered", [False, True], ids=lambda p: f"buffered=={p}")
 def bench_CBORSerializer_incremental_deserialize(
     buffered: bool,
@@ -54,11 +61,11 @@ def bench_CBORSerializer_incremental_deserialize(
     if buffered:
         nbytes = len(cbor_data)
         buffer: memoryview = serializer.create_deserializer_buffer(nbytes)
+        buffer[:nbytes] = cbor_data
 
         def deserialize() -> Any:
             consumer = serializer.buffered_incremental_deserialize(buffer)
             next(consumer)
-            buffer[:nbytes] = cbor_data
             try:
                 consumer.send(nbytes)
             except StopIteration as exc:
diff --git a/micro_benchmarks/serializers/bench_json.py b/micro_benchmarks/serializers/bench_json.py
index 7b38a8e3..511e96a7 100644
--- a/micro_benchmarks/serializers/bench_json.py
+++ b/micro_benchmarks/serializers/bench_json.py
@@ -2,16 +2,20 @@
 
 from __future__ import annotations
 
+from collections import deque
 from typing import TYPE_CHECKING, Any
 
 from easynetwork.serializers.json import JSONSerializer
 
 import pytest
 
+from .groups import SerializerGroup
+
 if TYPE_CHECKING:
     from pytest_benchmark.fixture import BenchmarkFixture
 
 
+@pytest.mark.benchmark(group=SerializerGroup.JSON_SERIALIZE)
 def bench_JSONSerializer_serialize(
     benchmark: BenchmarkFixture,
     json_object: Any,
@@ -21,6 +25,7 @@ def bench_JSONSerializer_serialize(
     benchmark(serializer.serialize, json_object)
 
 
+@pytest.mark.benchmark(group=SerializerGroup.JSON_DESERIALIZE)
 def bench_JSONSerializer_deserialize(
     benchmark: BenchmarkFixture,
     json_object: Any,
@@ -33,6 +38,7 @@ def bench_JSONSerializer_deserialize(
     assert result == json_object
 
 
+@pytest.mark.benchmark(group=SerializerGroup.JSON_INCREMENTAL_SERIALIZE)
 @pytest.mark.parametrize("use_lines", [False, True], ids=lambda p: f"use_lines=={p}")
 def bench_JSONSerializer_incremental_serialize(
     use_lines: bool,
@@ -41,9 +47,10 @@ def bench_JSONSerializer_incremental_serialize(
 ) -> None:
     serializer = JSONSerializer(use_lines=use_lines)
 
-    benchmark(lambda: b"".join(serializer.incremental_serialize(json_object)))
+    benchmark(lambda: deque(serializer.incremental_serialize(json_object)))
 
 
+@pytest.mark.benchmark(group=SerializerGroup.JSON_INCREMENTAL_DESERIALIZE)
 @pytest.mark.parametrize("use_lines", [False, True], ids=lambda p: f"use_lines=={p}")
 def bench_JSONSerializer_incremental_deserialize(
     use_lines: bool,
diff --git a/micro_benchmarks/serializers/bench_line.py b/micro_benchmarks/serializers/bench_line.py
index a5b04186..25453190 100644
--- a/micro_benchmarks/serializers/bench_line.py
+++ b/micro_benchmarks/serializers/bench_line.py
@@ -2,17 +2,31 @@
 
 from __future__ import annotations
 
+from collections import deque
 from typing import TYPE_CHECKING, Any
 
 from easynetwork.serializers.line import StringLineSerializer
 
 import pytest
 
+from ..conftest import PAYLOAD_SIZE_LEVEL
+from .groups import SerializerGroup
+
 if TYPE_CHECKING:
     from pytest_benchmark.fixture import BenchmarkFixture
 
 
-@pytest.fixture(scope="module", params=[1, 10], ids=lambda p: f"size=={p}kb")
+def pytest_generate_tests(metafunc: pytest.Metafunc) -> None:
+    if "line_str" in metafunc.fixturenames:
+        metafunc.parametrize(
+            "line_str",
+            list(metafunc.config.getoption(PAYLOAD_SIZE_LEVEL)),
+            indirect=True,
+            ids=lambda p: f"size=={p}kb",
+        )
+
+
+@pytest.fixture(scope="module")
 def line_str(request: pytest.FixtureRequest) -> str:
     size: int = request.param * 1024
     return ("x" * (size - 1)) + "\n"
@@ -23,6 +37,7 @@ def line_bytes(line_str: str) -> bytes:
     return bytes(line_str, "utf-8")
 
 
+@pytest.mark.benchmark(group=SerializerGroup.LINE_SERIALIZE)
 def bench_StringLineSerializer_serialize(
     benchmark: BenchmarkFixture,
     line_str: str,
@@ -35,6 +50,7 @@ def bench_StringLineSerializer_serialize(
     assert result == line_bytes
 
 
+@pytest.mark.benchmark(group=SerializerGroup.LINE_DESERIALIZE)
 @pytest.mark.parametrize("keep_end", [False, True], ids=lambda p: f"keep_end=={p}")
 def bench_StringLineSerializer_deserialize(
     keep_end: bool,
@@ -52,6 +68,7 @@ def bench_StringLineSerializer_deserialize(
     assert result == line_str.removesuffix("\n")
 
 
+@pytest.mark.benchmark(group=SerializerGroup.LINE_INCREMENTAL_SERIALIZE)
 def bench_StringLineSerializer_incremental_serialize(
     benchmark: BenchmarkFixture,
     line_str: str,
@@ -59,11 +76,12 @@ def bench_StringLineSerializer_incremental_serialize(
 ) -> None:
     serializer = StringLineSerializer()
 
-    result = benchmark(lambda: b"".join(serializer.incremental_serialize(line_str)))
+    result = b"".join(benchmark(lambda: deque(serializer.incremental_serialize(line_str))))
 
     assert result == line_bytes
 
 
+@pytest.mark.benchmark(group=SerializerGroup.LINE_INCREMENTAL_DESERIALIZE)
 @pytest.mark.parametrize("keep_end", [False, True], ids=lambda p: f"keep_end=={p}")
 @pytest.mark.parametrize("buffered", [False, True], ids=lambda p: f"buffered=={p}")
 def bench_StringLineSerializer_incremental_deserialize(
@@ -78,11 +96,11 @@ def bench_StringLineSerializer_incremental_deserialize(
     if buffered:
         nbytes = len(line_bytes)
         buffer: bytearray = serializer.create_deserializer_buffer(nbytes)
+        buffer[:nbytes] = line_bytes
 
         def deserialize() -> Any:
             consumer = serializer.buffered_incremental_deserialize(buffer)
-            start_index: int = next(consumer)
-            buffer[start_index : start_index + nbytes] = line_bytes
+            next(consumer)
             try:
                 consumer.send(nbytes)
             except StopIteration as exc:
diff --git a/micro_benchmarks/serializers/bench_msgpack.py b/micro_benchmarks/serializers/bench_msgpack.py
index bba69137..bb01ebb4 100644
--- a/micro_benchmarks/serializers/bench_msgpack.py
+++ b/micro_benchmarks/serializers/bench_msgpack.py
@@ -2,16 +2,20 @@
 
 from __future__ import annotations
 
+from collections import deque
 from typing import TYPE_CHECKING, Any
 
 from easynetwork.serializers.msgpack import MessagePackSerializer
 
 import pytest
 
+from .groups import SerializerGroup
+
 if TYPE_CHECKING:
     from pytest_benchmark.fixture import BenchmarkFixture
 
 
+@pytest.mark.benchmark(group=SerializerGroup.JSON_SERIALIZE)
 def bench_MessagePackSerializer_serialize(
     benchmark: BenchmarkFixture,
     json_object: Any,
@@ -21,6 +25,7 @@ def bench_MessagePackSerializer_serialize(
     benchmark(serializer.serialize, json_object)
 
 
+@pytest.mark.benchmark(group=SerializerGroup.JSON_DESERIALIZE)
 def bench_MessagePackSerializer_deserialize(
     benchmark: BenchmarkFixture,
     json_object: Any,
@@ -33,15 +38,17 @@ def bench_MessagePackSerializer_deserialize(
     assert result == json_object
 
 
+@pytest.mark.benchmark(group=SerializerGroup.JSON_INCREMENTAL_SERIALIZE)
 def bench_MessagePackSerializer_incremental_serialize(
     benchmark: BenchmarkFixture,
     json_object: Any,
 ) -> None:
     serializer = MessagePackSerializer()
 
-    benchmark(lambda: b"".join(serializer.incremental_serialize(json_object)))
+    benchmark(lambda: deque(serializer.incremental_serialize(json_object)))
 
 
+@pytest.mark.benchmark(group=SerializerGroup.JSON_INCREMENTAL_DESERIALIZE)
 @pytest.mark.parametrize("buffered", [False, True], ids=lambda p: f"buffered=={p}")
 def bench_MessagePackSerializer_incremental_deserialize(
     buffered: bool,
@@ -54,11 +61,11 @@ def bench_MessagePackSerializer_incremental_deserialize(
     if buffered:
         nbytes = len(msgpack_data)
         buffer: memoryview = serializer.create_deserializer_buffer(nbytes)
+        buffer[:nbytes] = msgpack_data
 
         def deserialize() -> Any:
             consumer = serializer.buffered_incremental_deserialize(buffer)
             next(consumer)
-            buffer[:nbytes] = msgpack_data
             try:
                 consumer.send(nbytes)
             except StopIteration as exc:
diff --git a/micro_benchmarks/serializers/bench_pickle.py b/micro_benchmarks/serializers/bench_pickle.py
index 4adaff40..12431003 100644
--- a/micro_benchmarks/serializers/bench_pickle.py
+++ b/micro_benchmarks/serializers/bench_pickle.py
@@ -8,10 +8,13 @@
 
 import pytest
 
+from .groups import SerializerGroup
+
 if TYPE_CHECKING:
     from pytest_benchmark.fixture import BenchmarkFixture
 
 
+@pytest.mark.benchmark(group=SerializerGroup.PICKLE_SERIALIZE)
 @pytest.mark.parametrize("pickler_optimize", [False, True], ids=lambda p: f"pickler_optimize=={p}")
 def bench_PickleSerializer_serialize(
     pickler_optimize: bool,
@@ -23,6 +26,7 @@ def bench_PickleSerializer_serialize(
     benchmark(serializer.serialize, json_object)
 
 
+@pytest.mark.benchmark(group=SerializerGroup.PICKLE_DESERIALIZE)
 def bench_PickleSerializer_deserialize(
     benchmark: BenchmarkFixture,
     json_object: Any,
diff --git a/micro_benchmarks/serializers/conftest.py b/micro_benchmarks/serializers/conftest.py
index fb5c5ce0..ecbd914b 100644
--- a/micro_benchmarks/serializers/conftest.py
+++ b/micro_benchmarks/serializers/conftest.py
@@ -6,35 +6,46 @@
 
 import pytest
 
+from ..conftest import PAYLOAD_SIZE_LEVEL
+
 SAMPLES = importlib.resources.files(__package__) / "samples"
 
 
-@pytest.fixture(scope="package", params=["data_1kb.json", "data_10kb.json"])
+def pytest_generate_tests(metafunc: pytest.Metafunc) -> None:
+    if "json_object" in metafunc.fixturenames:
+        metafunc.parametrize(
+            "json_object",
+            [f"data_{payload_size}kb.json" for payload_size in metafunc.config.getoption(PAYLOAD_SIZE_LEVEL)],
+            indirect=True,
+        )
+
+
+@pytest.fixture(scope="package")
 def json_object(request: pytest.FixtureRequest) -> Any:
     file = SAMPLES / str(request.param)
     return json.loads(file.read_text())
 
 
-@pytest.fixture(scope="module")
+@pytest.fixture(scope="package")
 def json_data(json_object: Any) -> bytes:
     return json.dumps(json_object, indent=None, separators=(",", ":")).encode() + b"\n"
 
 
-@pytest.fixture(scope="module")
+@pytest.fixture(scope="package")
 def cbor_data(json_object: Any) -> bytes:
     import cbor2
 
     return cbor2.dumps(json_object)
 
 
-@pytest.fixture(scope="module")
+@pytest.fixture(scope="package")
 def msgpack_data(json_object: Any) -> bytes:
     import msgpack
 
     return msgpack.packb(json_object)
 
 
-@pytest.fixture(scope="module")
+@pytest.fixture(scope="package")
 def pickle_data(json_object: Any) -> bytes:
     import pickle
diff --git a/micro_benchmarks/serializers/groups.py b/micro_benchmarks/serializers/groups.py
new file mode 100644
index 00000000..87aac87b
--- /dev/null
+++ b/micro_benchmarks/serializers/groups.py
@@ -0,0 +1,25 @@
+from __future__ import annotations
+
+import enum
+
+
+@enum.unique
+class SerializerGroup(enum.StrEnum):
+    @staticmethod
+    def _generate_next_value_(name: str, start: int, count: int, last_values: list[str]) -> str:
+        return name.lower().replace("_", "-")
+
+    JSON_SERIALIZE = enum.auto()
+    JSON_DESERIALIZE = enum.auto()
+
+    JSON_INCREMENTAL_SERIALIZE = enum.auto()
+    JSON_INCREMENTAL_DESERIALIZE = enum.auto()
+
+    LINE_SERIALIZE = enum.auto()
+    LINE_DESERIALIZE = enum.auto()
+
+    LINE_INCREMENTAL_SERIALIZE = enum.auto()
+    LINE_INCREMENTAL_DESERIALIZE = enum.auto()
+
+    PICKLE_SERIALIZE = enum.auto()
+    PICKLE_DESERIALIZE = enum.auto()
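Reviewer note: with the `_generate_next_value_` override above, `enum.auto()` yields the member
name lower-cased with underscores turned into hyphens, so the pytest-benchmark group names read
naturally in reports (`enum.StrEnum` requires Python 3.11+). Illustration only:

    >>> from micro_benchmarks.serializers.groups import SerializerGroup
    >>> str(SerializerGroup.JSON_INCREMENTAL_SERIALIZE)
    'json-incremental-serialize'
    >>> SerializerGroup.LINE_DESERIALIZE == "line-deserialize"
    True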
diff --git a/pytest-benchmark.ini b/pytest-benchmark.ini
index d990872a..7deddcf9 100644
--- a/pytest-benchmark.ini
+++ b/pytest-benchmark.ini
@@ -7,7 +7,12 @@ python_functions = bench_*
 addopts =
     --strict-markers
     -p 'no:anyio'
-    --benchmark-group-by=fullfunc
+    --benchmark-warmup=on
+    --benchmark-warmup-iterations=10000
+    --benchmark-timer=time.process_time
+    --benchmark-time-unit=us
+    --benchmark-min-time=0.000002
+    --benchmark-sort=fullname
     --benchmark-disable-gc
 minversion = 7.4
 testpaths = micro_benchmarks
diff --git a/src/easynetwork/serializers/json.py b/src/easynetwork/serializers/json.py
index 045051f9..0f83cb10 100644
--- a/src/easynetwork/serializers/json.py
+++ b/src/easynetwork/serializers/json.py
@@ -24,18 +24,20 @@
 
 import re
-import string
-from collections import Counter
 from collections.abc import Callable, Generator
 from dataclasses import asdict as dataclass_asdict, dataclass
-from typing import Any, final
+from typing import TYPE_CHECKING, Any, final
 
 from ..exceptions import DeserializeError, IncrementalDeserializeError, LimitOverrunError
-from ..lowlevel._utils import iter_bytes
 from ..lowlevel.constants import DEFAULT_SERIALIZER_LIMIT
 from .abc import AbstractIncrementalPacketSerializer
 from .tools import GeneratorStreamReader
 
+if TYPE_CHECKING:
+    from _typeshed import ReadableBuffer
+
+_OBJECT_START = (b"{", b"[", b'"')
+
 
 @dataclass(kw_only=True)
 class JSONEncoderConfig:
@@ -161,7 +163,7 @@ def serialize(self, packet):
         Returns:
             a byte sequence.
         """
-        return self.__encoder.encode(packet).encode(self.__encoding, self.__unicode_errors)
+        return bytes(self.__encoder.encode(packet), self.__encoding, self.__unicode_errors)
 
     @final
     def incremental_serialize(self, packet: Any) -> Generator[bytes]:
@@ -179,8 +181,8 @@ def incremental_serialize(self, packet: Any) -> Generator[bytes]:
         Yields:
             all the parts of the JSON :term:`packet`.
         """
-        data = self.__encoder.encode(packet).encode(self.__encoding, self.__unicode_errors)
-        if self.__use_lines or not data.startswith((b"{", b"[", b'"')):
+        data = bytes(self.__encoder.encode(packet), self.__encoding, self.__unicode_errors)
+        if self.__use_lines or not data.startswith(_OBJECT_START):
             data += b"\n"
         yield data
 
@@ -259,11 +261,11 @@ def incremental_deserialize(self) -> Generator[None, bytes, tuple[Any, bytes]]:
         Returns:
             a tuple with the deserialized Python object and the unused trailing data.
         """
-        complete_document: bytes
-        remaining_data: bytes
+        complete_document: ReadableBuffer
+        remaining_data: ReadableBuffer
         if self.__use_lines:
             reader = GeneratorStreamReader()
-            complete_document = yield from reader.read_until(b"\n", limit=self.__limit)
+            complete_document = yield from reader.read_until(b"\n", limit=self.__limit, keep_end=False)
             remaining_data = reader.read_all()
             del reader
         else:
@@ -299,7 +301,7 @@ def incremental_deserialize(self) -> Generator[None, bytes, tuple[Any, bytes]]:
                     },
                 ) from exc
             raise IncrementalDeserializeError(msg, remaining_data) from exc
-        return packet, remaining_data
+        return packet, bytes(remaining_data)
 
     @property
     @final
@@ -319,112 +321,116 @@ def buffer_limit(self) -> int:
 
 
 class _JSONParser:
-    _JSON_VALUE_BYTES: frozenset[int] = frozenset(bytes(string.digits + string.ascii_letters + string.punctuation, "ascii"))
-    _ESCAPE_BYTE: int = ord(b"\\")
+    _QUOTE_BYTE: int = ord('"')
+    _ESCAPE_BYTE: int = ord("\\")
+    _ENCLOSURE_BYTES: dict[int, int] = {
+        ord("{"): ord("}"),
+        ord("["): ord("]"),
+        ord('"'): ord('"'),
+    }
+
+    _NON_PRINTABLE: re.Pattern[bytes] = re.compile(rb"[^\x20-\x7E]", re.MULTILINE | re.DOTALL)
+    _WHITESPACES: re.Pattern[bytes] = re.compile(rb"[ \t\n\r]*", re.MULTILINE | re.DOTALL)
+    _JSON_FRAMES: dict[int, re.Pattern[bytes]] = {
+        ord("{"): re.compile(rb'(?:\\.|[\{\}"])', re.MULTILINE | re.DOTALL),
+        ord("["): re.compile(rb'(?:\\.|[\[\]"])', re.MULTILINE | re.DOTALL),
+        ord('"'): re.compile(rb'(?:\\.|["])', re.MULTILINE | re.DOTALL),
+    }
+
+    @classmethod
+    def raw_parse(
+        cls,
+        *,
+        limit: int,
+        _w: Callable[[bytes, int], re.Match[bytes]] = _WHITESPACES.match,  # type: ignore[assignment]
+        _np: Callable[[bytes, int], re.Match[bytes] | None] = _NON_PRINTABLE.search,
+    ) -> Generator[None, bytes, tuple[ReadableBuffer, ReadableBuffer]]:
+        if limit <= 0:
+            raise ValueError("limit must be a positive integer")
 
-    _whitespaces_match: Callable[[bytes, int], re.Match[bytes]] = re.compile(rb"[ \t\n\r]*", re.MULTILINE | re.DOTALL).match  # type: ignore[assignment]
+        append_to_buffer = cls.__append_to_buffer
 
-    class _PlainValueError(Exception):
-        pass
+        partial_document: bytes | bytearray = bytes((yield))
+        start_idx: int
+        while (start_idx := _w(partial_document, 0).end()) == len(partial_document):
+            del partial_document
+            partial_document = bytes((yield))
 
-    @staticmethod
-    def _escaped(partial_document_view: memoryview) -> bool:
-        escaped = False
-        _ESCAPE_BYTE = _JSONParser._ESCAPE_BYTE
-        for byte in reversed(partial_document_view):
-            if byte == _ESCAPE_BYTE:
-                escaped = not escaped
-            else:
-                break
-        return escaped
+        end_idx: int = start_idx
+        if (open_enclosure := partial_document[start_idx]) not in cls._ENCLOSURE_BYTES:
+            while not (nprint_search := _np(partial_document, end_idx)):
+                partial_document, end_idx = yield from append_to_buffer(partial_document, limit)
 
-    @staticmethod
-    def raw_parse(*, limit: int) -> Generator[None, bytes, tuple[bytes, bytes]]:
-        if limit <= 0:
-            raise ValueError("limit must be a positive integer")
-        escaped = _JSONParser._escaped
-        split_partial_document = _JSONParser._split_partial_document
-        enclosure_counter: Counter[bytes] = Counter()
-        partial_document: bytes = yield
-        first_enclosure: bytes = b""
-        try:
-            offset: int = 0
-            while True:
-                with memoryview(partial_document) as partial_document_view:
-                    for offset, char in enumerate(iter_bytes(partial_document_view[offset:]), start=offset):
-                        match char:
-                            case b'"' if not escaped(partial_document_view[:offset]):
-                                enclosure_counter[b'"'] = 0 if enclosure_counter[b'"'] == 1 else 1
-                            case _ if enclosure_counter[b'"'] > 0:  # We are within a JSON string, move on.
-                                continue
-                            case b"{" | b"[":
-                                enclosure_counter[char] += 1
-                            case b"}":
-                                enclosure_counter[b"{"] -= 1
-                            case b"]":
-                                enclosure_counter[b"["] -= 1
-                            case b" " | b"\t" | b"\n" | b"\r":  # Optimization: Skip spaces
-                                continue
-                            case _ if len(enclosure_counter) == 0:  # No enclosure, only value
-                                partial_document = partial_document[offset:] if offset > 0 else partial_document
-                                del char, offset
-                                raise _JSONParser._PlainValueError
-                            case _:  # JSON character, quickly go to next character
-                                continue
-                    assert len(enclosure_counter) > 0  # nosec assert_used
-                    if not first_enclosure:
-                        first_enclosure = next(iter(enclosure_counter))
-                    if enclosure_counter[first_enclosure] <= 0:  # 1st found is closed
-                        return split_partial_document(partial_document, offset + 1, limit)
-
-                    # partial_document not complete
-                    offset = partial_document_view.nbytes
-                    if offset > limit:
-                        raise LimitOverrunError(
-                            "JSON object's end frame is not found, and chunk exceed the limit",
-                            partial_document,
-                            offset,
-                        )
-
-                # yield outside view scope
-                partial_document += yield
-
-        except _JSONParser._PlainValueError:
-            pass
-
-        # The document is a plain value (null, true, false, or a number)
-
-        del enclosure_counter, first_enclosure
-
-        _JSON_VALUE_BYTES = _JSONParser._JSON_VALUE_BYTES
-
-        while (nprint_idx := next((idx for idx, byte in enumerate(partial_document) if byte not in _JSON_VALUE_BYTES), -1)) < 0:
-            if len(partial_document) > limit:
-                raise LimitOverrunError(
-                    "JSON object's end frame is not found, and chunk exceed the limit",
-                    partial_document,
-                    len(partial_document),
-                )
-            partial_document += yield
-
-        return split_partial_document(partial_document, nprint_idx, limit)
+            end_idx = nprint_search.start()
+            complete_document, partial_document = cls.__split_final_buffer(
+                partial_document,
+                start_idx=start_idx,
+                end_idx=end_idx,
+                limit=limit,
+            )
+            if not complete_document:
+                # If this condition is verified, decoder.decode() will most likely raise JSONDecodeError
+                return partial_document, b""
+            return complete_document, partial_document
+
+        ESCAPE_BYTE = cls._ESCAPE_BYTE
+        QUOTE_BYTE = cls._QUOTE_BYTE
+        close_enclosure = cls._ENCLOSURE_BYTES[open_enclosure]
+        token_finder = cls._JSON_FRAMES[open_enclosure].finditer
+        enclosure_count: int = 1
+        between_quotes: bool = open_enclosure == QUOTE_BYTE
+        end_idx += 1
+        while True:
+            for enclosure_match in token_finder(partial_document, end_idx):
+                byte = enclosure_match[0][0]
+                if byte == QUOTE_BYTE:
+                    between_quotes = not between_quotes
+                elif between_quotes:
+                    continue
+
+                # Try first to match close_enclosure for frames using the same character for opening and closing (e.g. strings)
+                if byte == close_enclosure:
+                    enclosure_count -= 1
+                elif byte == open_enclosure:  # pragma: no branch
+                    enclosure_count += 1
+
+                if not enclosure_count:  # 1st found is closed
+                    end_idx = enclosure_match.end()
+                    return cls.__split_final_buffer(partial_document, start_idx=start_idx, end_idx=end_idx, limit=limit)
+
+            partial_document, end_idx = yield from append_to_buffer(partial_document, limit)
+            while partial_document[end_idx - 1] == ESCAPE_BYTE:
+                end_idx -= 1
 
     @staticmethod
-    def _split_partial_document(partial_document: bytes, consumed: int, limit: int) -> tuple[bytes, bytes]:
-        if consumed > limit:
+    def __split_final_buffer(
+        partial_document: bytes | bytearray,
+        *,
+        start_idx: int,
+        end_idx: int,
+        limit: int,
+        _w: Callable[[bytes, int], re.Match[bytes]] = _WHITESPACES.match,  # type: ignore[assignment]
+    ) -> tuple[memoryview, memoryview]:
+        if end_idx > limit:
             raise LimitOverrunError(
                 "JSON object's end frame is found, but chunk is longer than limit",
                 partial_document,
-                consumed,
+                end_idx,
+            )
+        end_idx = _w(partial_document, end_idx).end()
+        partial_document_view = memoryview(partial_document)
+        return partial_document_view[start_idx:end_idx], partial_document_view[end_idx:]
+
+    @staticmethod
+    def __append_to_buffer(partial_document: bytes | bytearray, limit: int) -> Generator[None, bytes, tuple[bytearray, int]]:
+        end_idx = len(partial_document)
+        if end_idx > limit:
+            raise LimitOverrunError(
+                "JSON object's end frame is not found, and chunk exceed the limit",
+                partial_document,
+                end_idx,
             )
-        consumed = _JSONParser._whitespaces_match(partial_document, consumed).end()
-        if consumed == len(partial_document):
-            # The following bytes are only spaces
-            # Do not slice the document, the trailing spaces will be ignored by JSONDecoder
-            return partial_document, b""
-        complete_document, partial_document = partial_document[:consumed], partial_document[consumed:]
-        if not complete_document:
-            # If this condition is verified, decoder.decode() will most likely raise JSONDecodeError
-            complete_document = partial_document
-            partial_document = b""
-        return complete_document, partial_document
+        if not isinstance(partial_document, bytearray):
+            partial_document = bytearray(partial_document)
+        partial_document.extend((yield))
+        return partial_document, end_idx
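Reviewer note: the rewritten `raw_parse()` keeps the same generator protocol as before: prime it
with `next()`, feed it byte chunks with `send()`, and read the `(complete_document, remaining_data)`
pair from `StopIteration.value` once the end frame is found (both values are now memoryviews over
the internal buffer). A hand-driven sketch mirroring the unit tests below; `_JSONParser` is a
private helper, so this is illustrative only:

    consumer = _JSONParser.raw_parse(limit=65536)
    next(consumer)  # run up to the first yield
    try:
        consumer.send(b'{"data": ')    # end frame not found yet: generator waits for more
        consumer.send(b'42}trailing')  # closing brace found: generator returns
    except StopIteration as exc:
        complete, remainder = exc.value
        assert bytes(complete) == b'{"data": 42}'
        assert bytes(remainder) == b'trailing'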
""" - if __debug__: - if not isinstance(packet, str): - raise TypeError(f"Expected a string, got {packet!r}") - data: bytes = packet.encode(self.__encoding, self.__unicode_errors) + data = bytes(packet, self.__encoding, self.__unicode_errors) if not data: return separator: bytes = self.__separator diff --git a/tests/unit_test/test_serializers/test_json.py b/tests/unit_test/test_serializers/test_json.py index e89d711f..bda97194 100644 --- a/tests/unit_test/test_serializers/test_json.py +++ b/tests/unit_test/test_serializers/test_json.py @@ -327,7 +327,7 @@ def reader_read_until_side_effect(*args: Any, **kwargs: Any) -> Generator[None, if use_lines: mock_json_parser.assert_not_called() mock_generator_stream_reader_cls.assert_called_once_with() - mock_generator_stream_reader.read_until.assert_called_once_with(b"\n", limit=DEFAULT_LIMIT) + mock_generator_stream_reader.read_until.assert_called_once_with(b"\n", limit=DEFAULT_LIMIT, keep_end=False) mock_generator_stream_reader.read_all.assert_called_once_with() else: mock_json_parser.assert_called_once_with(limit=DEFAULT_LIMIT) @@ -432,6 +432,18 @@ def test____raw_parse____object_frame(self) -> None: assert complete == b'{"data":42}' assert remainder == b"remainder" + def test____raw_parse____object_frame____skip_inner_brackets(self) -> None: + # Arrange + consumer = _JSONParser.raw_parse(limit=DEFAULT_LIMIT) + next(consumer) + + # Act + complete, remainder = send_return(consumer, b'{"data": {"something": 42}}remainder') + + # Assert + assert complete == b'{"data": {"something": 42}}' + assert remainder == b"remainder" + def test____raw_parse____object_frame____skip_bracket_in_strings(self) -> None: # Arrange consumer = _JSONParser.raw_parse(limit=DEFAULT_LIMIT) @@ -458,6 +470,46 @@ def test____raw_parse____object_frame____whitespaces(self) -> None: assert complete == b'{"data": 42,\n"list": [true, false, null]\n}\n' assert remainder == b"" + def test____raw_parse____object_frame____leading_whitespace_skip(self) -> None: + # Arrange + consumer = _JSONParser.raw_parse(limit=DEFAULT_LIMIT) + next(consumer) + + # Act + consumer.send(b"\t\r\n ") + consumer.send(b'\n {"data"') + complete, remainder = send_return(consumer, b":42}remainder") + + # Assert + assert complete == b'{"data":42}' + assert remainder == b"remainder" + + def test____raw_parse____object_frame____escaped_quote(self) -> None: + # Arrange + consumer = _JSONParser.raw_parse(limit=DEFAULT_LIMIT) + next(consumer) + + # Act + complete, remainder = send_return(consumer, b'{"data\\"":42}remainder') + + # Assert + assert complete == b'{"data\\"":42}' + assert remainder == b"remainder" + + def test____raw_parse____object_frame____escaped_quote____partial(self) -> None: + # Arrange + consumer = _JSONParser.raw_parse(limit=DEFAULT_LIMIT) + next(consumer) + + # Act + consumer.send(b'{"data') + consumer.send(b'\\"') + complete, remainder = send_return(consumer, b'":42}remainder') + + # Assert + assert complete == b'{"data\\"":42}' + assert remainder == b"remainder" + def test____raw_parse____list_frame(self) -> None: # Arrange consumer = _JSONParser.raw_parse(limit=DEFAULT_LIMIT) @@ -472,6 +524,18 @@ def test____raw_parse____list_frame(self) -> None: assert complete == b'[{"data":42}]' assert remainder == b"remainder" + def test____raw_parse____list_frame____skip_inner_brackets(self) -> None: + # Arrange + consumer = _JSONParser.raw_parse(limit=DEFAULT_LIMIT) + next(consumer) + + # Act + complete, remainder = send_return(consumer, b'[["string"], ["second"]]remainder') + + # Assert + assert 
complete == b'[["string"], ["second"]]' + assert remainder == b"remainder" + def test____raw_parse____list_frame____skip_bracket_in_strings(self) -> None: # Arrange consumer = _JSONParser.raw_parse(limit=DEFAULT_LIMIT) @@ -500,6 +564,46 @@ def test____raw_parse____list_frame____whitespaces(self) -> None: assert complete == b'[{\n"data": 42,\n "test": true},\nnull,\n"string"\n]\n' assert remainder == b"" + def test____raw_parse____leading_whitespace_skip(self) -> None: + # Arrange + consumer = _JSONParser.raw_parse(limit=DEFAULT_LIMIT) + next(consumer) + + # Act + consumer.send(b"\t\r\n ") + consumer.send(b'\n [{"data"') + complete, remainder = send_return(consumer, b":42}]remainder") + + # Assert + assert complete == b'[{"data":42}]' + assert remainder == b"remainder" + + def test____raw_parse____list_frame____escaped_quote(self) -> None: + # Arrange + consumer = _JSONParser.raw_parse(limit=DEFAULT_LIMIT) + next(consumer) + + # Act + complete, remainder = send_return(consumer, b'["data\\""]remainder') + + # Assert + assert complete == b'["data\\""]' + assert remainder == b"remainder" + + def test____raw_parse____list_frame____escaped_quote____partial(self) -> None: + # Arrange + consumer = _JSONParser.raw_parse(limit=DEFAULT_LIMIT) + next(consumer) + + # Act + consumer.send(b'["data') + consumer.send(b'\\"') + complete, remainder = send_return(consumer, b'"]remainder') + + # Assert + assert complete == b'["data\\""]' + assert remainder == b"remainder" + def test____raw_parse____string_frame(self) -> None: # Arrange consumer = _JSONParser.raw_parse(limit=DEFAULT_LIMIT) @@ -514,11 +618,37 @@ def test____raw_parse____string_frame(self) -> None: assert complete == b'"data{}"' assert remainder == b"remainder" + def test____raw_parse____string_frame____leading_whitespace_skip(self) -> None: + # Arrange + consumer = _JSONParser.raw_parse(limit=DEFAULT_LIMIT) + next(consumer) + + # Act + consumer.send(b"\t\r\n ") + consumer.send(b'\n "data') + complete, remainder = send_return(consumer, b'"remainder') + + # Assert + assert complete == b'"data"' + assert remainder == b"remainder" + def test____raw_parse____string_frame____escaped_quote(self) -> None: # Arrange consumer = _JSONParser.raw_parse(limit=DEFAULT_LIMIT) next(consumer) + # Act + complete, remainder = send_return(consumer, b'"data\\""remainder') + + # Assert + assert complete == b'"data\\""' + assert remainder == b"remainder" + + def test____raw_parse____string_frame____escaped_quote____partial(self) -> None: + # Arrange + consumer = _JSONParser.raw_parse(limit=DEFAULT_LIMIT) + next(consumer) + # Act consumer.send(b'"data') consumer.send(b'\\"') @@ -567,6 +697,20 @@ def test____raw_parse____plain_value____first_character_is_invalid(self) -> None assert complete == b"\0" assert remainder == b"" + def test____raw_parse____plain_value____leading_whitespace_skip(self) -> None: + # Arrange + consumer = _JSONParser.raw_parse(limit=DEFAULT_LIMIT) + next(consumer) + + # Act + consumer.send(b"\t\r\n ") + consumer.send(b"\n tr") + complete, remainder = send_return(consumer, b"ue\nremainder") + + # Assert + assert complete == b"true\n" + assert remainder == b"remainder" + @pytest.mark.parametrize("limit", [0, -42], ids=lambda p: f"limit=={p}") def test____raw_parse____invalid_limit(self, limit: int) -> None: # Arrange diff --git a/tests/unit_test/test_serializers/test_line.py b/tests/unit_test/test_serializers/test_line.py index 64649e3d..bafc2b2f 100644 --- a/tests/unit_test/test_serializers/test_line.py +++ 
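Reviewer note: dropping the `__debug__` isinstance guard is safe because `bytes(packet, encoding,
errors)` itself rejects non-string input; only the error message changes, now coming from CPython,
which is what the test_line.py updates below assert:

    >>> bytes(4, "utf-8")
    Traceback (most recent call last):
        ...
    TypeError: encoding without a string argument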
diff --git a/tests/unit_test/test_serializers/test_json.py b/tests/unit_test/test_serializers/test_json.py
index e89d711f..bda97194 100644
--- a/tests/unit_test/test_serializers/test_json.py
+++ b/tests/unit_test/test_serializers/test_json.py
@@ -327,7 +327,7 @@ def reader_read_until_side_effect(*args: Any, **kwargs: Any) -> Generator[None,
         if use_lines:
             mock_json_parser.assert_not_called()
             mock_generator_stream_reader_cls.assert_called_once_with()
-            mock_generator_stream_reader.read_until.assert_called_once_with(b"\n", limit=DEFAULT_LIMIT)
+            mock_generator_stream_reader.read_until.assert_called_once_with(b"\n", limit=DEFAULT_LIMIT, keep_end=False)
             mock_generator_stream_reader.read_all.assert_called_once_with()
         else:
             mock_json_parser.assert_called_once_with(limit=DEFAULT_LIMIT)
@@ -432,6 +432,18 @@ def test____raw_parse____object_frame(self) -> None:
         assert complete == b'{"data":42}'
         assert remainder == b"remainder"
 
+    def test____raw_parse____object_frame____skip_inner_brackets(self) -> None:
+        # Arrange
+        consumer = _JSONParser.raw_parse(limit=DEFAULT_LIMIT)
+        next(consumer)
+
+        # Act
+        complete, remainder = send_return(consumer, b'{"data": {"something": 42}}remainder')
+
+        # Assert
+        assert complete == b'{"data": {"something": 42}}'
+        assert remainder == b"remainder"
+
     def test____raw_parse____object_frame____skip_bracket_in_strings(self) -> None:
         # Arrange
         consumer = _JSONParser.raw_parse(limit=DEFAULT_LIMIT)
@@ -458,6 +470,46 @@ def test____raw_parse____object_frame____whitespaces(self) -> None:
         assert complete == b'{"data": 42,\n"list": [true, false, null]\n}\n'
         assert remainder == b""
 
+    def test____raw_parse____object_frame____leading_whitespace_skip(self) -> None:
+        # Arrange
+        consumer = _JSONParser.raw_parse(limit=DEFAULT_LIMIT)
+        next(consumer)
+
+        # Act
+        consumer.send(b"\t\r\n ")
+        consumer.send(b'\n {"data"')
+        complete, remainder = send_return(consumer, b":42}remainder")
+
+        # Assert
+        assert complete == b'{"data":42}'
+        assert remainder == b"remainder"
+
+    def test____raw_parse____object_frame____escaped_quote(self) -> None:
+        # Arrange
+        consumer = _JSONParser.raw_parse(limit=DEFAULT_LIMIT)
+        next(consumer)
+
+        # Act
+        complete, remainder = send_return(consumer, b'{"data\\"":42}remainder')
+
+        # Assert
+        assert complete == b'{"data\\"":42}'
+        assert remainder == b"remainder"
+
+    def test____raw_parse____object_frame____escaped_quote____partial(self) -> None:
+        # Arrange
+        consumer = _JSONParser.raw_parse(limit=DEFAULT_LIMIT)
+        next(consumer)
+
+        # Act
+        consumer.send(b'{"data')
+        consumer.send(b'\\"')
+        complete, remainder = send_return(consumer, b'":42}remainder')
+
+        # Assert
+        assert complete == b'{"data\\"":42}'
+        assert remainder == b"remainder"
+
     def test____raw_parse____list_frame(self) -> None:
         # Arrange
         consumer = _JSONParser.raw_parse(limit=DEFAULT_LIMIT)
@@ -472,6 +524,18 @@ def test____raw_parse____list_frame(self) -> None:
         assert complete == b'[{"data":42}]'
         assert remainder == b"remainder"
 
+    def test____raw_parse____list_frame____skip_inner_brackets(self) -> None:
+        # Arrange
+        consumer = _JSONParser.raw_parse(limit=DEFAULT_LIMIT)
+        next(consumer)
+
+        # Act
+        complete, remainder = send_return(consumer, b'[["string"], ["second"]]remainder')
+
+        # Assert
+        assert complete == b'[["string"], ["second"]]'
+        assert remainder == b"remainder"
+
     def test____raw_parse____list_frame____skip_bracket_in_strings(self) -> None:
         # Arrange
         consumer = _JSONParser.raw_parse(limit=DEFAULT_LIMIT)
@@ -500,6 +564,46 @@ def test____raw_parse____list_frame____whitespaces(self) -> None:
         assert complete == b'[{\n"data": 42,\n "test": true},\nnull,\n"string"\n]\n'
         assert remainder == b""
 
+    def test____raw_parse____leading_whitespace_skip(self) -> None:
+        # Arrange
+        consumer = _JSONParser.raw_parse(limit=DEFAULT_LIMIT)
+        next(consumer)
+
+        # Act
+        consumer.send(b"\t\r\n ")
+        consumer.send(b'\n [{"data"')
+        complete, remainder = send_return(consumer, b":42}]remainder")
+
+        # Assert
+        assert complete == b'[{"data":42}]'
+        assert remainder == b"remainder"
+
+    def test____raw_parse____list_frame____escaped_quote(self) -> None:
+        # Arrange
+        consumer = _JSONParser.raw_parse(limit=DEFAULT_LIMIT)
+        next(consumer)
+
+        # Act
+        complete, remainder = send_return(consumer, b'["data\\""]remainder')
+
+        # Assert
+        assert complete == b'["data\\""]'
+        assert remainder == b"remainder"
+
+    def test____raw_parse____list_frame____escaped_quote____partial(self) -> None:
+        # Arrange
+        consumer = _JSONParser.raw_parse(limit=DEFAULT_LIMIT)
+        next(consumer)
+
+        # Act
+        consumer.send(b'["data')
+        consumer.send(b'\\"')
+        complete, remainder = send_return(consumer, b'"]remainder')
+
+        # Assert
+        assert complete == b'["data\\""]'
+        assert remainder == b"remainder"
+
     def test____raw_parse____string_frame(self) -> None:
         # Arrange
         consumer = _JSONParser.raw_parse(limit=DEFAULT_LIMIT)
@@ -514,11 +618,37 @@ def test____raw_parse____string_frame(self) -> None:
         assert complete == b'"data{}"'
         assert remainder == b"remainder"
 
+    def test____raw_parse____string_frame____leading_whitespace_skip(self) -> None:
+        # Arrange
+        consumer = _JSONParser.raw_parse(limit=DEFAULT_LIMIT)
+        next(consumer)
+
+        # Act
+        consumer.send(b"\t\r\n ")
+        consumer.send(b'\n "data')
+        complete, remainder = send_return(consumer, b'"remainder')
+
+        # Assert
+        assert complete == b'"data"'
+        assert remainder == b"remainder"
+
     def test____raw_parse____string_frame____escaped_quote(self) -> None:
         # Arrange
         consumer = _JSONParser.raw_parse(limit=DEFAULT_LIMIT)
         next(consumer)
 
+        # Act
+        complete, remainder = send_return(consumer, b'"data\\""remainder')
+
+        # Assert
+        assert complete == b'"data\\""'
+        assert remainder == b"remainder"
+
+    def test____raw_parse____string_frame____escaped_quote____partial(self) -> None:
+        # Arrange
+        consumer = _JSONParser.raw_parse(limit=DEFAULT_LIMIT)
+        next(consumer)
+
         # Act
         consumer.send(b'"data')
         consumer.send(b'\\"')
@@ -567,6 +697,20 @@ def test____raw_parse____plain_value____first_character_is_invalid(self) -> None:
         assert complete == b"\0"
         assert remainder == b""
 
+    def test____raw_parse____plain_value____leading_whitespace_skip(self) -> None:
+        # Arrange
+        consumer = _JSONParser.raw_parse(limit=DEFAULT_LIMIT)
+        next(consumer)
+
+        # Act
+        consumer.send(b"\t\r\n ")
+        consumer.send(b"\n tr")
+        complete, remainder = send_return(consumer, b"ue\nremainder")
+
+        # Assert
+        assert complete == b"true\n"
+        assert remainder == b"remainder"
+
     @pytest.mark.parametrize("limit", [0, -42], ids=lambda p: f"limit=={p}")
     def test____raw_parse____invalid_limit(self, limit: int) -> None:
         # Arrange
diff --git a/tests/unit_test/test_serializers/test_line.py b/tests/unit_test/test_serializers/test_line.py
index 64649e3d..bafc2b2f 100644
--- a/tests/unit_test/test_serializers/test_line.py
+++ b/tests/unit_test/test_serializers/test_line.py
@@ -153,7 +153,7 @@ def test____serialize____not_a_string_error(
         # Arrange
 
         # Act & Assert
-        with pytest.raises(TypeError, match=r"^Expected a string, got 4$"):
+        with pytest.raises(TypeError, match=r"^encoding without a string argument$"):
             serializer.serialize(4)  # type: ignore[arg-type]
 
     def test____serialize____empty_string(
@@ -235,7 +235,7 @@ def test____incremental_serialize____not_a_string_error(
         # Arrange
 
         # Act & Assert
-        with pytest.raises(TypeError, match=r"^Expected a string, got 4$"):
+        with pytest.raises(TypeError, match=r"^encoding without a string argument$"):
             list(serializer.incremental_serialize(4))  # type: ignore[arg-type]
 
     def test____incremental_serialize____append_separator(
diff --git a/tox.ini b/tox.ini
index 879db54e..7c48c001 100644
--- a/tox.ini
+++ b/tox.ini
@@ -259,7 +259,7 @@ setenv =
 passenv =
     {[base]passenv}
 commands =
-    pytest -c pytest-benchmark.ini {posargs:--benchmark-histogram=benchmark_reports{/}micro_benches{/}benchmark}
+    pytest -c pytest-benchmark.ini {posargs} --benchmark-histogram=benchmark_reports{/}micro_benches{/}benchmark micro_benchmarks
 
 [testenv:benchmark-server-{tcpecho,sslecho,unixstreamecho,readline_tcp,readline_unix,udpecho,unixdgramecho}-{easynetwork,stdlib}]
 skip_install = true