Skip to content

Commit

Permalink
fix: call deserializer regardless consumer_record value (#83)
Browse files Browse the repository at this point in the history
Co-authored-by: Marcos Schroh <[email protected]>
  • Loading branch information
marcosschroh and marcosschroh authored Dec 6, 2022
1 parent 1bac05e commit 94c4428
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 18 deletions.
2 changes: 0 additions & 2 deletions docs/serialization.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,12 @@ you may want to receive in your function the `dict` ready to be used. For those
docstring_section_style: table
show_bases: false


::: kstreams.serializers.Deserializer
options:
show_root_heading: true
docstring_section_style: table
show_bases: false


## Usage

Once you have written your serializer or deserializer, there are 2 ways of using them, in a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ async def deserialize(
"""
Deserialize a payload to an AvroModel
"""
data = self.model.deserialize(consumer_record.value)
consumer_record.value = data
if consumer_record.value is not None:
data = self.model.deserialize(consumer_record.value)
consumer_record.value = data
return consumer_record
5 changes: 3 additions & 2 deletions examples/json_serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ class JsonDeserializer:
async def deserialize(
self, consumer_record: ConsumerRecord, **kwargs
) -> ConsumerRecord:
data = json.loads(consumer_record.value.decode())
consumer_record.value = data
if consumer_record.value is not None:
data = json.loads(consumer_record.value.decode())
consumer_record.value = data
return consumer_record


Expand Down
5 changes: 3 additions & 2 deletions kstreams/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,9 @@ async def __anext__(self) -> ConsumerRecord:
await self.consumer.getone() # type: ignore
)

# deserialize only when value and deserializer are present
if consumer_record.value is not None and self.deserializer is not None:
# call deserializer if there is one regarless consumer_record.value
# as the end user might want to do something extra with headers or metadata
if self.deserializer is not None:
return await self.deserializer.deserialize(consumer_record)

return consumer_record
Expand Down
26 changes: 16 additions & 10 deletions tests/test_serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ async def serialize(

class MyDeserializer:
async def deserialize(self, consumer_record: ConsumerRecord, **kwargs) -> Any:
data = consumer_record.value.decode()
return json.loads(data)
if consumer_record.value is not None:
data = consumer_record.value.decode()
return json.loads(data)


@pytest.mark.asyncio
Expand All @@ -44,7 +45,6 @@ async def async_func():
"content-type": consts.APPLICATION_JSON,
}

# with mock.patch.multiple(Consumer, start=mock.DEFAULT, stop=mock.DEFAULT):
with mock.patch.multiple(Producer, start=mock.DEFAULT, send=send):
await stream_engine.start()

Expand All @@ -70,14 +70,20 @@ async def async_func():


@pytest.mark.asyncio
@pytest.mark.parametrize(
"value, headers",
(
(
{"message": "test"},
{"content-type": consts.APPLICATION_JSON, "event-type": "hello-world"},
),
(None, {"event-type": "delete-hello-world"}),
),
)
async def test_custom_deserialization(
stream_engine: StreamEngine, consumer_record_factory
stream_engine: StreamEngine, value: Optional[Dict], headers: Dict
):
topic = "local--hello-kpn"
payload = {"message": "test"}
headers = {
"content-type": consts.APPLICATION_JSON,
}
client = TestStreamClient(stream_engine)

save_to_db = mock.Mock()
Expand All @@ -91,12 +97,12 @@ async def hello_stream(stream: Stream):
# encode payload with serializer
await client.send(
topic,
value=payload,
value=value,
headers=headers,
key="1",
serializer=MySerializer(),
)

# The payload as been encoded with json,
# we expect that the mock has been called with the original value (decoded)
save_to_db.assert_called_once_with(payload)
save_to_db.assert_called_once_with(value)

0 comments on commit 94c4428

Please sign in to comment.