Skip to content

Commit

Permalink
Json kafka serializer & deserializer
Browse files Browse the repository at this point in the history
resolves #241
  • Loading branch information
Squiry committed Aug 7, 2023
1 parent 5ed088b commit d40ca77
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import ru.tinkoff.kora.json.module.http.server.JsonReaderHttpServerRequestMapper;
import ru.tinkoff.kora.json.module.http.server.JsonStringParameterReader;
import ru.tinkoff.kora.json.module.http.server.JsonWriterHttpServerResponseMapper;
import ru.tinkoff.kora.json.module.kafka.JsonKafkaDeserializer;
import ru.tinkoff.kora.json.module.kafka.JsonKafkaSerializer;

public interface JsonModule extends JsonCommonModule {
@Json
Expand Down Expand Up @@ -41,4 +43,14 @@ default <T> JsonStringParameterConverter<T> jsonStringParameterConverter(JsonWri
default <T> JsonStringParameterReader<T> jsonStringParameterReader(JsonReader<T> reader) {
return new JsonStringParameterReader<>(reader);
}

@Json
default <T> JsonKafkaDeserializer<T> jsonKafkaDeserializer(JsonReader<T> reader) {
return new JsonKafkaDeserializer<>(reader);
}

@Json
default <T> JsonKafkaSerializer<T> jsonKafkaSerializer(JsonWriter<T> writer) {
return new JsonKafkaSerializer<>(writer);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package ru.tinkoff.kora.json.module.kafka;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import ru.tinkoff.kora.json.common.JsonReader;

import java.io.IOException;

public final class JsonKafkaDeserializer<T> implements Deserializer<T> {
private final JsonReader<T> reader;

public JsonKafkaDeserializer(JsonReader<T> reader) {
this.reader = reader;
}

@Override
public T deserialize(String topic, byte[] data) {
try {
return this.reader.read(data);
} catch (IOException e) {
throw new SerializationException("Unable to deserialize from json", e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package ru.tinkoff.kora.json.module.kafka;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;
import ru.tinkoff.kora.json.common.JsonWriter;

import java.io.IOException;

public final class JsonKafkaSerializer<T> implements Serializer<T> {
private final JsonWriter<T> writer;

public JsonKafkaSerializer(JsonWriter<T> writer) {
this.writer = writer;
}

@Override
public byte[] serialize(String topic, T data) {
try {
return this.writer.toByteArray(data);
} catch (IOException e) {
throw new SerializationException("Unable to serialize into json", e);
}
}
}

0 comments on commit d40ca77

Please sign in to comment.