Skip to content

Commit 9aa33e8

Browse files
authoredMar 13, 2023
Add zio-json (#464)
1 parent 8fafc5a commit 9aa33e8

File tree

30 files changed

+199
-37
lines changed

30 files changed

+199
-37
lines changed
 

‎README.md

+13-4
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ Following target libraries are supported:
1414
- [play-json](https://github.com/playframework/play-json)
1515
- [upickle](https://github.com/lihaoyi/upickle)
1616
- [scalapb](https://github.com/scalapb/ScalaPB)
17+
- [zio-json](https://github.com/zio/zio-json)
1718

1819
Inspired by [https://github.com/hseeberger/akka-http-json](https://github.com/hseeberger/akka-http-json).
1920

@@ -26,28 +27,28 @@ Add dependencies for the selected integration:
2627
- for avro4s:
2728
``` scala
2829
libraryDependencies ++= List(
29-
"io.github.azhur" %% "kafka-serde-avro4s" % version,
30+
"io.github.azhur" %% "kafka-serde-avro4s" % version
3031
)
3132
```
3233

3334
- for circe:
3435
``` scala
3536
libraryDependencies ++= List(
36-
"io.github.azhur" %% "kafka-serde-circe" % version,
37+
"io.github.azhur" %% "kafka-serde-circe" % version
3738
)
3839
```
3940

4041
- for jackson:
4142
``` scala
4243
libraryDependencies ++= List(
43-
"io.github.azhur" %% "kafka-serde-jackson" % version,
44+
"io.github.azhur" %% "kafka-serde-jackson" % version
4445
)
4546
```
4647

4748
- for json4s:
4849
``` scala
4950
libraryDependencies ++= List(
50-
"io.github.azhur" %% "kafka-serde-json4s" % version,
51+
"io.github.azhur" %% "kafka-serde-json4s" % version
5152
)
5253
```
5354

@@ -80,6 +81,13 @@ libraryDependencies ++= List(
8081
)
8182
```
8283

84+
- for zio-json:
85+
``` scala
86+
libraryDependencies ++= List(
87+
"io.github.azhur" %% "kafka-serde-zio-json" % version
88+
)
89+
```
90+
8391
## Usage
8492

8593
Mix `xxxSupport` into your code which requires implicit Kafka
@@ -96,6 +104,7 @@ Provide your implicit type class instances and the magic will convert them to Ka
96104
- for play-json: `play.api.libs.json.Reads`, `play.api.libs.json.Writes`
97105
- for upickle: `upickle.default.Reader`, `upickle.default.Writer`
98106
- for scalapb: `scalapb.GeneratedMessageCompanion`
107+
- for zio-json: `zio.json.JsonEncoder`, `zio.json.JsonDecoder`
99108

100109
For more info, please, take a look at unit tests and at `kafka-serde-scala-example` which is a kafka-streams (2.x) application with kafka-serde-scala usage.
101110

‎build.sbt

+20-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ lazy val latest212 = "2.12.17"
2626

2727
lazy val latest213 = "2.13.10"
2828

29-
lazy val latest3 = "3.2.1"
29+
lazy val latest3 = "3.2.2"
3030

3131
lazy val `kafka-serde-scala` =
3232
project
@@ -41,6 +41,7 @@ lazy val `kafka-serde-scala` =
4141
`kafka-serde-play-json`,
4242
`kafka-serde-upickle`,
4343
`kafka-serde-scalapb`,
44+
`kafka-serde-zio-json`,
4445
`kafka-serde-scala-example`
4546
)
4647
.settings(commonSettings)
@@ -167,6 +168,22 @@ lazy val `kafka-serde-scalapb` = project
167168
.settings(
168169
Compile / PB.targets := Seq(
169170
scalapb.gen() -> (Compile / sourceManaged).value / "scalapb"
171+
),
172+
Test / PB.targets := Seq(
173+
scalapb.gen() -> (Test / sourceManaged).value / "scalapb"
174+
)
175+
)
176+
177+
lazy val `kafka-serde-zio-json` = project
178+
.enablePlugins(AutomateHeaderPlugin)
179+
.settings(commonSettings)
180+
.settings(mimaSettings)
181+
.settings(
182+
crossScalaVersions := Seq(latest212, latest213, latest3),
183+
libraryDependencies ++= Seq(
184+
dependency.kafkaClients,
185+
dependency.zioJson,
186+
dependency.scalaTest % Test
170187
)
171188
)
172189

@@ -196,6 +213,7 @@ lazy val dependency =
196213
val play = "2.9.4"
197214
val upickle = "3.0.0"
198215
val jackson = "2.14.2"
216+
val zioJson = "0.4.2"
199217
}
200218
val kafkaClients = "org.apache.kafka" % "kafka-clients" % Version.kafka
201219
val kafkaStreamsScala = "org.apache.kafka" %% "kafka-streams-scala" % Version.kafka
@@ -214,6 +232,7 @@ lazy val dependency =
214232
val upickle = "com.lihaoyi" %% "upickle" % Version.upickle
215233
val jacksonScala = "com.fasterxml.jackson.module" %% "jackson-module-scala" % Version.jackson
216234
val jacksonCore = "com.fasterxml.jackson.core" % "jackson-core" % Version.jackson
235+
val zioJson = "dev.zio" %% "zio-json" % Version.zioJson
217236
val jacksonProtobuf = "com.fasterxml.jackson.dataformat" % "jackson-dataformat-protobuf" % Version.jackson
218237
val jacksonAvro = "com.fasterxml.jackson.dataformat" % "jackson-dataformat-avro" % Version.jackson
219238
val scalaTest = "org.scalatest" %% "scalatest" % Version.scalaTest

‎kafka-serde-avro4s/src/main/scala/io/github/azhur/kafkaserdeavro4s/Avro4sBinarySupport.scala ‎kafka-serde-avro4s/src/main/scala/io/github/azhur/kafka/serde/Avro4sBinarySupport.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
package io.github.azhur.kafkaserdeavro4s
17+
package io.github.azhur.kafka.serde
1818

1919
import java.io.ByteArrayOutputStream
2020
import java.util

‎kafka-serde-avro4s/src/main/scala/io/github/azhur/kafkaserdeavro4s/Avro4sDataSupport.scala ‎kafka-serde-avro4s/src/main/scala/io/github/azhur/kafka/serde/Avro4sDataSupport.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
package io.github.azhur.kafkaserdeavro4s
17+
package io.github.azhur.kafka.serde
1818

1919
import java.io.ByteArrayOutputStream
2020
import java.util

‎kafka-serde-avro4s/src/main/scala/io/github/azhur/kafkaserdeavro4s/Avro4sJsonSupport.scala ‎kafka-serde-avro4s/src/main/scala/io/github/azhur/kafka/serde/Avro4sJsonSupport.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
package io.github.azhur.kafkaserdeavro4s
17+
package io.github.azhur.kafka.serde
1818

1919
import java.io.ByteArrayOutputStream
2020
import java.util

‎kafka-serde-avro4s/src/test/scala/io/github/azhur/kafkaserdeavro4s/Avro4sBinarySupportSpec.scala ‎kafka-serde-avro4s/src/test/scala/io/github/azhur/kafka/serde/Avro4sBinarySupportSpec.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
package io.github.azhur.kafkaserdeavro4s
17+
package io.github.azhur.kafka.serde
1818

1919
import com.sksamuel.avro4s.{ Decoder, Encoder, SchemaFor }
2020
import org.apache.kafka.common.serialization.{ Deserializer, Serde, Serializer }

‎kafka-serde-avro4s/src/test/scala/io/github/azhur/kafkaserdeavro4s/Avro4sDataSupportSpec.scala ‎kafka-serde-avro4s/src/test/scala/io/github/azhur/kafka/serde/Avro4sDataSupportSpec.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
package io.github.azhur.kafkaserdeavro4s
17+
package io.github.azhur.kafka.serde
1818

1919
import com.sksamuel.avro4s.{ Decoder, Encoder, SchemaFor }
2020
import org.apache.kafka.common.serialization.{ Deserializer, Serde, Serializer }

‎kafka-serde-avro4s/src/test/scala/io/github/azhur/kafkaserdeavro4s/Avro4sJsonSupportSpec.scala ‎kafka-serde-avro4s/src/test/scala/io/github/azhur/kafka/serde/Avro4sJsonSupportSpec.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
package io.github.azhur.kafkaserdeavro4s
17+
package io.github.azhur.kafka.serde
1818

1919
import java.nio.charset.StandardCharsets.UTF_8
2020

‎kafka-serde-circe/src/main/scala/io/github/azhur/kafkaserdecirce/CirceSupport.scala ‎kafka-serde-circe/src/main/scala/io/github/azhur/kafka/serde/CirceSupport.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
package io.github.azhur.kafkaserdecirce
17+
package io.github.azhur.kafka.serde
1818

1919
import java.nio.charset.StandardCharsets.UTF_8
2020
import java.util

‎kafka-serde-circe/src/test/scala/io/github/azhur/kafkaserdecirce/CirceSupportSpec.scala ‎kafka-serde-circe/src/test/scala/io/github/azhur/kafka/serde/CirceSupportSpec.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
package io.github.azhur.kafkaserdecirce
17+
package io.github.azhur.kafka.serde
1818

1919
import java.nio.charset.StandardCharsets.UTF_8
2020

‎kafka-serde-jackson/src/main/scala/io/github/azhur/kafkaserdejackson/Jackson.scala ‎kafka-serde-jackson/src/main/scala/io/github/azhur/kafka/serde/Jackson.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
package io.github.azhur.kafkaserdejackson
17+
package io.github.azhur.kafka.serde
1818

1919
import com.fasterxml.jackson.core.`type`.TypeReference
2020
import scala.reflect.runtime.universe._

‎kafka-serde-jackson/src/main/scala/io/github/azhur/kafkaserdejackson/JacksonFormatSchemaSupport.scala ‎kafka-serde-jackson/src/main/scala/io/github/azhur/kafka/serde/JacksonFormatSchemaSupport.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
package io.github.azhur.kafkaserdejackson
17+
package io.github.azhur.kafka.serde
1818

1919
import java.util
2020

‎kafka-serde-jackson/src/main/scala/io/github/azhur/kafkaserdejackson/JacksonJsonSupport.scala ‎kafka-serde-jackson/src/main/scala/io/github/azhur/kafka/serde/JacksonJsonSupport.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
package io.github.azhur.kafkaserdejackson
17+
package io.github.azhur.kafka.serde
1818

1919
import java.util
2020

‎kafka-serde-jackson/src/test/scala/io/github/azhur/kafkaserdejackson/ApiSpec.scala ‎kafka-serde-jackson/src/test/scala/io/github/azhur/kafka/serde/ApiSpec.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
package io.github.azhur.kafkaserdejackson
17+
package io.github.azhur.kafka.serde
1818

1919
import org.apache.kafka.common.serialization.{ Deserializer, Serde, Serializer }
2020

‎kafka-serde-jackson/src/test/scala/io/github/azhur/kafkaserdejackson/JacksonFormatSchemaSupportSpec.scala ‎kafka-serde-jackson/src/test/scala/io/github/azhur/kafka/serde/JacksonFormatSchemaSupportSpec.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
package io.github.azhur.kafkaserdejackson
17+
package io.github.azhur.kafka.serde
1818

1919
import com.fasterxml.jackson.dataformat.avro.{ AvroMapper, AvroSchema }
2020
import com.fasterxml.jackson.dataformat.protobuf.ProtobufMapper

‎kafka-serde-jackson/src/test/scala/io/github/azhur/kafkaserdejackson/JacksonJsonSupportSpec.scala ‎kafka-serde-jackson/src/test/scala/io/github/azhur/kafka/serde/JacksonJsonSupportSpec.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
package io.github.azhur.kafkaserdejackson
17+
package io.github.azhur.kafka.serde
1818

1919
import com.fasterxml.jackson.databind.ObjectMapper
2020
import com.fasterxml.jackson.module.scala.DefaultScalaModule

‎kafka-serde-json4s/src/main/scala/io/github/azhur/kafkaserdejson4s/Json4sSupport.scala ‎kafka-serde-json4s/src/main/scala/io/github/azhur/kafka/serde/Json4sSupport.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
package io.github.azhur.kafkaserdejson4s
17+
package io.github.azhur.kafka.serde
1818

1919
import java.nio.charset.StandardCharsets.UTF_8
2020
import java.util

‎kafka-serde-json4s/src/test/scala/io/github/azhur/kafkaserdejson4s/Json4sSupportSpec.scala ‎kafka-serde-json4s/src/test/scala/io/github/azhur/kafka/serde/Json4sSupportSpec.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
package io.github.azhur.kafkaserdejson4s
17+
package io.github.azhur.kafka.serde
1818

1919
import java.nio.charset.StandardCharsets.UTF_8
2020

‎kafka-serde-jsoniter-scala/src/main/scala/io/github/azhur/kafkaserdejsoniterscala/JsoniterScalaSupport.scala ‎kafka-serde-jsoniter-scala/src/main/scala/io/github/azhur/kafka/serde/JsoniterScalaSupport.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
package io.github.azhur.kafkaserdejsoniterscala
17+
package io.github.azhur.kafka.serde
1818

1919
import java.util
2020

+2-2
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
package io.github.azhur.kafkaserdejsoniterscala
17+
package io.github.azhur.kafka.serde
1818

1919
import java.nio.charset.StandardCharsets.UTF_8
2020

@@ -42,7 +42,7 @@ object JsoniterScalaSupportSpec {
4242

4343
class JsoniterScalaSupportSpec extends AnyFreeSpec with Matchers {
4444
import JsoniterScalaSupportSpec._
45-
import io.github.azhur.kafkaserdejsoniterscala.JsoniterScalaSupport._
45+
import JsoniterScalaSupport._
4646

4747
implicit private val fooCodec: JsonValueCodec[Foo] = JsonCodecMaker.make[Foo]
4848

‎kafka-serde-play-json/src/main/scala/io/github/azhur/kafkaserdeplayjson/PlayJsonSupport.scala ‎kafka-serde-play-json/src/main/scala/io/github/azhur/kafka/serde/PlayJsonSupport.scala

+2-2
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,12 @@
1414
* limitations under the License.
1515
*/
1616

17-
package io.github.azhur.kafkaserdeplayjson
17+
package io.github.azhur.kafka.serde
1818

1919
import java.nio.charset.StandardCharsets.UTF_8
2020
import java.util
2121

22-
import io.github.azhur.kafkaserdeplayjson.PlayJsonSupport.PlayJsonError
22+
import PlayJsonSupport.PlayJsonError
2323
import org.apache.kafka.common.errors.SerializationException
2424
import org.apache.kafka.common.serialization.{ Deserializer, Serde, Serializer }
2525
import play.api.libs.json.{ JsError, JsValue, Json, Reads, Writes }

‎kafka-serde-play-json/src/test/scala/io/github/azhur/kafkaserdeplayjson/PlayJsonSupportSpec.scala ‎kafka-serde-play-json/src/test/scala/io/github/azhur/kafka/serde/PlayJsonSupportSpec.scala

+2-2
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
package io.github.azhur.kafkaserdeplayjson
17+
package io.github.azhur.kafka.serde
1818

1919
import java.nio.charset.StandardCharsets.UTF_8
2020

@@ -43,7 +43,7 @@ object PlayJsonSupportSpec {
4343

4444
class PlayJsonSupportSpec extends AnyFreeSpec with Matchers {
4545
import PlayJsonSupportSpec._
46-
import io.github.azhur.kafkaserdeplayjson.PlayJsonSupport._
46+
import PlayJsonSupport._
4747

4848
"PlayJsonSupport" - {
4949
"should implicitly convert to kafka Serializer" in {

‎kafka-serde-scala-example/src/main/scala/io/github/azhur/kafkaserdescala/example/Application.scala ‎kafka-serde-scala-example/src/main/scala/io/github/azhur/kafka/serde/Application.scala

+3-3
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,10 @@
1414
* limitations under the License.
1515
*/
1616

17-
package io.github.azhur.kafkaserdescala.example
18-
import java.util.Properties
17+
package io.github.azhur.kafka.serde
1918

20-
import io.github.azhur.kafkaserdecirce.CirceSupport
19+
import io.github.azhur.kafka.serde.CirceSupport
20+
import java.util.Properties
2121
import org.apache.kafka.clients.consumer.ConsumerConfig
2222
import org.apache.kafka.streams.{ KafkaStreams, StreamsConfig, Topology }
2323
import org.apache.kafka.streams.scala.StreamsBuilder

‎kafka-serde-scalapb/src/main/scala/io/github/azhur/kafkaserdescalapb/ScalaPbSupport.scala ‎kafka-serde-scalapb/src/main/scala/io/github/azhur/kafka/serde/ScalaPbSupport.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
package io.github.azhur.kafkaserdescalapb
17+
package io.github.azhur.kafka.serde
1818

1919
import org.apache.kafka.common.serialization.Serializer
2020
import scalapb.GeneratedMessageCompanion

‎kafka-serde-scalapb/src/main/protobuf/SearchRequest.proto ‎kafka-serde-scalapb/src/test/protobuf/SearchRequest.proto

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
syntax = "proto2";
22

3-
package io.github.azhur.scalaserdescalapb.proto;
3+
package io.github.azhur.serde.proto;
44

55
message SearchRequest {
66
required string query = 1;

‎kafka-serde-scalapb/src/test/scala/io/github/azhur/kafkaserdescalapb/ScalaPbSupportSpec.scala ‎kafka-serde-scalapb/src/test/scala/io/github/azhur/kafka/serde/ScalaPbSupportSpec.scala

+2-2
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,12 @@
1414
* limitations under the License.
1515
*/
1616

17-
package io.github.azhur.kafkaserdescalapb
17+
package io.github.azhur.kafka.serde
1818

1919
import org.apache.kafka.common.serialization.{ Deserializer, Serde, Serializer }
2020
import org.scalatest.freespec.AnyFreeSpec
2121
import org.scalatest.matchers.should.Matchers
22-
import io.github.azhur.scalaserdescalapb.proto.SearchRequest.SearchRequest
22+
import io.github.azhur.serde.proto.SearchRequest.SearchRequest
2323
import scalapb.GeneratedMessageCompanion
2424

2525
object ScalaPbSupportSpec {

‎kafka-serde-upickle/src/main/scala/io/github/azhur/kafkaserdeupickle/UpickleSupport.scala ‎kafka-serde-upickle/src/main/scala/io/github/azhur/kafka/serde/UpickleSupport.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
package io.github.azhur.kafkaserdeupickle
17+
package io.github.azhur.kafka.serde
1818

1919
import java.nio.charset.StandardCharsets.UTF_8
2020
import java.util

‎kafka-serde-upickle/src/test/scala/io/github/azhur/kafkaserdeupickle/UpickleSupportSpec.scala ‎kafka-serde-upickle/src/test/scala/io/github/azhur/kafka/serde/UpickleSupportSpec.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
package io.github.azhur.kafkaserdeupickle
17+
package io.github.azhur.kafka.serde
1818

1919
import java.nio.charset.StandardCharsets.UTF_8
2020

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright 2023 Artur Zhurat
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.github.azhur.kafka.serde
18+
19+
import java.nio.charset.StandardCharsets.UTF_8
20+
import java.util
21+
22+
import org.apache.kafka.common.errors.SerializationException
23+
import org.apache.kafka.common.serialization.{ Deserializer, Serde, Serializer }
24+
import zio.json._
25+
26+
import scala.util.control.NonFatal
27+
28+
trait ZioJsonSupport {
29+
implicit def toSerializer[T >: Null](implicit encoder: JsonEncoder[T]): Serializer[T] =
30+
new Serializer[T] {
31+
override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = {}
32+
override def close(): Unit = {}
33+
override def serialize(topic: String, data: T): Array[Byte] =
34+
if (data == null) null
35+
else
36+
try data.toJson.getBytes(UTF_8)
37+
catch {
38+
case NonFatal(e) => throw new SerializationException(e)
39+
}
40+
}
41+
42+
implicit def toDeserializer[T >: Null](implicit decoder: JsonDecoder[T]): Deserializer[T] =
43+
new Deserializer[T] {
44+
override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = {}
45+
override def close(): Unit = {}
46+
override def deserialize(topic: String, data: Array[Byte]): T =
47+
if (data == null) null
48+
else
49+
decoder.decodeJson(new String(data, UTF_8)) match {
50+
case Left(value) => throw new SerializationException(value)
51+
case Right(value) => value
52+
}
53+
}
54+
55+
implicit def toSerde[T >: Null](implicit encoder: JsonEncoder[T], decoder: JsonDecoder[T]): Serde[T] =
56+
new Serde[T] {
57+
override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = {}
58+
override def close(): Unit = {}
59+
override def serializer(): Serializer[T] = toSerializer[T]
60+
override def deserializer(): Deserializer[T] = toDeserializer[T]
61+
}
62+
}
63+
64+
object ZioJsonSupport extends ZioJsonSupport
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Copyright 2023 Artur Zhurat
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.github.azhur.kafka.serde
18+
19+
import java.nio.charset.StandardCharsets.UTF_8
20+
import org.apache.kafka.common.serialization.{ Deserializer, Serde, Serializer }
21+
import org.scalatest.freespec.AnyFreeSpec
22+
import org.scalatest.matchers.should.Matchers
23+
import zio.json._
24+
25+
object ZioJsonSupportSpec {
26+
case class Foo(a: Int, b: String)
27+
28+
def serializeFoo(foo: Foo)(implicit serializer: Serializer[Foo]): Array[Byte] =
29+
serializer.serialize("unused_topic", foo)
30+
31+
def deserializeFoo(bytes: Array[Byte])(implicit deserializer: Deserializer[Foo]): Foo =
32+
deserializer.deserialize("unused_topic", bytes)
33+
34+
def serdeFooDes(bytes: Array[Byte])(implicit serde: Serde[Foo]): Foo =
35+
serde.deserializer().deserialize("unused_topic", bytes)
36+
37+
def serdeFooSer(foo: Foo)(implicit serde: Serde[Foo]): Array[Byte] =
38+
serde.serializer().serialize("unused_topic", foo)
39+
40+
implicit val decoder: JsonDecoder[Foo] = DeriveJsonDecoder.gen[Foo]
41+
implicit val encoder: JsonEncoder[Foo] = DeriveJsonEncoder.gen[Foo]
42+
}
43+
44+
class ZioJsonSupportSpec extends AnyFreeSpec with Matchers {
45+
import ZioJsonSupport._
46+
import ZioJsonSupportSpec._
47+
48+
"ZioJsonSupport" - {
49+
"should implicitly convert to kafka Serializer" in {
50+
serializeFoo(Foo(1, "𝄞")) shouldBe """{"a":1,"b":"𝄞"}""".getBytes(UTF_8)
51+
serializeFoo(null) shouldBe null
52+
}
53+
54+
"should implicitly convert to kafka Deserializer" in {
55+
deserializeFoo("""{"a":1,"b":"𝄞"}""".getBytes(UTF_8)) shouldBe Foo(1, "𝄞")
56+
deserializeFoo(null) shouldBe null
57+
}
58+
59+
"should implicitly convert to kafka Serde" in {
60+
val foo = Foo(1, "𝄞")
61+
val serializedFoo = """{"a":1,"b":"𝄞"}""".getBytes(UTF_8)
62+
63+
serdeFooDes(serializedFoo) shouldBe foo
64+
serdeFooDes(null) shouldBe null
65+
66+
serdeFooSer(foo) shouldBe serializedFoo
67+
serdeFooSer(null) shouldBe null
68+
}
69+
}
70+
}

0 commit comments

Comments
 (0)
Please sign in to comment.