diff --git a/poetry.lock b/poetry.lock index 5a805d2..7a7b558 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.5.1 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.6.1 and should not be changed by hand. [[package]] name = "annotated-types" @@ -946,6 +946,7 @@ files = [ {file = "PyYAML-6.0.1-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:69b023b2b4daa7548bcfbd4aa3da05b3a74b772db9e23b982788168117739938"}, {file = "PyYAML-6.0.1-cp310-cp310-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:81e0b275a9ecc9c0c0c07b4b90ba548307583c125f54d5b6946cfee6360c733d"}, {file = "PyYAML-6.0.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:ba336e390cd8e4d1739f42dfe9bb83a3cc2e80f567d8805e11b46f4a943f5515"}, + {file = "PyYAML-6.0.1-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:326c013efe8048858a6d312ddd31d56e468118ad4cdeda36c719bf5bb6192290"}, {file = "PyYAML-6.0.1-cp310-cp310-win32.whl", hash = "sha256:bd4af7373a854424dabd882decdc5579653d7868b8fb26dc7d0e99f823aa5924"}, {file = "PyYAML-6.0.1-cp310-cp310-win_amd64.whl", hash = "sha256:fd1592b3fdf65fff2ad0004b5e363300ef59ced41c2e6b3a99d4089fa8c5435d"}, {file = "PyYAML-6.0.1-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:6965a7bc3cf88e5a1c3bd2e0b5c22f8d677dc88a455344035f03399034eb3007"}, @@ -953,8 +954,15 @@ files = [ {file = "PyYAML-6.0.1-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:42f8152b8dbc4fe7d96729ec2b99c7097d656dc1213a3229ca5383f973a5ed6d"}, {file = "PyYAML-6.0.1-cp311-cp311-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:062582fca9fabdd2c8b54a3ef1c978d786e0f6b3a1510e0ac93ef59e0ddae2bc"}, {file = "PyYAML-6.0.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d2b04aac4d386b172d5b9692e2d2da8de7bfb6c387fa4f801fbf6fb2e6ba4673"}, + {file = "PyYAML-6.0.1-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:e7d73685e87afe9f3b36c799222440d6cf362062f78be1013661b00c5c6f678b"}, {file = "PyYAML-6.0.1-cp311-cp311-win32.whl", hash = "sha256:1635fd110e8d85d55237ab316b5b011de701ea0f29d07611174a1b42f1444741"}, {file = "PyYAML-6.0.1-cp311-cp311-win_amd64.whl", hash = "sha256:bf07ee2fef7014951eeb99f56f39c9bb4af143d8aa3c21b1677805985307da34"}, + {file = "PyYAML-6.0.1-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:855fb52b0dc35af121542a76b9a84f8d1cd886ea97c84703eaa6d88e37a2ad28"}, + {file = "PyYAML-6.0.1-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:40df9b996c2b73138957fe23a16a4f0ba614f4c0efce1e9406a184b6d07fa3a9"}, + {file = "PyYAML-6.0.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6c22bec3fbe2524cde73d7ada88f6566758a8f7227bfbf93a408a9d86bcc12a0"}, + {file = "PyYAML-6.0.1-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:8d4e9c88387b0f5c7d5f281e55304de64cf7f9c0021a3525bd3b1c542da3b0e4"}, + {file = "PyYAML-6.0.1-cp312-cp312-win32.whl", hash = "sha256:d483d2cdf104e7c9fa60c544d92981f12ad66a457afae824d146093b8c294c54"}, + {file = "PyYAML-6.0.1-cp312-cp312-win_amd64.whl", hash = "sha256:0d3304d8c0adc42be59c5f8a4d9e3d7379e6955ad754aa9d6ab7a398b59dd1df"}, {file = "PyYAML-6.0.1-cp36-cp36m-macosx_10_9_x86_64.whl", hash = "sha256:50550eb667afee136e9a77d6dc71ae76a44df8b3e51e41b77f6de2932bfe0f47"}, {file = "PyYAML-6.0.1-cp36-cp36m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:1fe35611261b29bd1de0070f0b2f47cb6ff71fa6595c077e42bd0c419fa27b98"}, {file = "PyYAML-6.0.1-cp36-cp36m-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:704219a11b772aea0d8ecd7058d0082713c3562b4e271b849ad7dc4a5c90c13c"}, @@ -971,6 +979,7 @@ files = [ {file = "PyYAML-6.0.1-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a0cd17c15d3bb3fa06978b4e8958dcdc6e0174ccea823003a106c7d4d7899ac5"}, {file = "PyYAML-6.0.1-cp38-cp38-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:28c119d996beec18c05208a8bd78cbe4007878c6dd15091efb73a30e90539696"}, {file = "PyYAML-6.0.1-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:7e07cbde391ba96ab58e532ff4803f79c4129397514e1413a7dc761ccd755735"}, + {file = "PyYAML-6.0.1-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:49a183be227561de579b4a36efbb21b3eab9651dd81b1858589f796549873dd6"}, {file = "PyYAML-6.0.1-cp38-cp38-win32.whl", hash = "sha256:184c5108a2aca3c5b3d3bf9395d50893a7ab82a38004c8f61c258d4428e80206"}, {file = "PyYAML-6.0.1-cp38-cp38-win_amd64.whl", hash = "sha256:1e2722cc9fbb45d9b87631ac70924c11d3a401b2d7f410cc0e3bbf249f2dca62"}, {file = "PyYAML-6.0.1-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:9eb6caa9a297fc2c2fb8862bc5370d0303ddba53ba97e71f08023b6cd73d16a8"}, @@ -978,6 +987,7 @@ files = [ {file = "PyYAML-6.0.1-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:5773183b6446b2c99bb77e77595dd486303b4faab2b086e7b17bc6bef28865f6"}, {file = "PyYAML-6.0.1-cp39-cp39-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:b786eecbdf8499b9ca1d697215862083bd6d2a99965554781d0d8d1ad31e13a0"}, {file = "PyYAML-6.0.1-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:bc1bf2925a1ecd43da378f4db9e4f799775d6367bdb94671027b73b393a7c42c"}, + {file = "PyYAML-6.0.1-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:04ac92ad1925b2cff1db0cfebffb6ffc43457495c9b3c39d3fcae417d7125dc5"}, {file = "PyYAML-6.0.1-cp39-cp39-win32.whl", hash = "sha256:faca3bdcf85b2fc05d06ff3fbc1f83e1391b3e724afa3feba7d13eeab355484c"}, {file = "PyYAML-6.0.1-cp39-cp39-win_amd64.whl", hash = "sha256:510c9deebc5c0225e8c96813043e62b680ba2f9c50a08d3724c7f28a747d1486"}, {file = "PyYAML-6.0.1.tar.gz", hash = "sha256:bfdf460b1736c775f2ba9f6a92bca30bc2095067b8a9d77876d1fad6cc3b4a43"}, @@ -1390,4 +1400,4 @@ zmq = ["pyzmq"] [metadata] lock-version = "2.0" python-versions = "^3.8.1" -content-hash = "3ec5450a6245f5beceb41aeeb47ba722ab06e15e17fbd0432ad71cee6ee52bac" +content-hash = "80281541d7ef5406fd0fec79fcc8025c743dc7fa820e444912431553c050a25b" diff --git a/pyproject.toml b/pyproject.toml index a2ed4f2..9031103 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -61,6 +61,7 @@ freezegun = "^1.2.2" pytest-mock = "^3.11.1" tzlocal = "^5.0.1" types-tzlocal = "^5.0.1.1" +types-pytz = "^2023.3.1.1" [tool.poetry.extras] zmq = ["pyzmq"] diff --git a/taskiq/abc/broker.py b/taskiq/abc/broker.py index b935e38..234c2ca 100644 --- a/taskiq/abc/broker.py +++ b/taskiq/abc/broker.py @@ -25,12 +25,14 @@ from typing_extensions import ParamSpec, Self, TypeAlias from taskiq.abc.middleware import TaskiqMiddleware +from taskiq.abc.serializer import TaskiqSerializer from taskiq.acks import AckableMessage from taskiq.decor import AsyncTaskiqDecoratedTask from taskiq.events import TaskiqEvents -from taskiq.formatters.json_formatter import JSONFormatter +from taskiq.formatters.proxy_formatter import ProxyFormatter from taskiq.message import BrokerMessage from taskiq.result_backends.dummy import DummyResultBackend +from taskiq.serializers.json_serializer import JSONSerializer from taskiq.state import TaskiqState from taskiq.utils import maybe_awaitable, remove_suffix from taskiq.warnings import TaskiqDeprecationWarning @@ -97,7 +99,8 @@ def __init__( self.middlewares: "List[TaskiqMiddleware]" = [] self.result_backend = result_backend self.decorator_class = AsyncTaskiqDecoratedTask - self.formatter: "TaskiqFormatter" = JSONFormatter() + self.serializer: TaskiqSerializer = JSONSerializer() + self.formatter: "TaskiqFormatter" = ProxyFormatter(self) self.id_generator = task_id_generator self.local_task_registry: Dict[str, AsyncTaskiqDecoratedTask[Any, Any]] = {} # Every event has a list of handlers. @@ -479,6 +482,19 @@ def with_event_handlers( self.event_handlers[event].extend(handlers) return self + def with_serializer( + self, + serializer: TaskiqSerializer, + ) -> "Self": # pragma: no cover + """ + Set a new serializer and return an updated broker. + + :param serializer: new serializer. + :return: self + """ + self.serializer = serializer + return self + def _register_task( self, task_name: str, diff --git a/taskiq/abc/serializer.py b/taskiq/abc/serializer.py new file mode 100644 index 0000000..4f53ccf --- /dev/null +++ b/taskiq/abc/serializer.py @@ -0,0 +1,24 @@ +from abc import ABC, abstractmethod +from typing import Any + + +class TaskiqSerializer(ABC): + """Custom serializer for brokers.""" + + @abstractmethod + def dumpb(self, value: Any) -> bytes: + """ + Dump value to bytes for sending through the wire. + + :param value: value to encode. + :return: encoded value. + """ + + @abstractmethod + def loadb(self, value: bytes) -> Any: + """ + Parse byte-encoded value received from the wire. + + :param message: value to parse. + :return: decoded value. + """ diff --git a/taskiq/compat.py b/taskiq/compat.py index 2939c5f..48312fd 100644 --- a/taskiq/compat.py +++ b/taskiq/compat.py @@ -16,6 +16,15 @@ def parse_obj_as(annot: T, obj: Any) -> T: return pydantic.TypeAdapter(annot).validate_python(obj) + def model_validate( + model_class: Type[Model], + message: Dict[str, Any], + ) -> Model: + return model_class.model_validate(message) + + def model_dump(instance: Model) -> Dict[str, Any]: + return instance.model_dump() + def model_validate_json( model_class: Type[Model], message: Union[str, bytes, bytearray], @@ -37,6 +46,15 @@ def model_copy( else: parse_obj_as = pydantic.parse_obj_as # type: ignore + def model_validate( + model_class: Type[Model], + message: Dict[str, Any], + ) -> Model: + return model_class.parse_obj(message) + + def model_dump(instance: Model) -> Dict[str, Any]: + return instance.dict() + def model_validate_json( model_class: Type[Model], message: Union[str, bytes, bytearray], diff --git a/taskiq/formatters/json_formatter.py b/taskiq/formatters/json_formatter.py index fa7baf0..3d49e28 100644 --- a/taskiq/formatters/json_formatter.py +++ b/taskiq/formatters/json_formatter.py @@ -4,7 +4,7 @@ class JSONFormatter(TaskiqFormatter): - """Default taskiq formatter.""" + """JSON taskiq formatter.""" def dumps(self, message: TaskiqMessage) -> BrokerMessage: """ diff --git a/taskiq/formatters/proxy_formatter.py b/taskiq/formatters/proxy_formatter.py new file mode 100644 index 0000000..a220871 --- /dev/null +++ b/taskiq/formatters/proxy_formatter.py @@ -0,0 +1,38 @@ +from typing import TYPE_CHECKING + +from taskiq.abc.formatter import TaskiqFormatter +from taskiq.compat import model_dump, model_validate +from taskiq.message import BrokerMessage, TaskiqMessage + +if TYPE_CHECKING: + from taskiq.abc.broker import AsyncBroker + + +class ProxyFormatter(TaskiqFormatter): + """Default taskiq formatter.""" + + def __init__(self, broker: "AsyncBroker") -> None: + self.broker = broker + + def dumps(self, message: TaskiqMessage) -> BrokerMessage: + """ + Dumps taskiq message to some broker message format. + + :param message: message to send. + :return: Dumped message. + """ + return BrokerMessage( + task_id=message.task_id, + task_name=message.task_name, + message=self.broker.serializer.dumpb(model_dump(message)), + labels=message.labels, + ) + + def loads(self, message: bytes) -> TaskiqMessage: + """ + Loads json from message. + + :param message: broker's message. + :return: parsed taskiq message. + """ + return model_validate(TaskiqMessage, self.broker.serializer.loadb(message)) diff --git a/taskiq/message.py b/taskiq/message.py index 6cc0468..129073f 100644 --- a/taskiq/message.py +++ b/taskiq/message.py @@ -14,7 +14,7 @@ class TaskiqMessage(BaseModel): task_id: str task_name: str - labels: Dict[str, str] + labels: Dict[str, Any] args: List[Any] kwargs: Dict[str, Any] @@ -25,4 +25,4 @@ class BrokerMessage(BaseModel): task_id: str task_name: str message: bytes - labels: Dict[str, str] + labels: Dict[str, Any] diff --git a/taskiq/serializers/__init__.py b/taskiq/serializers/__init__.py new file mode 100644 index 0000000..2f24f33 --- /dev/null +++ b/taskiq/serializers/__init__.py @@ -0,0 +1 @@ +"""Taskiq serializers.""" diff --git a/taskiq/serializers/json_serializer.py b/taskiq/serializers/json_serializer.py new file mode 100644 index 0000000..e7d8d38 --- /dev/null +++ b/taskiq/serializers/json_serializer.py @@ -0,0 +1,26 @@ +from json import dumps, loads +from typing import Any + +from taskiq.abc.serializer import TaskiqSerializer + + +class JSONSerializer(TaskiqSerializer): + """Default taskiq serizalizer.""" + + def dumpb(self, value: Any) -> bytes: + """ + Dumps taskiq message to some broker message format. + + :param message: message to send. + :return: Dumped message. + """ + return dumps(value).encode() + + def loadb(self, value: bytes) -> Any: + """ + Parse byte-encoded value received from the wire. + + :param message: value to parse. + :return: decoded value. + """ + return loads(value.decode()) diff --git a/tests/formatters/test_json_formatter.py b/tests/formatters/test_json_formatter.py new file mode 100644 index 0000000..7801da9 --- /dev/null +++ b/tests/formatters/test_json_formatter.py @@ -0,0 +1,41 @@ +from taskiq.formatters.json_formatter import JSONFormatter +from taskiq.message import BrokerMessage, TaskiqMessage + + +def test_json_dumps() -> None: + fmt = JSONFormatter() + msg = TaskiqMessage( + task_id="task-id", + task_name="task.name", + labels={"label1": 1, "label2": "text"}, + args=[1, "a"], + kwargs={"p1": "v1"}, + ) + expected = BrokerMessage( + task_id="task-id", + task_name="task.name", + message=( + b'{"task_id":"task-id","task_name":"task.name",' + b'"labels":{"label1":1,"label2":"text"},' + b'"args":[1,"a"],"kwargs":{"p1":"v1"}}' + ), + labels={"label1": 1, "label2": "text"}, + ) + assert fmt.dumps(msg) == expected + + +def test_json_loads() -> None: + fmt = JSONFormatter() + msg = ( + b'{"task_id":"task-id","task_name":"task.name",' + b'"labels":{"label1":1,"label2":"text"},' + b'"args":[1,"a"],"kwargs":{"p1":"v1"}}' + ) + expected = TaskiqMessage( + task_id="task-id", + task_name="task.name", + labels={"label1": 1, "label2": "text"}, + args=[1, "a"], + kwargs={"p1": "v1"}, + ) + assert fmt.loads(msg) == expected diff --git a/tests/formatters/test_proxy_formatter.py b/tests/formatters/test_proxy_formatter.py new file mode 100644 index 0000000..1c68e39 --- /dev/null +++ b/tests/formatters/test_proxy_formatter.py @@ -0,0 +1,43 @@ +from taskiq.brokers.inmemory_broker import InMemoryBroker +from taskiq.message import BrokerMessage, TaskiqMessage + + +def test_proxy_dumps() -> None: + # uses json serializer by default + broker = InMemoryBroker() + msg = TaskiqMessage( + task_id="task-id", + task_name="task.name", + labels={"label1": 1, "label2": "text"}, + args=[1, "a"], + kwargs={"p1": "v1"}, + ) + expected = BrokerMessage( + task_id="task-id", + task_name="task.name", + message=( + b'{"task_id": "task-id", "task_name": "task.name", ' + b'"labels": {"label1": 1, "label2": "text"}, ' + b'"args": [1, "a"], "kwargs": {"p1": "v1"}}' + ), + labels={"label1": 1, "label2": "text"}, + ) + assert broker.formatter.dumps(msg) == expected + + +def test_proxy_loads() -> None: + # uses json serializer by default + broker = InMemoryBroker() + msg = ( + b'{"task_id":"task-id","task_name":"task.name",' + b'"labels":{"label1":1,"label2":"text"},' + b'"args":[1,"a"],"kwargs":{"p1":"v1"}}' + ) + expected = TaskiqMessage( + task_id="task-id", + task_name="task.name", + labels={"label1": 1, "label2": "text"}, + args=[1, "a"], + kwargs={"p1": "v1"}, + ) + assert broker.formatter.loads(msg) == expected diff --git a/tests/serializers/test_json_serializer.py b/tests/serializers/test_json_serializer.py new file mode 100644 index 0000000..96ec847 --- /dev/null +++ b/tests/serializers/test_json_serializer.py @@ -0,0 +1,19 @@ +from taskiq.serializers.json_serializer import JSONSerializer + + +def test_json_dumpb() -> None: + serizalizer = JSONSerializer() + assert serizalizer.dumpb(None) == b"null" # noqa: PLR2004 + assert serizalizer.dumpb(1) == b"1" # noqa: PLR2004 + assert serizalizer.dumpb("a") == b'"a"' # noqa: PLR2004 + assert serizalizer.dumpb(["a"]) == b'["a"]' # noqa: PLR2004 + assert serizalizer.dumpb({"a": "b"}) == b'{"a": "b"}' # noqa: PLR2004 + + +def test_json_loadb() -> None: + serizalizer = JSONSerializer() + assert serizalizer.loadb(b"null") is None + assert serizalizer.loadb(b"1") == 1 + assert serizalizer.loadb(b'"a"') == "a" + assert serizalizer.loadb(b'["a"]') == ["a"] + assert serizalizer.loadb(b'{"a": "b"}') == {"a": "b"}