From 1fb27587745d6126ed1fd6f45ed2cbbd4bff49ef Mon Sep 17 00:00:00 2001 From: Piotr Limanowski Date: Tue, 26 Mar 2024 14:32:50 +0100 Subject: [PATCH] Prevent Kafka sink from blocking Previously, we used native Kafka sink implementation through kafka-clients. This caused an issue when actual `send` operation becomes blocking as the record write buffer fills and an actual send operation is performed. We're currently successfully using fs2-kafka in enrich with Enrich without major issues. Rewriting the Sink to use a two-effect pattern makes no sense and therefore fs2-kafka is used to prevent blocking problems. --- flake.lock | 434 ++++++++++++++++-- .../sinks/KafkaSink.scala | 75 ++- project/BuildSettings.scala | 2 +- project/Dependencies.scala | 4 +- 4 files changed, 421 insertions(+), 94 deletions(-) diff --git a/flake.lock b/flake.lock index 3eba6bb9f..a192a8109 100644 --- a/flake.lock +++ b/flake.lock @@ -1,24 +1,80 @@ { "nodes": { - "devenv": { + "cachix": { "inputs": { - "flake-compat": "flake-compat", - "nix": "nix", + "devenv": "devenv_2", + "flake-compat": "flake-compat_2", "nixpkgs": [ + "devenv", "nixpkgs" ], "pre-commit-hooks": "pre-commit-hooks" }, "locked": { - "lastModified": 1697058441, - "narHash": "sha256-gjtW+nkM9suMsjyid63HPmt6WZQEvuVqA5cOAf4lLM0=", + "lastModified": 1710475558, + "narHash": "sha256-egKrPCKjy/cE+NqCj4hg2fNX/NwLCf0bRDInraYXDgs=", + "owner": "cachix", + "repo": "cachix", + "rev": "661bbb7f8b55722a0406456b15267b5426a3bda6", + "type": "github" + }, + "original": { + "owner": "cachix", + "repo": "cachix", + "type": "github" + } + }, + "devenv": { + "inputs": { + "cachix": "cachix", + "flake-compat": "flake-compat_4", + "nix": "nix_2", + "nixpkgs": [ + "nixpkgs" + ], + "pre-commit-hooks": "pre-commit-hooks_2" + }, + "locked": { + "lastModified": 1711447263, + "narHash": "sha256-oq+sF+dqLueXkIZHS+z4y/WW/2nndC2PiOef4/948Mk=", + "owner": "cachix", + "repo": "devenv", + "rev": "8be4515cef53a5c7f315907a5aee5af1ba11f9ac", + "type": "github" + }, + "original": { + "owner": "cachix", + "repo": "devenv", + "type": "github" + } + }, + "devenv_2": { + "inputs": { + "flake-compat": [ + "devenv", + "cachix", + "flake-compat" + ], + "nix": "nix", + "nixpkgs": "nixpkgs", + "poetry2nix": "poetry2nix", + "pre-commit-hooks": [ + "devenv", + "cachix", + "pre-commit-hooks" + ] + }, + "locked": { + "lastModified": 1708704632, + "narHash": "sha256-w+dOIW60FKMaHI1q5714CSibk99JfYxm0CzTinYWr+Q=", "owner": "cachix", "repo": "devenv", - "rev": "55294461a62d90c8626feca22f52b0d3d0e18e39", + "rev": "2ee4450b0f4b95a1b90f2eb5ffea98b90e48c196", "type": "github" }, "original": { "owner": "cachix", + "ref": "python-rewrite", "repo": "devenv", "type": "github" } @@ -39,16 +95,80 @@ "type": "github" } }, + "flake-compat_2": { + "flake": false, + "locked": { + "lastModified": 1696426674, + "narHash": "sha256-kvjfFW7WAETZlt09AgDn1MrtKzP7t90Vf7vypd3OL1U=", + "owner": "edolstra", + "repo": "flake-compat", + "rev": "0f9255e01c2351cc7d116c072cb317785dd33b33", + "type": "github" + }, + "original": { + "owner": "edolstra", + "repo": "flake-compat", + "type": "github" + } + }, + "flake-compat_3": { + "flake": false, + "locked": { + "lastModified": 1696426674, + "narHash": "sha256-kvjfFW7WAETZlt09AgDn1MrtKzP7t90Vf7vypd3OL1U=", + "owner": "edolstra", + "repo": "flake-compat", + "rev": "0f9255e01c2351cc7d116c072cb317785dd33b33", + "type": "github" + }, + "original": { + "owner": "edolstra", + "repo": "flake-compat", + "type": "github" + } + }, + "flake-compat_4": { + "flake": false, + "locked": { + "lastModified": 1696426674, + "narHash": "sha256-kvjfFW7WAETZlt09AgDn1MrtKzP7t90Vf7vypd3OL1U=", + "owner": "edolstra", + "repo": "flake-compat", + "rev": "0f9255e01c2351cc7d116c072cb317785dd33b33", + "type": "github" + }, + "original": { + "owner": "edolstra", + "repo": "flake-compat", + "type": "github" + } + }, + "flake-compat_5": { + "flake": false, + "locked": { + "lastModified": 1673956053, + "narHash": "sha256-4gtG9iQuiKITOjNQQeQIpoIB6b16fm+504Ch3sNKLd8=", + "owner": "edolstra", + "repo": "flake-compat", + "rev": "35bb57c0c8d8b62bbfd284272c928ceb64ddbde9", + "type": "github" + }, + "original": { + "owner": "edolstra", + "repo": "flake-compat", + "type": "github" + } + }, "flake-utils": { "inputs": { "systems": "systems" }, "locked": { - "lastModified": 1685518550, - "narHash": "sha256-o2d0KcvaXzTrPRIo0kOLV0/QXHhDQ5DTi+OxcjO8xqY=", + "lastModified": 1689068808, + "narHash": "sha256-6ixXo3wt24N/melDWjq70UuHQLxGV8jZvooRanIHXw0=", "owner": "numtide", "repo": "flake-utils", - "rev": "a1720a10a6cfe8234c0e93907ffe81be440f4cef", + "rev": "919d646de7be200f3bf08cb76ae1f09402b6f9b4", "type": "github" }, "original": { @@ -62,11 +182,47 @@ "systems": "systems_2" }, "locked": { - "lastModified": 1694529238, - "narHash": "sha256-zsNZZGTGnMOf9YpHKJqMSsa0dXbfmxeoJ7xHlrt+xmY=", + "lastModified": 1701680307, + "narHash": "sha256-kAuep2h5ajznlPMD9rnQyffWG8EM/C73lejGofXvdM8=", "owner": "numtide", "repo": "flake-utils", - "rev": "ff7b65b44d01cf9ba6a71320833626af21126384", + "rev": "4022d587cbbfd70fe950c1e2083a02621806a725", + "type": "github" + }, + "original": { + "owner": "numtide", + "repo": "flake-utils", + "type": "github" + } + }, + "flake-utils_3": { + "inputs": { + "systems": "systems_3" + }, + "locked": { + "lastModified": 1701680307, + "narHash": "sha256-kAuep2h5ajznlPMD9rnQyffWG8EM/C73lejGofXvdM8=", + "owner": "numtide", + "repo": "flake-utils", + "rev": "4022d587cbbfd70fe950c1e2083a02621806a725", + "type": "github" + }, + "original": { + "owner": "numtide", + "repo": "flake-utils", + "type": "github" + } + }, + "flake-utils_4": { + "inputs": { + "systems": "systems_4" + }, + "locked": { + "lastModified": 1710146030, + "narHash": "sha256-SZ5L6eA7HJ/nmkzGG7/ISclqe6oZdOZTNoesiInkXPQ=", + "owner": "numtide", + "repo": "flake-utils", + "rev": "b1d9ab70662946ef0850d488da1c9019f3a9752a", "type": "github" }, "original": { @@ -79,16 +235,17 @@ "inputs": { "nixpkgs": [ "devenv", + "cachix", "pre-commit-hooks", "nixpkgs" ] }, "locked": { - "lastModified": 1660459072, - "narHash": "sha256-8DFJjXG8zqoONA1vXtgeKXy68KdJL5UaXR8NtVMUbx8=", + "lastModified": 1703887061, + "narHash": "sha256-gGPa9qWNc6eCXT/+Z5/zMkyYOuRZqeFZBDbopNZQkuY=", "owner": "hercules-ci", "repo": "gitignore.nix", - "rev": "a20de23b925fd8264fd7fad6454652e142fd7f73", + "rev": "43e1aa1308018f37118e34d3a9cb4f5e75dc11d5", "type": "github" }, "original": { @@ -97,57 +254,113 @@ "type": "github" } }, - "lowdown-src": { - "flake": false, + "gitignore_2": { + "inputs": { + "nixpkgs": [ + "devenv", + "pre-commit-hooks", + "nixpkgs" + ] + }, "locked": { - "lastModified": 1633514407, - "narHash": "sha256-Dw32tiMjdK9t3ETl5fzGrutQTzh2rufgZV4A/BbxuD4=", - "owner": "kristapsdz", - "repo": "lowdown", - "rev": "d2c2b44ff6c27b936ec27358a2653caaef8f73b8", + "lastModified": 1703887061, + "narHash": "sha256-gGPa9qWNc6eCXT/+Z5/zMkyYOuRZqeFZBDbopNZQkuY=", + "owner": "hercules-ci", + "repo": "gitignore.nix", + "rev": "43e1aa1308018f37118e34d3a9cb4f5e75dc11d5", "type": "github" }, "original": { - "owner": "kristapsdz", - "repo": "lowdown", + "owner": "hercules-ci", + "repo": "gitignore.nix", "type": "github" } }, "nix": { "inputs": { - "lowdown-src": "lowdown-src", + "flake-compat": "flake-compat", "nixpkgs": [ + "devenv", + "cachix", "devenv", "nixpkgs" ], "nixpkgs-regression": "nixpkgs-regression" }, "locked": { - "lastModified": 1676545802, - "narHash": "sha256-EK4rZ+Hd5hsvXnzSzk2ikhStJnD63odF7SzsQ8CuSPU=", + "lastModified": 1708577783, + "narHash": "sha256-92xq7eXlxIT5zFNccLpjiP7sdQqQI30Gyui2p/PfKZM=", + "owner": "domenkozar", + "repo": "nix", + "rev": "ecd0af0c1f56de32cbad14daa1d82a132bf298f8", + "type": "github" + }, + "original": { + "owner": "domenkozar", + "ref": "devenv-2.21", + "repo": "nix", + "type": "github" + } + }, + "nix-github-actions": { + "inputs": { + "nixpkgs": [ + "devenv", + "cachix", + "devenv", + "poetry2nix", + "nixpkgs" + ] + }, + "locked": { + "lastModified": 1688870561, + "narHash": "sha256-4UYkifnPEw1nAzqqPOTL2MvWtm3sNGw1UTYTalkTcGY=", + "owner": "nix-community", + "repo": "nix-github-actions", + "rev": "165b1650b753316aa7f1787f3005a8d2da0f5301", + "type": "github" + }, + "original": { + "owner": "nix-community", + "repo": "nix-github-actions", + "type": "github" + } + }, + "nix_2": { + "inputs": { + "flake-compat": "flake-compat_5", + "nixpkgs": [ + "devenv", + "nixpkgs" + ], + "nixpkgs-regression": "nixpkgs-regression_2" + }, + "locked": { + "lastModified": 1710500156, + "narHash": "sha256-zvCqeUO2GLOm7jnU23G4EzTZR7eylcJN+HJ5svjmubI=", "owner": "domenkozar", "repo": "nix", - "rev": "7c91803598ffbcfe4a55c44ac6d49b2cf07a527f", + "rev": "c5bbf14ecbd692eeabf4184cc8d50f79c2446549", "type": "github" }, "original": { "owner": "domenkozar", - "ref": "relaxed-flakes", + "ref": "devenv-2.21", "repo": "nix", "type": "github" } }, "nixpkgs": { "locked": { - "lastModified": 1697379843, - "narHash": "sha256-RcnGuJgC2K/UpTy+d32piEoBXq2M+nVFzM3ah/ZdJzg=", - "owner": "nixos", + "lastModified": 1692808169, + "narHash": "sha256-x9Opq06rIiwdwGeK2Ykj69dNc2IvUH1fY55Wm7atwrE=", + "owner": "NixOS", "repo": "nixpkgs", - "rev": "12bdeb01ff9e2d3917e6a44037ed7df6e6c3df9d", + "rev": "9201b5ff357e781bf014d0330d18555695df7ba8", "type": "github" }, "original": { - "owner": "nixos", + "owner": "NixOS", "ref": "nixpkgs-unstable", "repo": "nixpkgs", "type": "github" @@ -169,42 +382,141 @@ "type": "github" } }, + "nixpkgs-regression_2": { + "locked": { + "lastModified": 1643052045, + "narHash": "sha256-uGJ0VXIhWKGXxkeNnq4TvV3CIOkUJ3PAoLZ3HMzNVMw=", + "owner": "NixOS", + "repo": "nixpkgs", + "rev": "215d4d0fd80ca5163643b03a33fde804a29cc1e2", + "type": "github" + }, + "original": { + "owner": "NixOS", + "repo": "nixpkgs", + "rev": "215d4d0fd80ca5163643b03a33fde804a29cc1e2", + "type": "github" + } + }, "nixpkgs-stable": { "locked": { - "lastModified": 1685801374, - "narHash": "sha256-otaSUoFEMM+LjBI1XL/xGB5ao6IwnZOXc47qhIgJe8U=", + "lastModified": 1704874635, + "narHash": "sha256-YWuCrtsty5vVZvu+7BchAxmcYzTMfolSPP5io8+WYCg=", + "owner": "NixOS", + "repo": "nixpkgs", + "rev": "3dc440faeee9e889fe2d1b4d25ad0f430d449356", + "type": "github" + }, + "original": { + "owner": "NixOS", + "ref": "nixos-23.11", + "repo": "nixpkgs", + "type": "github" + } + }, + "nixpkgs-stable_2": { + "locked": { + "lastModified": 1704874635, + "narHash": "sha256-YWuCrtsty5vVZvu+7BchAxmcYzTMfolSPP5io8+WYCg=", "owner": "NixOS", "repo": "nixpkgs", - "rev": "c37ca420157f4abc31e26f436c1145f8951ff373", + "rev": "3dc440faeee9e889fe2d1b4d25ad0f430d449356", "type": "github" }, "original": { "owner": "NixOS", - "ref": "nixos-23.05", + "ref": "nixos-23.11", + "repo": "nixpkgs", + "type": "github" + } + }, + "nixpkgs_2": { + "locked": { + "lastModified": 1711370797, + "narHash": "sha256-2xu0jVSjuKhN97dqc4bVtvEH52Rwh6+uyI1XCnzoUyI=", + "owner": "nixos", + "repo": "nixpkgs", + "rev": "c726225724e681b3626acc941c6f95d2b0602087", + "type": "github" + }, + "original": { + "owner": "nixos", + "ref": "nixpkgs-unstable", "repo": "nixpkgs", "type": "github" } }, + "poetry2nix": { + "inputs": { + "flake-utils": "flake-utils", + "nix-github-actions": "nix-github-actions", + "nixpkgs": [ + "devenv", + "cachix", + "devenv", + "nixpkgs" + ] + }, + "locked": { + "lastModified": 1692876271, + "narHash": "sha256-IXfZEkI0Mal5y1jr6IRWMqK8GW2/f28xJenZIPQqkY0=", + "owner": "nix-community", + "repo": "poetry2nix", + "rev": "d5006be9c2c2417dafb2e2e5034d83fabd207ee3", + "type": "github" + }, + "original": { + "owner": "nix-community", + "repo": "poetry2nix", + "type": "github" + } + }, "pre-commit-hooks": { + "inputs": { + "flake-compat": "flake-compat_3", + "flake-utils": "flake-utils_2", + "gitignore": "gitignore", + "nixpkgs": [ + "devenv", + "cachix", + "nixpkgs" + ], + "nixpkgs-stable": "nixpkgs-stable" + }, + "locked": { + "lastModified": 1708018599, + "narHash": "sha256-M+Ng6+SePmA8g06CmUZWi1AjG2tFBX9WCXElBHEKnyM=", + "owner": "cachix", + "repo": "pre-commit-hooks.nix", + "rev": "5df5a70ad7575f6601d91f0efec95dd9bc619431", + "type": "github" + }, + "original": { + "owner": "cachix", + "repo": "pre-commit-hooks.nix", + "type": "github" + } + }, + "pre-commit-hooks_2": { "inputs": { "flake-compat": [ "devenv", "flake-compat" ], - "flake-utils": "flake-utils", - "gitignore": "gitignore", + "flake-utils": "flake-utils_3", + "gitignore": "gitignore_2", "nixpkgs": [ "devenv", "nixpkgs" ], - "nixpkgs-stable": "nixpkgs-stable" + "nixpkgs-stable": "nixpkgs-stable_2" }, "locked": { - "lastModified": 1688056373, - "narHash": "sha256-2+SDlNRTKsgo3LBRiMUcoEUb6sDViRNQhzJquZ4koOI=", + "lastModified": 1708018599, + "narHash": "sha256-M+Ng6+SePmA8g06CmUZWi1AjG2tFBX9WCXElBHEKnyM=", "owner": "cachix", "repo": "pre-commit-hooks.nix", - "rev": "5843cf069272d92b60c3ed9e55b7a8989c01d4c7", + "rev": "5df5a70ad7575f6601d91f0efec95dd9bc619431", "type": "github" }, "original": { @@ -216,8 +528,8 @@ "root": { "inputs": { "devenv": "devenv", - "flake-utils": "flake-utils_2", - "nixpkgs": "nixpkgs" + "flake-utils": "flake-utils_4", + "nixpkgs": "nixpkgs_2" } }, "systems": { @@ -249,6 +561,36 @@ "repo": "default", "type": "github" } + }, + "systems_3": { + "locked": { + "lastModified": 1681028828, + "narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=", + "owner": "nix-systems", + "repo": "default", + "rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e", + "type": "github" + }, + "original": { + "owner": "nix-systems", + "repo": "default", + "type": "github" + } + }, + "systems_4": { + "locked": { + "lastModified": 1681028828, + "narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=", + "owner": "nix-systems", + "repo": "default", + "rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e", + "type": "github" + }, + "original": { + "owner": "nix-systems", + "repo": "default", + "type": "github" + } } }, "root": "root", diff --git a/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KafkaSink.scala b/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KafkaSink.scala index a3023fd36..3f862e483 100644 --- a/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KafkaSink.scala +++ b/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KafkaSink.scala @@ -11,28 +11,28 @@ package com.snowplowanalytics.snowplow.collectors.scalastream package sinks -import cats.effect.{Resource, Sync} +import cats.implicits._ +import cats.effect._ import org.slf4j.LoggerFactory -import java.util.Properties - -import org.apache.kafka.clients.producer._ +import fs2.kafka._ import com.snowplowanalytics.snowplow.collector.core.{Config, Sink} /** * Kafka Sink for the Scala Stream Collector */ -class KafkaSink[F[_]: Sync]( +class KafkaSink[F[_]: Async]( val maxBytes: Int, - kafkaProducer: KafkaProducer[String, Array[Byte]], + isHealthyState: Ref[F, Boolean], + kafkaProducer: KafkaProducer[F, String, Array[Byte]], topicName: String ) extends Sink[F] { - private lazy val log = LoggerFactory.getLogger(getClass()) - @volatile private var kafkaHealthy: Boolean = false - override def isHealthy: F[Boolean] = Sync[F].pure(kafkaHealthy) + private lazy val log = LoggerFactory.getLogger(getClass()) + + override def isHealthy: F[Boolean] = isHealthyState.get /** * Store raw events to the topic @@ -40,34 +40,23 @@ class KafkaSink[F[_]: Sync]( * @param events The list of events to send * @param key The partition key to use */ - override def storeRawEvents(events: List[Array[Byte]], key: String): F[Unit] = Sync[F].delay { + override def storeRawEvents(events: List[Array[Byte]], key: String): F[Unit] = { log.debug(s"Writing ${events.size} Thrift records to Kafka topic $topicName at key $key") - events.foreach { event => - kafkaProducer.send( - new ProducerRecord(topicName, key, event), - new Callback { - override def onCompletion(metadata: RecordMetadata, e: Exception): Unit = - if (e != null) { - kafkaHealthy = false - log.error(s"Sending event failed: ${e.getMessage}") - } else { - kafkaHealthy = true - } - } - ) - } + val records = ProducerRecords(events.map(e => (ProducerRecord(topicName, key, e)))) + kafkaProducer.produce(records).onError(_ => isHealthyState.set(false)) *> isHealthyState.set(true) } } object KafkaSink { - def create[F[_]: Sync]( + def create[F[_]: Async]( sinkConfig: Config.Sink[KafkaSinkConfig], authCallbackClass: String ): Resource[F, KafkaSink[F]] = for { - kafkaProducer <- createProducer(sinkConfig.config, sinkConfig.buffer, authCallbackClass) - kafkaSink = new KafkaSink(sinkConfig.config.maxBytes, kafkaProducer, sinkConfig.name) + isHealthyState <- Resource.eval(Ref.of[F, Boolean](false)) + kafkaProducer <- createProducer(sinkConfig.config, sinkConfig.buffer, authCallbackClass) + kafkaSink = new KafkaSink(sinkConfig.config.maxBytes, isHealthyState, kafkaProducer, sinkConfig.name) } yield kafkaSink /** @@ -76,28 +65,24 @@ object KafkaSink { * * @return a new Kafka Producer */ - private def createProducer[F[_]: Sync]( + private def createProducer[F[_]: Async]( kafkaConfig: KafkaSinkConfig, bufferConfig: Config.Buffer, authCallbackClass: String - ): Resource[F, KafkaProducer[String, Array[Byte]]] = { - val acquire = Sync[F].delay { - val props = new Properties() - props.setProperty("bootstrap.servers", kafkaConfig.brokers) - props.setProperty("acks", "all") - props.setProperty("retries", kafkaConfig.retries.toString) - props.setProperty("buffer.memory", bufferConfig.byteLimit.toString) - props.setProperty("linger.ms", bufferConfig.timeLimit.toString) - props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer") - props.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer") - props.setProperty("sasl.login.callback.handler.class", authCallbackClass) + ): Resource[F, KafkaProducer[F, String, Array[Byte]]] = { + val props = Map( + "acks" -> "all", + "retries" -> kafkaConfig.retries.toString, + "buffer.memory" -> bufferConfig.byteLimit.toString, + "linger.ms" -> bufferConfig.timeLimit.toString, + "key.serializer" -> "org.apache.kafka.common.serialization.StringSerializer", + "value.serializer" -> "org.apache.kafka.common.serialization.ByteArraySerializer", + "sasl.login.callback.handler.class" -> authCallbackClass + ) ++ kafkaConfig.producerConf.getOrElse(Map.empty) - // Can't use `putAll` in JDK 11 because of https://github.com/scala/bug/issues/10418 - kafkaConfig.producerConf.getOrElse(Map()).foreach { case (k, v) => props.setProperty(k, v) } + val producerSettings = + ProducerSettings[F, String, Array[Byte]].withBootstrapServers(kafkaConfig.brokers).withProperties(props) - new KafkaProducer[String, Array[Byte]](props) - } - val release = (p: KafkaProducer[String, Array[Byte]]) => Sync[F].delay(p.close()) - Resource.make(acquire)(release) + KafkaProducer.resource(producerSettings) } } diff --git a/project/BuildSettings.scala b/project/BuildSettings.scala index bcefd4492..d1aebcb38 100644 --- a/project/BuildSettings.scala +++ b/project/BuildSettings.scala @@ -87,7 +87,7 @@ object BuildSettings { moduleName := "snowplow-stream-collector-kafka", Docker / packageName := "scala-stream-collector-kafka", libraryDependencies ++= Seq( - Dependencies.Libraries.kafkaClients, + Dependencies.Libraries.fs2Kafka, Dependencies.Libraries.mskAuth, Dependencies.Libraries.azureIdentity, diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 64e388144..41922629c 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -25,7 +25,7 @@ object Dependencies { val fs2PubSub = "0.22.0" val http4s = "0.23.23" val jackson = "2.12.7" // force this version to mitigate security vulnerabilities - val kafka = "2.2.1" + val fs2Kafka = "2.6.1" val log4cats = "2.6.0" val log4j = "2.17.2" // CVE-2021-44228 val mskAuth = "1.1.1" @@ -66,7 +66,7 @@ object Dependencies { //sinks val fs2PubSub = "com.permutive" %% "fs2-google-pubsub-grpc" % V.fs2PubSub val jackson = "com.fasterxml.jackson.core" % "jackson-databind" % V.jackson - val kafkaClients = "org.apache.kafka" % "kafka-clients" % V.kafka + val fs2Kafka = "com.github.fd4s" %% "fs2-kafka" % V.fs2Kafka val kinesis = "com.amazonaws" % "aws-java-sdk-kinesis" % V.awsSdk val log4j = "org.apache.logging.log4j" % "log4j-core" % V.log4j val mskAuth = "software.amazon.msk" % "aws-msk-iam-auth" % V.mskAuth % Runtime // Enables AWS MSK IAM authentication https://github.com/snowplow/stream-collector/pull/214