From 94c442848946bc594e6e2e6b3aa240c0437b6549 Mon Sep 17 00:00:00 2001 From: Marcos Schroh <2828842+marcosschroh@users.noreply.github.com> Date: Tue, 6 Dec 2022 14:12:28 +0100 Subject: [PATCH] fix: call deserializer regardless consumer_record value (#83) Co-authored-by: Marcos Schroh --- docs/serialization.md | 2 -- .../serializers.py | 5 ++-- examples/json_serialization.py | 5 ++-- kstreams/streams.py | 5 ++-- tests/test_serialization.py | 26 ++++++++++++------- 5 files changed, 25 insertions(+), 18 deletions(-) diff --git a/docs/serialization.md b/docs/serialization.md index 09aba0cf..fbcd6772 100644 --- a/docs/serialization.md +++ b/docs/serialization.md @@ -24,14 +24,12 @@ you may want to receive in your function the `dict` ready to be used. For those docstring_section_style: table show_bases: false - ::: kstreams.serializers.Deserializer options: show_root_heading: true docstring_section_style: table show_bases: false - ## Usage Once you have written your serializer or deserializer, there are 2 ways of using them, in a diff --git a/examples/dataclasses-avroschema-example/dataclasses_avroschema_example/serializers.py b/examples/dataclasses-avroschema-example/dataclasses_avroschema_example/serializers.py index 843f33c5..644eb83f 100644 --- a/examples/dataclasses-avroschema-example/dataclasses_avroschema_example/serializers.py +++ b/examples/dataclasses-avroschema-example/dataclasses_avroschema_example/serializers.py @@ -21,6 +21,7 @@ async def deserialize( """ Deserialize a payload to an AvroModel """ - data = self.model.deserialize(consumer_record.value) - consumer_record.value = data + if consumer_record.value is not None: + data = self.model.deserialize(consumer_record.value) + consumer_record.value = data return consumer_record diff --git a/examples/json_serialization.py b/examples/json_serialization.py index 018ba0ad..3130ab7b 100644 --- a/examples/json_serialization.py +++ b/examples/json_serialization.py @@ -24,8 +24,9 @@ class JsonDeserializer: async def 
deserialize( self, consumer_record: ConsumerRecord, **kwargs ) -> ConsumerRecord: - data = json.loads(consumer_record.value.decode()) - consumer_record.value = data + if consumer_record.value is not None: + data = json.loads(consumer_record.value.decode()) + consumer_record.value = data return consumer_record diff --git a/kstreams/streams.py b/kstreams/streams.py index 4b53c609..0aa0f45f 100644 --- a/kstreams/streams.py +++ b/kstreams/streams.py @@ -166,8 +166,9 @@ async def __anext__(self) -> ConsumerRecord: await self.consumer.getone() # type: ignore ) - # deserialize only when value and deserializer are present - if consumer_record.value is not None and self.deserializer is not None: + # call deserializer if there is one regardless of consumer_record.value + # as the end user might want to do something extra with headers or metadata + if self.deserializer is not None: return await self.deserializer.deserialize(consumer_record) return consumer_record diff --git a/tests/test_serialization.py b/tests/test_serialization.py index 63c8adfc..b85718ba 100644 --- a/tests/test_serialization.py +++ b/tests/test_serialization.py @@ -28,8 +28,9 @@ async def serialize( class MyDeserializer: async def deserialize(self, consumer_record: ConsumerRecord, **kwargs) -> Any: - data = consumer_record.value.decode() - return json.loads(data) + if consumer_record.value is not None: + data = consumer_record.value.decode() + return json.loads(data) @pytest.mark.asyncio @@ -44,7 +45,6 @@ async def async_func(): "content-type": consts.APPLICATION_JSON, } - # with mock.patch.multiple(Consumer, start=mock.DEFAULT, stop=mock.DEFAULT): with mock.patch.multiple(Producer, start=mock.DEFAULT, send=send): await stream_engine.start() @@ -70,14 +70,20 @@ async def async_func(): @pytest.mark.asyncio +@pytest.mark.parametrize( + "value, headers", + ( + ( + {"message": "test"}, + {"content-type": consts.APPLICATION_JSON, "event-type": "hello-world"}, + ), + (None, {"event-type": "delete-hello-world"}), + 
), +) async def test_custom_deserialization( - stream_engine: StreamEngine, consumer_record_factory + stream_engine: StreamEngine, value: Optional[Dict], headers: Dict ): topic = "local--hello-kpn" - payload = {"message": "test"} - headers = { - "content-type": consts.APPLICATION_JSON, - } client = TestStreamClient(stream_engine) save_to_db = mock.Mock() @@ -91,7 +97,7 @@ async def hello_stream(stream: Stream): # encode payload with serializer await client.send( topic, - value=payload, + value=value, headers=headers, key="1", serializer=MySerializer(), @@ -99,4 +105,4 @@ async def hello_stream(stream: Stream): # The payload as been encoded with json, # we expect that the mock has been called with the original value (decoded) - save_to_db.assert_called_once_with(payload) + save_to_db.assert_called_once_with(value)