
Commit 8c218d9

feat: Implement Fs2KafkaModule (#246)
* feat: Implement Fs2KafkaModule
* fix: Scala 2.12 compatibility
* fix: Warnings
* fix: Make custom ChainingOps private
* docs: Add fs2-kafka documentation
* refactor: "Escape hatch" from the typesafe configuration
* docs: Add info about untyped properties
1 parent fc2b77e commit 8c218d9

12 files changed: +370 −0 lines changed

build.sbt

Lines changed: 24 additions & 0 deletions
@@ -11,6 +11,8 @@ lazy val root = project
     example,
     flyway,
     flywayPureConfig,
+    fs2Kafka,
+    fs2KafkaPureConfig,
     grpcServer,
     grpcServerMicrometer,
     grpcServerPureConfig,
@@ -167,6 +169,27 @@ lazy val flywayPureConfig = project
     libraryDependencies += Dependencies.pureConfig
   )
 
+lazy val fs2Kafka = project
+  .in(file("fs2-kafka"))
+  .settings(BuildSettings.common)
+  .settings(
+    name := "sst-fs2-kafka",
+    libraryDependencies ++= Seq(
+      Dependencies.fs2Kafka,
+      Dependencies.testContainersScalaScalaTest % Test,
+      Dependencies.testContainersScalaKafka % Test
+    )
+  )
+
+lazy val fs2KafkaPureConfig = project
+  .in(file("fs2-kafka-pureconfig"))
+  .dependsOn(fs2Kafka)
+  .settings(BuildSettings.common)
+  .settings(
+    name := "sst-fs2-kafka-pureconfig",
+    libraryDependencies += Dependencies.pureConfig
+  )
+
 lazy val grpcServer = project
   .in(file("grpc-server"))
   .settings(BuildSettings.common)
@@ -423,6 +446,7 @@ lazy val site = project
     example,
     flyway,
     flywayPureConfig,
+    fs2Kafka,
     http4sClientBlazePureConfig,
     http4sClientMonixCatnap,
     monixCatnapPureConfig,
fs2-kafka-pureconfig/src/main/scala/com/avast/sst/fs2kafka/pureconfig/ConfigReaders.scala

Lines changed: 45 additions & 0 deletions

@@ -0,0 +1,45 @@
package com.avast.sst.fs2kafka.pureconfig

import cats.syntax.either._
import com.avast.sst.fs2kafka.{ConsumerConfig, ProducerConfig}
import fs2.kafka.{Acks, AutoOffsetReset, CommitRecovery, IsolationLevel}
import pureconfig.ConfigReader
import pureconfig.error.CannotConvert
import pureconfig.generic.ProductHint
import pureconfig.generic.semiauto.deriveReader

trait ConfigReaders {

  implicit protected def hint[T]: ProductHint[T] = ProductHint.default

  implicit val fs2KafkaCommitRecoveryConfigReader: ConfigReader[CommitRecovery] = ConfigReader[String].emap {
    case s if s.toLowerCase() == "default" => CommitRecovery.Default.asRight
    case s if s.toLowerCase() == "none"    => CommitRecovery.None.asRight
    case value                             => CannotConvert(value, "CommitRecovery", "default|none").asLeft
  }

  implicit val fs2KafkaAutoOffsetResetConfigReader: ConfigReader[AutoOffsetReset] = ConfigReader[String].emap {
    case s if s.toLowerCase() == "earliest" => AutoOffsetReset.Earliest.asRight
    case s if s.toLowerCase() == "latest"   => AutoOffsetReset.Latest.asRight
    case s if s.toLowerCase() == "none"     => AutoOffsetReset.None.asRight
    case value                              => CannotConvert(value, "AutoOffsetReset", "earliest|latest|none").asLeft
  }

  implicit val fs2KafkaIsolationLevelConfigReader: ConfigReader[IsolationLevel] = ConfigReader[String].emap {
    case s if s.toLowerCase() == "read_committed"   => IsolationLevel.ReadCommitted.asRight
    case s if s.toLowerCase() == "read_uncommitted" => IsolationLevel.ReadUncommitted.asRight
    case value                                      => CannotConvert(value, "IsolationLevel", "read_committed|read_uncommitted").asLeft
  }

  implicit val fs2KafkaAcksConfigReader: ConfigReader[Acks] = ConfigReader[String].emap {
    case s if s.toLowerCase() == "0"   => Acks.Zero.asRight
    case s if s.toLowerCase() == "1"   => Acks.One.asRight
    case s if s.toLowerCase() == "all" => Acks.All.asRight
    case value                         => CannotConvert(value, "Acks", "0|1|all").asLeft
  }

  implicit val fs2KafkaConsumerConfigReader: ConfigReader[ConsumerConfig] = deriveReader

  implicit val fs2KafkaProducerConfigReader: ConfigReader[ProducerConfig] = deriveReader

}
fs2-kafka-pureconfig/src/main/scala/com/avast/sst/fs2kafka/pureconfig/implicits.scala

Lines changed: 20 additions & 0 deletions

@@ -0,0 +1,20 @@
package com.avast.sst.fs2kafka.pureconfig

import pureconfig.ConfigFieldMapping
import pureconfig.generic.ProductHint

/** Contains [[pureconfig.ConfigReader]] instances with default "kebab-case" naming convention. */
object implicits extends ConfigReaders {

  /** Contains [[pureconfig.ConfigReader]] instances with "kebab-case" naming convention.
    *
    * This is an alias for the default `implicits._` import.
    */
  object KebabCase extends ConfigReaders

  /** Contains [[pureconfig.ConfigReader]] instances with "camelCase" naming convention. */
  object CamelCase extends ConfigReaders {
    implicit override protected def hint[T]: ProductHint[T] = ProductHint(ConfigFieldMapping(pureconfig.CamelCase, pureconfig.CamelCase))
  }

}
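A minimal sketch of loading a ConsumerConfig through the readers above, assuming pureconfig's ConfigSource API and the default kebab-case keys; the broker address, group id, and offset-reset value are placeholders:

import com.avast.sst.fs2kafka.ConsumerConfig
import com.avast.sst.fs2kafka.pureconfig.implicits._ // kebab-case ConfigReader instances
import pureconfig.ConfigSource

// Case-class fields map to kebab-case keys via the default ProductHint,
// so `bootstrapServers` is configured as `bootstrap-servers`.
val consumerConfig: ConsumerConfig = ConfigSource
  .string("""
    |bootstrap-servers = ["localhost:9092"]
    |group-id = "my-service"
    |auto-offset-reset = "earliest"
    |""".stripMargin)
  .loadOrThrow[ConsumerConfig]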
fs2-kafka/src/main/scala/com/avast/sst/fs2kafka/ConsumerConfig.scala

Lines changed: 53 additions & 0 deletions

@@ -0,0 +1,53 @@
package com.avast.sst.fs2kafka

import java.util.concurrent.TimeUnit.{MILLISECONDS, SECONDS}

import com.avast.sst.fs2kafka.ConsumerConfig._
import com.github.ghik.silencer.silent
import fs2.kafka.{AutoOffsetReset, CommitRecovery, IsolationLevel}
import org.apache.kafka.clients.consumer.{ConsumerConfig => ApacheConsumerConfig}

import scala.concurrent.duration.FiniteDuration
import scala.jdk.CollectionConverters._

@silent("dead code")
final case class ConsumerConfig(
    bootstrapServers: List[String],
    groupId: String,
    groupInstanceId: Option[String] = None,
    clientId: Option[String] = None,
    clientRack: Option[String] = None,
    autoOffsetReset: AutoOffsetReset = AutoOffsetReset.None,
    enableAutoCommit: Boolean = false,
    autoCommitInterval: FiniteDuration = defaultMillis(ApacheConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
    allowAutoCreateTopics: Boolean = default(ApacheConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG),
    closeTimeout: FiniteDuration = FiniteDuration(20, SECONDS),
    commitRecovery: CommitRecovery = CommitRecovery.Default,
    commitTimeout: FiniteDuration = FiniteDuration(15, SECONDS),
    defaultApiTimeout: FiniteDuration = defaultMillis(ApacheConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG),
    heartbeatInterval: FiniteDuration = defaultMillis(ApacheConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG),
    isolationLevel: IsolationLevel = defaultIsolationLevel,
    maxPrefetchBatches: Int = 2,
    pollInterval: FiniteDuration = FiniteDuration(50, MILLISECONDS),
    pollTimeout: FiniteDuration = FiniteDuration(50, MILLISECONDS),
    maxPollInterval: FiniteDuration = defaultMillis(ApacheConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG),
    maxPollRecords: Int = default(ApacheConsumerConfig.MAX_POLL_RECORDS_CONFIG),
    requestTimeout: FiniteDuration = defaultMillis(ApacheConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG),
    sessionTimeout: FiniteDuration = defaultMillis(ApacheConsumerConfig.SESSION_TIMEOUT_MS_CONFIG),
    properties: Map[String, String] = Map.empty
)

object ConsumerConfig {

  private val officialDefaults = ApacheConsumerConfig.configDef().defaultValues().asScala

  private def default[A](key: String): A = officialDefaults(key).asInstanceOf[A]

  private def defaultMillis(key: String): FiniteDuration = FiniteDuration(default[Int](key).toLong, MILLISECONDS)

  private val defaultIsolationLevel = default[String](ApacheConsumerConfig.ISOLATION_LEVEL_CONFIG) match {
    case "read_uncommitted" => IsolationLevel.ReadUncommitted
    case "read_committed"   => IsolationLevel.ReadCommitted
  }

}
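The `properties` field is the "escape hatch" mentioned in the commit message: any Kafka client setting without a typed field above can be passed through untyped. A short sketch (the key is the standard Kafka consumer property `fetch.max.bytes`; broker address and group id are placeholders):

import com.avast.sst.fs2kafka.ConsumerConfig

// Typed fields cover the common settings; everything else is forwarded
// verbatim to the Kafka client through the untyped `properties` map.
val config = ConsumerConfig(
  bootstrapServers = List("localhost:9092"),
  groupId = "my-service",
  properties = Map("fetch.max.bytes" -> "1048576")
)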
fs2-kafka/src/main/scala/com/avast/sst/fs2kafka/Fs2KafkaModule.scala

Lines changed: 92 additions & 0 deletions

@@ -0,0 +1,92 @@
package com.avast.sst.fs2kafka

import cats.effect.{Blocker, ConcurrentEffect, ContextShift, Resource, Timer}
import fs2.kafka._

object Fs2KafkaModule {

  def makeConsumer[F[_]: ConcurrentEffect: ContextShift: Timer, K: Deserializer[F, *], V: Deserializer[F, *]](
      config: ConsumerConfig,
      blocker: Option[Blocker] = None
  ): Resource[F, KafkaConsumer[F, K, V]] = {
    def setOpt[A](maybeValue: Option[A])(
        setter: (ConsumerSettings[F, K, V], A) => ConsumerSettings[F, K, V]
    )(initial: ConsumerSettings[F, K, V]): ConsumerSettings[F, K, V] =
      maybeValue match {
        case Some(value) => setter(initial, value)
        case None        => initial
      }

    val settings = ConsumerSettings(implicitly[Deserializer[F, K]], implicitly[Deserializer[F, V]])
      .withBootstrapServers(config.bootstrapServers.mkString(","))
      .withGroupId(config.groupId)
      .pipe(setOpt(config.groupInstanceId)(_.withGroupInstanceId(_)))
      .pipe(setOpt(config.clientId)(_.withClientId(_)))
      .pipe(setOpt(config.clientRack)(_.withClientRack(_)))
      .withAutoOffsetReset(config.autoOffsetReset)
      .withEnableAutoCommit(config.enableAutoCommit)
      .withAutoCommitInterval(config.autoCommitInterval)
      .withAllowAutoCreateTopics(config.allowAutoCreateTopics)
      .withCloseTimeout(config.closeTimeout)
      .withCommitRecovery(config.commitRecovery)
      .withCommitTimeout(config.commitTimeout)
      .withDefaultApiTimeout(config.defaultApiTimeout)
      .withHeartbeatInterval(config.heartbeatInterval)
      .withIsolationLevel(config.isolationLevel)
      .withMaxPrefetchBatches(config.maxPrefetchBatches)
      .withPollInterval(config.pollInterval)
      .withPollTimeout(config.pollTimeout)
      .withMaxPollInterval(config.maxPollInterval)
      .withMaxPollRecords(config.maxPollRecords)
      .withRequestTimeout(config.requestTimeout)
      .withSessionTimeout(config.sessionTimeout)
      .pipe(setOpt(blocker)(_.withBlocker(_)))
      .withProperties(config.properties)

    makeConsumer(settings)
  }

  def makeConsumer[F[_]: ConcurrentEffect: ContextShift: Timer, K, V](
      settings: ConsumerSettings[F, K, V]
  ): Resource[F, KafkaConsumer[F, K, V]] = consumerResource[F].using(settings)

  def makeProducer[F[_]: ConcurrentEffect: ContextShift, K: Serializer[F, *], V: Serializer[F, *]](
      config: ProducerConfig,
      blocker: Option[Blocker] = None
  ): Resource[F, KafkaProducer[F, K, V]] = {
    def setOpt[A](maybeValue: Option[A])(
        setter: (ProducerSettings[F, K, V], A) => ProducerSettings[F, K, V]
    )(initial: ProducerSettings[F, K, V]): ProducerSettings[F, K, V] =
      maybeValue match {
        case Some(value) => setter(initial, value)
        case None        => initial
      }

    val settings = ProducerSettings(implicitly[Serializer[F, K]], implicitly[Serializer[F, V]])
      .withBootstrapServers(config.bootstrapServers.mkString(","))
      .pipe(setOpt(config.clientId)(_.withClientId(_)))
      .withAcks(config.acks)
      .withBatchSize(config.batchSize)
      .withCloseTimeout(config.closeTimeout)
      .withDeliveryTimeout(config.deliveryTimeout)
      .withRequestTimeout(config.requestTimeout)
      .withLinger(config.linger)
      .withEnableIdempotence(config.enableIdempotence)
      .withMaxInFlightRequestsPerConnection(config.maxInFlightRequestsPerConnection)
      .withParallelism(config.parallelism)
      .withRetries(config.retries)
      .pipe(setOpt(blocker)(_.withBlocker(_)))
      .withProperties(config.properties)

    makeProducer(settings)
  }

  def makeProducer[F[_]: ConcurrentEffect: ContextShift, K, V](settings: ProducerSettings[F, K, V]): Resource[F, KafkaProducer[F, K, V]] =
    producerResource[F].using(settings)

  /** Copy of the same class from Scala 2.13 */
  implicit private final class ChainingOps[A](private val self: A) extends AnyVal {
    def pipe[B](f: A => B): B = f(self)
  }

}
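A usage sketch for the module, mirroring the test further below: both factories return cats-effect Resources, so producer and consumer compose in a single for comprehension (broker address, group id, and topic name are placeholders):

import cats.effect.{ContextShift, IO, Resource, Timer}
import cats.syntax.flatMap._
import com.avast.sst.fs2kafka.{ConsumerConfig, Fs2KafkaModule, ProducerConfig}
import fs2.kafka.{ProducerRecord, ProducerRecords}

import scala.concurrent.ExecutionContext.Implicits.global

implicit val cs: ContextShift[IO] = IO.contextShift(global)
implicit val timer: Timer[IO] = IO.timer(global)

val program: Resource[IO, Unit] = for {
  producer <- Fs2KafkaModule.makeProducer[IO, String, String](ProducerConfig(List("localhost:9092")))
  consumer <- Fs2KafkaModule.makeConsumer[IO, String, String](
                ConsumerConfig(List("localhost:9092"), groupId = "example-group")
              )
  _ <- Resource.liftF(consumer.subscribeTo("example-topic"))
  // produce returns F[F[ProducerResult]]; flatten waits for the broker ack
  _ <- Resource.liftF(producer.produce(ProducerRecords.one(ProducerRecord("example-topic", "key", "value"))).flatten)
} yield ()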
fs2-kafka/src/main/scala/com/avast/sst/fs2kafka/ProducerConfig.scala

Lines changed: 43 additions & 0 deletions

@@ -0,0 +1,43 @@
package com.avast.sst.fs2kafka

import java.util.concurrent.TimeUnit.{MILLISECONDS, SECONDS}

import com.avast.sst.fs2kafka.ProducerConfig._
import fs2.kafka.Acks
import org.apache.kafka.clients.producer.{ProducerConfig => ApacheProducerConfig}

import scala.concurrent.duration.FiniteDuration
import scala.jdk.CollectionConverters._

final case class ProducerConfig(
    bootstrapServers: List[String],
    clientId: Option[String] = None,
    acks: Acks = defaultAcks,
    batchSize: Int = default[Int](ApacheProducerConfig.BATCH_SIZE_CONFIG),
    closeTimeout: FiniteDuration = FiniteDuration(60, SECONDS),
    deliveryTimeout: FiniteDuration = defaultMillis(ApacheProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG),
    requestTimeout: FiniteDuration = defaultMillis(ApacheProducerConfig.REQUEST_TIMEOUT_MS_CONFIG),
    linger: FiniteDuration = defaultMillisLong(ApacheProducerConfig.LINGER_MS_CONFIG),
    enableIdempotence: Boolean = default[Boolean](ApacheProducerConfig.ENABLE_IDEMPOTENCE_CONFIG),
    maxInFlightRequestsPerConnection: Int = default[Int](ApacheProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
    parallelism: Int = 100,
    retries: Int = 0,
    properties: Map[String, String] = Map.empty
)

object ProducerConfig {

  private val officialDefaults = ApacheProducerConfig.configDef().defaultValues().asScala

  private def default[A](key: String): A = officialDefaults(key).asInstanceOf[A]

  private def defaultMillis(key: String): FiniteDuration = FiniteDuration(default[Int](key).toLong, MILLISECONDS)

  private def defaultMillisLong(key: String): FiniteDuration = FiniteDuration(default[Long](key), MILLISECONDS)

  private val defaultAcks = default[String](ApacheProducerConfig.ACKS_CONFIG) match {
    case "all" => Acks.All
    case "0"   => Acks.Zero
    case "1"   => Acks.One
  }

}
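A design note on both config classes: the defaults are not hard-coded but read from the Kafka client's own ConfigDef at class-initialization time, so they track whatever kafka-clients version is on the classpath. An illustrative check (the printed value depends on that version; clients of this era document linger.ms as 0):

import org.apache.kafka.clients.producer.{ProducerConfig => ApacheProducerConfig}

import scala.jdk.CollectionConverters._

// The same lookup the private `default` helpers above perform internally.
val kafkaDefaults = ApacheProducerConfig.configDef().defaultValues().asScala
println(kafkaDefaults(ApacheProducerConfig.LINGER_MS_CONFIG)) // e.g. 0 (milliseconds)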
fs2-kafka/src/test/scala/com/avast/sst/fs2kafka/Fs2KafkaModuleTest.scala

Lines changed: 32 additions & 0 deletions

@@ -0,0 +1,32 @@
package com.avast.sst.fs2kafka

import cats.effect.{IO, Resource}
import cats.syntax.flatMap._
import com.dimafeng.testcontainers.{ForAllTestContainer, KafkaContainer}
import fs2.kafka.{AutoOffsetReset, ProducerRecord, ProducerRecords}
import org.scalatest.funsuite.AsyncFunSuite

import scala.concurrent.ExecutionContext.Implicits.global

class Fs2KafkaModuleTest extends AsyncFunSuite with ForAllTestContainer {

  override val container = KafkaContainer()

  implicit private val cs = IO.contextShift(global)
  implicit private val timer = IO.timer(global)

  test("producer") {
    val io = for {
      producer <- Fs2KafkaModule.makeProducer[IO, String, String](ProducerConfig(List(container.bootstrapServers)))
      consumer <- Fs2KafkaModule.makeConsumer[IO, String, String](
                    ConsumerConfig(List(container.bootstrapServers), groupId = "test", autoOffsetReset = AutoOffsetReset.Earliest)
                  )
      _ <- Resource.liftF(consumer.subscribeTo("test"))
      _ <- Resource.liftF(producer.produce(ProducerRecords.one(ProducerRecord("test", "key", "value"))).flatten)
      event <- Resource.liftF(consumer.stream.head.compile.toList)
    } yield assert(event.head.record.key === "key" && event.head.record.value === "value")

    io.use(IO.pure).unsafeToFuture()
  }

}
fs2-kafka/src/test/scala/com/avast/sst/fs2kafka/KafkaConfigTest.scala

Lines changed: 17 additions & 0 deletions

@@ -0,0 +1,17 @@
package com.avast.sst.fs2kafka

import org.scalatest.funsuite.AnyFunSuite

class KafkaConfigTest extends AnyFunSuite {

  test("verify ConsumerConfig defaults") {
    ConsumerConfig(List.empty, "group.id")
    succeed
  }

  test("verify ProducerConfig defaults") {
    ProducerConfig(List.empty)
    succeed
  }

}

grpc-server/src/main/scala/com/avast/sst/grpc/server/GrpcServerModule.scala

Lines changed: 1 addition & 0 deletions
@@ -7,6 +7,7 @@ import io.grpc.{Server, ServerBuilder, ServerInterceptor, ServerServiceDefinition}
 
 import scala.collection.immutable.Seq
 import scala.concurrent.ExecutionContext
+
 object GrpcServerModule {
 
   /** Makes [[io.grpc.Server]] (Netty) initialized with the given config, services and interceptors.

project/Dependencies.scala

Lines changed: 3 additions & 0 deletions
@@ -7,6 +7,7 @@ object Dependencies {
   val doobie = "org.tpolecat" %% "doobie-core" % Versions.doobie
   val doobieHikari = "org.tpolecat" %% "doobie-hikari" % Versions.doobie
   val flywayCore = "org.flywaydb" % "flyway-core" % "6.4.2"
+  val fs2Kafka = "com.github.fd4s" %% "fs2-kafka" % "1.0.0"
   val grpcNettyShaded = "io.grpc" % "grpc-netty-shaded" % Versions.grpc
   val grpcProtobuf = "io.grpc" % "grpc-protobuf" % Versions.grpc
   val grpcStub = "io.grpc" % "grpc-stub" % Versions.grpc
@@ -34,6 +35,8 @@ object Dependencies {
   val silencerLib = "com.github.ghik" % "silencer-lib" % Versions.silencer cross CrossVersion.full
   val slf4jApi = "org.slf4j" % "slf4j-api" % "1.7.30"
   val sslConfig = "com.typesafe" %% "ssl-config-core" % "0.4.2"
+  val testContainersScalaScalaTest = "com.dimafeng" %% "testcontainers-scala-scalatest" % "0.37.0"
+  val testContainersScalaKafka = "com.dimafeng" %% "testcontainers-scala-kafka" % "0.37.0"
   val zio = "dev.zio" %% "zio" % "1.0.0-RC19-2"
   val zioInteropCats = "dev.zio" %% "zio-interop-cats" % "2.0.0.0-RC14"
