diff --git a/build.sbt b/build.sbt
index df216e1f..d987cb6e 100644
--- a/build.sbt
+++ b/build.sbt
@@ -37,6 +37,7 @@ ThisBuild / scalacOptions ++= Seq(
   "-unchecked",
   "-Xcheckinit",
   "-Xfatal-warnings",
+  "-Xsource:3",
   "-Ywarn-dead-code",
   "-Ywarn-extra-implicit",
   "-Ywarn-numeric-widen",
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index d2aba6fb..447d4cef 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -19,27 +19,18 @@ object Dependencies {
     val all = Seq(core, kernal)
   }

-  object Refined {
-    private val version = "0.9.28"
-    val base = "eu.timepit" %% "refined" % version
-    val pureconfig = "eu.timepit" %% "refined-pureconfig" % version
-    val all = Seq(base, pureconfig)
-  }
-
   val kafkaClients = "org.apache.kafka" % "kafka-clients" % "3.1.0"
   val scalaLogging = "com.typesafe.scala-logging" %% "scala-logging" % "3.9.4"
   val logbackClassic = "ch.qos.logback" % "logback-classic" % "1.2.10" % Runtime
-  val pureconfig = "com.github.pureconfig" %% "pureconfig" % "0.17.1"
   val scalaCollectionCompat = "org.scala-lang.modules" %% "scala-collection-compat" % "2.6.0"
   val embeddedKafka = "io.github.embeddedkafka" %% "embedded-kafka" % "3.1.0" % Test
   val scalaTest = "org.scalatest" %% "scalatest" % "3.2.11" % Test

-  val core = Akka.base ++ Cats.all ++ Refined.all ++ Seq(
+  val core = Akka.base ++ Cats.all ++ Seq(
     kafkaClients,
     scalaLogging,
     logbackClassic,
-    pureconfig,
     scalaCollectionCompat
   )
   val test = Akka.test ++ Seq(embeddedKafka, scalaTest)
diff --git a/src/main/scala/com/sky/kafka/topicloader/TopicLoader.scala b/src/main/scala/com/sky/kafka/topicloader/TopicLoader.scala
index 832975a9..fa8f4e5a 100644
--- a/src/main/scala/com/sky/kafka/topicloader/TopicLoader.scala
+++ b/src/main/scala/com/sky/kafka/topicloader/TopicLoader.scala
@@ -14,13 +14,11 @@ import cats.syntax.bifunctor._
 import cats.syntax.option._
 import cats.syntax.show._
 import cats.{Bifunctor, Show}
+import com.sky.kafka.topicloader.config.{Config, TopicLoaderConfig}
 import com.typesafe.scalalogging.LazyLogging
-import eu.timepit.refined.pureconfig._
 import org.apache.kafka.clients.consumer._
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.serialization._
-import pureconfig.ConfigSource
-import pureconfig.generic.auto._

 import scala.concurrent.Future
 import scala.jdk.CollectionConverters._
@@ -78,7 +76,10 @@ trait TopicLoader extends LazyLogging {
       strategy: LoadTopicStrategy,
       maybeConsumerSettings: Option[ConsumerSettings[Array[Byte], Array[Byte]]] = None
   )(implicit system: ActorSystem): Source[ConsumerRecord[K, V], Future[Consumer.Control]] = {
-    val config = ConfigSource.fromConfig(system.settings.config).loadOrThrow[Config].topicLoader
+    val config =
+      Config
+        .loadOrThrow(system.settings.config)
+        .topicLoader
     load(logOffsetsForTopics(topics, strategy), config, maybeConsumerSettings)
   }

@@ -89,9 +90,8 @@ trait TopicLoader extends LazyLogging {
       topics: NonEmptyList[String],
       maybeConsumerSettings: Option[ConsumerSettings[Array[Byte], Array[Byte]]] = None
   )(implicit system: ActorSystem): Source[ConsumerRecord[K, V], (Future[Done], Future[Consumer.Control])] = {
-    val config = ConfigSource.fromConfig(system.settings.config).loadOrThrow[Config].topicLoader
-    val logOffsetsF = logOffsetsForTopics(topics, LoadAll)
-
+    val config = Config.loadOrThrow(system.settings.config).topicLoader
+    val logOffsetsF = logOffsetsForTopics(topics, LoadAll)
     val postLoadingSource = Source.futureSource(logOffsetsF.map { logOffsets =>
       val highestOffsets = logOffsets.map { case (p, o) => p -> o.highest }
       kafkaSource[K, V](highestOffsets, config, maybeConsumerSettings)
@@ -284,7 +284,7 @@ trait DeprecatedMethods { self: TopicLoader =>
       onRecord: ConsumerRecord[String, T] => Future[_],
       valueDeserializer: Deserializer[T]
   )(implicit system: ActorSystem): Source[Map[TopicPartition, Long], NotUsed] = {
-    val config = ConfigSource.fromConfig(system.settings.config).loadOrThrow[Config].topicLoader
+    val config = Config.loadOrThrow(system.settings.config).topicLoader

     import system.dispatcher

diff --git a/src/main/scala/com/sky/kafka/topicloader/config.scala b/src/main/scala/com/sky/kafka/topicloader/config.scala
deleted file mode 100644
index 33cb3a74..00000000
--- a/src/main/scala/com/sky/kafka/topicloader/config.scala
+++ /dev/null
@@ -1,21 +0,0 @@
-package com.sky.kafka.topicloader
-
-import eu.timepit.refined.api.Refined
-import eu.timepit.refined.auto._
-import eu.timepit.refined.numeric.Positive
-
-import scala.concurrent.duration.FiniteDuration
-
-final case class Config(topicLoader: TopicLoaderConfig)
-
-/** @param parallelism
-  *   Determines how many Kafka records are processed in parallel by [[TopicLoader]]. We recommend using a parallelism >
-  *   1 if you are processing the records by sending them to an akka.actor.Actor. This is so that messages are buffered
-  *   in the akka.actor.Actor's mailbox, improving performance versus using a parallelism of 1.
-  */
-final case class TopicLoaderConfig(
-    idleTimeout: FiniteDuration,
-    bufferSize: Int Refined Positive,
-// @deprecated("Kept for backward compatibility until clients can adapt", "TopicLoader 1.3.0")
-    parallelism: Int Refined Positive = 1
-)
diff --git a/src/main/scala/com/sky/kafka/topicloader/config/config.scala b/src/main/scala/com/sky/kafka/topicloader/config/config.scala
new file mode 100644
index 00000000..dd8b2b98
--- /dev/null
+++ b/src/main/scala/com/sky/kafka/topicloader/config/config.scala
@@ -0,0 +1,22 @@
+package com.sky.kafka.topicloader
+
+import cats.data.ValidatedNec
+import cats.implicits._
+import com.typesafe.config.ConfigException
+
+import scala.util.Try
+
+package object config {
+  type ValidationResult[A] = ValidatedNec[ConfigException, A]
+
+  implicit class TryOps[A](t: Try[A]) {
+    def validate(path: String): ValidationResult[A] = t.toEither.validate(path)
+  }
+
+  implicit class EitherOps[A](e: Either[Throwable, A]) {
+    def validate(path: String): ValidationResult[A] = e.leftMap {
+      case ce: ConfigException => ce
+      case e: Throwable        => new ConfigException.BadValue(path, e.getMessage)
+    }.toValidatedNec
+  }
+}
diff --git a/src/main/scala/com/sky/kafka/topicloader/config/topicLoaderConfig.scala b/src/main/scala/com/sky/kafka/topicloader/config/topicLoaderConfig.scala
new file mode 100644
index 00000000..86d31b49
--- /dev/null
+++ b/src/main/scala/com/sky/kafka/topicloader/config/topicLoaderConfig.scala
@@ -0,0 +1,63 @@
+package com.sky.kafka.topicloader.config
+
+import java.util.concurrent.TimeUnit
+
+import cats.data.{Validated, ValidatedNec}
+import cats.implicits._
+import com.typesafe.config.{Config => TypesafeConfig, ConfigException}
+
+import scala.concurrent.duration.FiniteDuration
+import scala.util.Try
+
+final case class PosInt private (_value: Int) {
+  val value: Int = _value
+}
+
+object PosInt {
+  def apply(value: Int): Either[IllegalArgumentException, PosInt] =
+    if (value > 0) new PosInt(value).asRight
+    else new IllegalArgumentException(s"$value is not a positive Int").asLeft
+
+  val One = new PosInt(1)
+}
+
+final case class Config(topicLoader: TopicLoaderConfig)
+
+/** @param parallelism
+  *   Determines how many Kafka records are processed in parallel by [[com.sky.kafka.topicloader.TopicLoader]]. We
+  *   recommend using a parallelism > 1 if you are processing the records by sending them to an akka.actor.Actor. This
+  *   is so that messages are buffered in the akka.actor.Actor's mailbox, improving performance versus using a
+  *   parallelism of 1.
+  */
+final case class TopicLoaderConfig(
+    idleTimeout: FiniteDuration,
+    bufferSize: PosInt,
+// @deprecated("Kept for backward compatibility until clients can adapt", "TopicLoader 1.3.0")
+    parallelism: PosInt = PosInt.One
+)
+
+object Config {
+  private val basePath = "topic-loader"
+
+  def load(config: TypesafeConfig): ValidatedNec[ConfigException, Config] = {
+    val idleTimeout = Try(
+      FiniteDuration(config.getDuration(s"$basePath.idle-timeout").toNanos, TimeUnit.NANOSECONDS)
+    ).validate(s"$basePath.idle-timeout")
+
+    val bufferSize = PosInt(config.getInt(s"$basePath.buffer-size"))
+      .validate(s"$basePath.buffer-size")
+
+    val parallelism = PosInt(config.getInt(s"$basePath.parallelism"))
+      .validate(s"$basePath.parallelism")
+
+    (idleTimeout, bufferSize, parallelism).mapN(TopicLoaderConfig.apply).map(Config.apply)
+  }
+
+  def loadOrThrow(config: TypesafeConfig): Config = load(config) match {
+    case Validated.Valid(config) => config
+    case Validated.Invalid(e) =>
+      throw new ConfigException.Generic(
+        s"Error loading config:\n\t${e.toNonEmptyList.toList.mkString("\t\n")}\n"
+      )
+  }
+}
diff --git a/src/test/scala/integration/TopicLoaderIntSpec.scala b/src/test/scala/integration/TopicLoaderIntSpec.scala
index 07c5e40e..4f333708 100644
--- a/src/test/scala/integration/TopicLoaderIntSpec.scala
+++ b/src/test/scala/integration/TopicLoaderIntSpec.scala
@@ -11,7 +11,8 @@ import cats.data.NonEmptyList
 import cats.syntax.option._
 import com.sky.kafka.topicloader.TopicLoader.consumerSettings
 import com.sky.kafka.topicloader._
-import com.typesafe.config.ConfigFactory
+import com.sky.kafka.topicloader.config.Config
+import com.typesafe.config.{ConfigException, ConfigFactory}
 import io.github.embeddedkafka.Codecs.{stringDeserializer, stringSerializer}
 import org.apache.kafka.common.errors.{TimeoutException => KafkaTimeoutException}
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
@@ -19,6 +20,7 @@ import org.scalatest.prop.TableDrivenPropertyChecks._
 import org.scalatest.prop.Tables.Table

 import scala.concurrent.Future
+import scala.concurrent.duration._

 class TopicLoaderIntSpec extends IntegrationSpecBase {

@@ -268,4 +270,48 @@ class TopicLoaderIntSpec extends IntegrationSpecBase {
       }
     }
   }
+
+  "config" should {
+
+    "load a valid config correctly" in new TestContext {
+
+      override implicit lazy val system: ActorSystem = ActorSystem(
+        "test-actor-system",
+        ConfigFactory.parseString(
+          s"""
+             |topic-loader {
+             |  idle-timeout = 1 second
+             |  buffer-size = 10
+             |}
+          """.stripMargin
+        )
+      )
+
+      val config = Config.loadOrThrow(system.settings.config)
+      config.topicLoader.idleTimeout shouldBe 1.second
+      config.topicLoader.bufferSize.value shouldBe 10
+    }
+
+    "fail to load an invalid config" in new TestContext {
+      override implicit lazy val system: ActorSystem = ActorSystem(
+        "test-actor-system",
+        ConfigFactory.parseString(
+          s"""
+             |topic-loader {
+             |  idle-timeout = 9999999999999999999999 seconds
+             |  buffer-size = -1
+             |}
+          """.stripMargin
+        )
+      )
+
+      val exception: ConfigException = intercept[ConfigException](Config.loadOrThrow(system.settings.config))
+
+      exception.getMessage should (
+        include(
+          "Invalid value at 'topic-loader.idle-timeout': Could not parse duration number '9999999999999999999999'"
+        ) and include("Invalid value at 'topic-loader.buffer-size': -1 is not a positive Int")
+      )
+    }
+  }
 }
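
Note: with pureconfig and refined removed, downstream code reads the loader settings through the new Config.loadOrThrow entry point, which validates each key with cats Validated and reports all failures together in a single ConfigException. The following is a minimal usage sketch (not part of the change): the object name, the literal values, and the explicit parallelism key are illustrative assumptions so the example does not rely on any reference.conf defaults.

import com.sky.kafka.topicloader.config.Config
import com.typesafe.config.ConfigFactory

import scala.concurrent.duration._

object ConfigLoadingExample extends App {
  // Illustrative values; a real application would put these keys in
  // application.conf and pass system.settings.config, as TopicLoader now does.
  val underlying = ConfigFactory.parseString(
    """
      |topic-loader {
      |  idle-timeout = 1 second
      |  buffer-size = 10
      |  parallelism = 1
      |}
    """.stripMargin
  )

  // Every key is validated; failures are accumulated and thrown together
  // by loadOrThrow as a single ConfigException.Generic.
  val config = Config.loadOrThrow(underlying)

  println(config.topicLoader.idleTimeout == 1.second) // true
  println(config.topicLoader.bufferSize.value)        // 10
  println(config.topicLoader.parallelism.value)       // 1
}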