kafka4s provides pure, referentially transparent functions for working with Kafka, and integrates with FP libraries such as cats-effect and fs2.
To use kafka4s in an existing SBT project with Scala 2.12 or later, add the following dependency to your build.sbt:
libraryDependencies ++= Seq(
  "com.banno" %% "kafka4s" % "<version>"
)
Sending records to Kafka is an effect. If we wanted to periodically write random integers to a Kafka topic, we could do:
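// Assumes an F[_] with Sync and Timer instances, plus `topic` and
// `kafkaBootstrapServers`, are already in scope.
Stream
  .resource(ProducerApi.resource[F, Int, Int](BootstrapServers(kafkaBootstrapServers)))
  .flatMap { p =>
    // Repeat forever: wait one second, draw a random Int, send it as both key and value.
    Stream.repeatEval(
      Timer[F]
        .sleep(1.second)
        .flatMap(_ => Sync[F].delay(Random.nextInt()))
        .flatMap(i => p.sendAndForget(new ProducerRecord(topic.name, i, i)))
    )
  }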
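A stream like the one above is only a description: nothing is sent until it is compiled to an effect and that effect is run. The following is a minimal sketch of that last step, assuming cats-effect and fs2 are on the classpath. `RunStreamSketch`, `randomInts` and `firstThree` are hypothetical names, and the stand-in stream emits random integers locally rather than talking to Kafka, so it can be tried without a broker.

```scala
import cats.effect.{ExitCode, IO, IOApp}
import fs2.Stream
import scala.concurrent.duration._
import scala.util.Random

object RunStreamSketch extends IOApp {

  // Hypothetical stand-in for the producer stream: emits a random Int
  // every 100 milliseconds instead of sending records to Kafka.
  def randomInts: Stream[IO, Int] =
    Stream.awakeDelay[IO](100.millis).evalMap(_ => IO(Random.nextInt()))

  // Building the stream performs no work. `compile.toList` turns the first
  // three elements into a single IO value, which is still only a description.
  def firstThree: IO[List[Int]] =
    randomInts.take(3).compile.toList

  // IOApp runs the returned IO; this is the point where effects happen.
  def run(args: List[String]): IO[ExitCode] =
    firstThree
      .flatMap(ints => IO(println(ints)))
      .map(_ => ExitCode.Success)
}
```

For a stream that should run until the application stops, such as the producer stream, `compile.drain` would take the place of `take(3).compile.toList`.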
Polling Kafka for records is also an effect, and we can obtain a stream of records from a topic. We can print the even random integers from the above topic using:
Stream
  .resource(
    ConsumerApi
      .resource[F, Int, Int](
        BootstrapServers(kafkaBootstrapServers),
        GroupId("example3"),
        AutoOffsetReset.earliest,
        EnableAutoCommit(true)
      )
  )
  // Subscribe once, right after the consumer is acquired.
  .evalTap(_.subscribe(topic.name))
  .flatMap(
    _.recordStream(1.second)
      .map(_.value)
      .filter(_ % 2 == 0) // keep only the even values
      .evalMap(i => Sync[F].delay(println(i)))
  )
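For readers more familiar with the plain Kafka client, the typed settings passed to the consumer above can be read against the standard Kafka consumer properties. The sketch below spells out that correspondence using only the Scala standard library. The property keys are the standard Kafka ones, but the mapping from each kafka4s setting to a key is an assumption here, and `ConsumerSettingsSketch` and `consumerProperties` are hypothetical names, not part of kafka4s.

```scala
object ConsumerSettingsSketch {

  // Hypothetical helper: builds the plain consumer properties that the typed
  // settings in the example are assumed to correspond to.
  def consumerProperties(bootstrapServers: String, groupId: String): Map[String, String] =
    Map(
      "bootstrap.servers"  -> bootstrapServers, // BootstrapServers(...)
      "group.id"           -> groupId,          // GroupId("example3")
      "auto.offset.reset"  -> "earliest",       // AutoOffsetReset.earliest
      "enable.auto.commit" -> "true"            // EnableAutoCommit(true)
    )
}
```

With `auto.offset.reset` set to `earliest`, a consumer group with no committed offsets starts from the beginning of the topic, which is why the example can print integers written before the consumer started.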
To learn more about kafka4s, start with our Getting Started Guide, play with some example apps, and check out the kafka4s Scaladoc for more info.