From d40ca77e7db5d38ea723770cf9f91d14a48736dd Mon Sep 17 00:00:00 2001 From: Anton Duyun Date: Mon, 7 Aug 2023 15:45:48 +0300 Subject: [PATCH] Json kafka serializer & deserializer resolves #241 --- .../tinkoff/kora/json/module/JsonModule.java | 12 ++++++++++ .../module/kafka/JsonKafkaDeserializer.java | 24 +++++++++++++++++++ .../module/kafka/JsonKafkaSerializer.java | 24 +++++++++++++++++++ 3 files changed, 60 insertions(+) create mode 100644 json/json-module/src/main/java/ru/tinkoff/kora/json/module/kafka/JsonKafkaDeserializer.java create mode 100644 json/json-module/src/main/java/ru/tinkoff/kora/json/module/kafka/JsonKafkaSerializer.java diff --git a/json/json-module/src/main/java/ru/tinkoff/kora/json/module/JsonModule.java b/json/json-module/src/main/java/ru/tinkoff/kora/json/module/JsonModule.java index c9c433e56..009350fb8 100644 --- a/json/json-module/src/main/java/ru/tinkoff/kora/json/module/JsonModule.java +++ b/json/json-module/src/main/java/ru/tinkoff/kora/json/module/JsonModule.java @@ -10,6 +10,8 @@ import ru.tinkoff.kora.json.module.http.server.JsonReaderHttpServerRequestMapper; import ru.tinkoff.kora.json.module.http.server.JsonStringParameterReader; import ru.tinkoff.kora.json.module.http.server.JsonWriterHttpServerResponseMapper; +import ru.tinkoff.kora.json.module.kafka.JsonKafkaDeserializer; +import ru.tinkoff.kora.json.module.kafka.JsonKafkaSerializer; public interface JsonModule extends JsonCommonModule { @Json @@ -41,4 +43,14 @@ default JsonStringParameterConverter jsonStringParameterConverter(JsonWri default JsonStringParameterReader jsonStringParameterReader(JsonReader reader) { return new JsonStringParameterReader<>(reader); } + + @Json + default JsonKafkaDeserializer jsonKafkaDeserializer(JsonReader reader) { + return new JsonKafkaDeserializer<>(reader); + } + + @Json + default JsonKafkaSerializer jsonKafkaSerializer(JsonWriter writer) { + return new JsonKafkaSerializer<>(writer); + } } diff --git a/json/json-module/src/main/java/ru/tinkoff/kora/json/module/kafka/JsonKafkaDeserializer.java b/json/json-module/src/main/java/ru/tinkoff/kora/json/module/kafka/JsonKafkaDeserializer.java new file mode 100644 index 000000000..048f391f7 --- /dev/null +++ b/json/json-module/src/main/java/ru/tinkoff/kora/json/module/kafka/JsonKafkaDeserializer.java @@ -0,0 +1,24 @@ +package ru.tinkoff.kora.json.module.kafka; + +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.serialization.Deserializer; +import ru.tinkoff.kora.json.common.JsonReader; + +import java.io.IOException; + +public final class JsonKafkaDeserializer implements Deserializer { + private final JsonReader reader; + + public JsonKafkaDeserializer(JsonReader reader) { + this.reader = reader; + } + + @Override + public T deserialize(String topic, byte[] data) { + try { + return this.reader.read(data); + } catch (IOException e) { + throw new SerializationException("Unable to deserialize from json", e); + } + } +} diff --git a/json/json-module/src/main/java/ru/tinkoff/kora/json/module/kafka/JsonKafkaSerializer.java b/json/json-module/src/main/java/ru/tinkoff/kora/json/module/kafka/JsonKafkaSerializer.java new file mode 100644 index 000000000..00e82908d --- /dev/null +++ b/json/json-module/src/main/java/ru/tinkoff/kora/json/module/kafka/JsonKafkaSerializer.java @@ -0,0 +1,24 @@ +package ru.tinkoff.kora.json.module.kafka; + +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.serialization.Serializer; +import ru.tinkoff.kora.json.common.JsonWriter; + +import java.io.IOException; + +public final class JsonKafkaSerializer implements Serializer { + private final JsonWriter writer; + + public JsonKafkaSerializer(JsonWriter writer) { + this.writer = writer; + } + + @Override + public byte[] serialize(String topic, T data) { + try { + return this.writer.toByteArray(data); + } catch (IOException e) { + throw new SerializationException("Unable to serialize into json", e); + } + } +}