Skip to content

Commit

Permalink
Remove pureconfig and refined (#92)
Browse files Browse the repository at this point in the history
  • Loading branch information
bcarter97 authored Feb 22, 2022
1 parent da20c1d commit 8219c2f
Show file tree
Hide file tree
Showing 7 changed files with 142 additions and 40 deletions.
1 change: 1 addition & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ ThisBuild / scalacOptions ++= Seq(
"-unchecked",
"-Xcheckinit",
"-Xfatal-warnings",
"-Xsource:3",
"-Ywarn-dead-code",
"-Ywarn-extra-implicit",
"-Ywarn-numeric-widen",
Expand Down
11 changes: 1 addition & 10 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,18 @@ object Dependencies {
val all = Seq(core, kernal)
}

object Refined {
private val version = "0.9.28"
val base = "eu.timepit" %% "refined" % version
val pureconfig = "eu.timepit" %% "refined-pureconfig" % version
val all = Seq(base, pureconfig)
}

val kafkaClients = "org.apache.kafka" % "kafka-clients" % "3.1.0"
val scalaLogging = "com.typesafe.scala-logging" %% "scala-logging" % "3.9.4"
val logbackClassic = "ch.qos.logback" % "logback-classic" % "1.2.10" % Runtime
val pureconfig = "com.github.pureconfig" %% "pureconfig" % "0.17.1"
val scalaCollectionCompat = "org.scala-lang.modules" %% "scala-collection-compat" % "2.6.0"

val embeddedKafka = "io.github.embeddedkafka" %% "embedded-kafka" % "3.1.0" % Test
val scalaTest = "org.scalatest" %% "scalatest" % "3.2.11" % Test

val core = Akka.base ++ Cats.all ++ Refined.all ++ Seq(
val core = Akka.base ++ Cats.all ++ Seq(
kafkaClients,
scalaLogging,
logbackClassic,
pureconfig,
scalaCollectionCompat
)
val test = Akka.test ++ Seq(embeddedKafka, scalaTest)
Expand Down
16 changes: 8 additions & 8 deletions src/main/scala/com/sky/kafka/topicloader/TopicLoader.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,11 @@ import cats.syntax.bifunctor._
import cats.syntax.option._
import cats.syntax.show._
import cats.{Bifunctor, Show}
import com.sky.kafka.topicloader.config.{Config, TopicLoaderConfig}
import com.typesafe.scalalogging.LazyLogging
import eu.timepit.refined.pureconfig._
import org.apache.kafka.clients.consumer._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization._
import pureconfig.ConfigSource
import pureconfig.generic.auto._

import scala.concurrent.Future
import scala.jdk.CollectionConverters._
Expand Down Expand Up @@ -78,7 +76,10 @@ trait TopicLoader extends LazyLogging {
strategy: LoadTopicStrategy,
maybeConsumerSettings: Option[ConsumerSettings[Array[Byte], Array[Byte]]] = None
)(implicit system: ActorSystem): Source[ConsumerRecord[K, V], Future[Consumer.Control]] = {
val config = ConfigSource.fromConfig(system.settings.config).loadOrThrow[Config].topicLoader
val config =
Config
.loadOrThrow(system.settings.config)
.topicLoader
load(logOffsetsForTopics(topics, strategy), config, maybeConsumerSettings)
}

Expand All @@ -89,9 +90,8 @@ trait TopicLoader extends LazyLogging {
topics: NonEmptyList[String],
maybeConsumerSettings: Option[ConsumerSettings[Array[Byte], Array[Byte]]] = None
)(implicit system: ActorSystem): Source[ConsumerRecord[K, V], (Future[Done], Future[Consumer.Control])] = {
val config = ConfigSource.fromConfig(system.settings.config).loadOrThrow[Config].topicLoader
val logOffsetsF = logOffsetsForTopics(topics, LoadAll)

val config = Config.loadOrThrow(system.settings.config).topicLoader
val logOffsetsF = logOffsetsForTopics(topics, LoadAll)
val postLoadingSource = Source.futureSource(logOffsetsF.map { logOffsets =>
val highestOffsets = logOffsets.map { case (p, o) => p -> o.highest }
kafkaSource[K, V](highestOffsets, config, maybeConsumerSettings)
Expand Down Expand Up @@ -284,7 +284,7 @@ trait DeprecatedMethods { self: TopicLoader =>
onRecord: ConsumerRecord[String, T] => Future[_],
valueDeserializer: Deserializer[T]
)(implicit system: ActorSystem): Source[Map[TopicPartition, Long], NotUsed] = {
val config = ConfigSource.fromConfig(system.settings.config).loadOrThrow[Config].topicLoader
val config = Config.loadOrThrow(system.settings.config).topicLoader

import system.dispatcher

Expand Down
21 changes: 0 additions & 21 deletions src/main/scala/com/sky/kafka/topicloader/config.scala

This file was deleted.

22 changes: 22 additions & 0 deletions src/main/scala/com/sky/kafka/topicloader/config/config.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package com.sky.kafka.topicloader

import cats.data.ValidatedNec
import cats.implicits._
import com.typesafe.config.ConfigException

import scala.util.Try

package object config {
type ValidationResult[A] = ValidatedNec[ConfigException, A]

implicit class TryOps[A](t: Try[A]) {
def validate(path: String): ValidationResult[A] = t.toEither.validate(path)
}

implicit class EitherOps[A](e: Either[Throwable, A]) {
def validate(path: String): ValidationResult[A] = e.leftMap {
case ce: ConfigException => ce
case e: Throwable => new ConfigException.BadValue(path, e.getMessage)
}.toValidatedNec
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package com.sky.kafka.topicloader.config

import java.util.concurrent.TimeUnit

import cats.data.{Validated, ValidatedNec}
import cats.implicits._
import com.typesafe.config.{Config => TypesafeConfig, ConfigException}

import scala.concurrent.duration.FiniteDuration
import scala.util.Try

final case class PosInt private (_value: Int) {
val value: Int = _value
}

object PosInt {
def apply(value: Int): Either[IllegalArgumentException, PosInt] =
if (value > 0) new PosInt(value).asRight
else new IllegalArgumentException(s"$value is not a positive Int").asLeft

val One = new PosInt(1)
}

final case class Config(topicLoader: TopicLoaderConfig)

/** @param parallelism
* Determines how many Kafka records are processed in parallel by [[com.sky.kafka.topicloader.TopicLoader]]. We
* recommend using a parallelism > 1 if you are processing the records by sending them to an akka.actor.Actor. This
* is so that messages are buffered in the akka.actor.Actor's mailbox, improving performance versus using a
* parallelism of 1.
*/
final case class TopicLoaderConfig(
idleTimeout: FiniteDuration,
bufferSize: PosInt,
// @deprecated("Kept for backward compatibility until clients can adapt", "TopicLoader 1.3.0")
parallelism: PosInt = PosInt.One
)

object Config {
private val basePath = "topic-loader"

def load(config: TypesafeConfig): ValidatedNec[ConfigException, Config] = {
val idleTimeout = Try(
FiniteDuration(config.getDuration(s"$basePath.idle-timeout").toNanos, TimeUnit.NANOSECONDS)
).validate(s"$basePath.idle-timeout")

val bufferSize = PosInt(config.getInt(s"$basePath.buffer-size"))
.validate(s"$basePath.buffer-size")

val parallelism = PosInt(config.getInt(s"$basePath.parallelism"))
.validate(s"$basePath.parallelism")

(idleTimeout, bufferSize, parallelism).mapN(TopicLoaderConfig.apply).map(Config.apply)
}

def loadOrThrow(config: TypesafeConfig): Config = load(config) match {
case Validated.Valid(config) => config
case Validated.Invalid(e) =>
throw new ConfigException.Generic(
s"Error loading config:\n\t${e.toNonEmptyList.toList.mkString("\t\n")}\n"
)
}
}
48 changes: 47 additions & 1 deletion src/test/scala/integration/TopicLoaderIntSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,16 @@ import cats.data.NonEmptyList
import cats.syntax.option._
import com.sky.kafka.topicloader.TopicLoader.consumerSettings
import com.sky.kafka.topicloader._
import com.typesafe.config.ConfigFactory
import com.sky.kafka.topicloader.config.Config
import com.typesafe.config.{ConfigException, ConfigFactory}
import io.github.embeddedkafka.Codecs.{stringDeserializer, stringSerializer}
import org.apache.kafka.common.errors.{TimeoutException => KafkaTimeoutException}
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.scalatest.prop.TableDrivenPropertyChecks._
import org.scalatest.prop.Tables.Table

import scala.concurrent.Future
import scala.concurrent.duration._

class TopicLoaderIntSpec extends IntegrationSpecBase {

Expand Down Expand Up @@ -268,4 +270,48 @@ class TopicLoaderIntSpec extends IntegrationSpecBase {
}
}

"config" should {

"load a valid config correctly" in new TestContext {

override implicit lazy val system: ActorSystem = ActorSystem(
"test-actor-system",
ConfigFactory.parseString(
s"""
|topic-loader {
| idle-timeout = 1 second
| buffer-size = 10
|}
""".stripMargin
)
)

val config = Config.loadOrThrow(system.settings.config)
config.topicLoader.idleTimeout shouldBe 1.second
config.topicLoader.bufferSize.value shouldBe 10
}

"fail to load a valid config" in new TestContext {
override implicit lazy val system: ActorSystem = ActorSystem(
"test-actor-system",
ConfigFactory.parseString(
s"""
|topic-loader {
| idle-timeout = 9999999999999999999999 seconds
| buffer-size = -1
|}
""".stripMargin
)
)

val exception: ConfigException = intercept[ConfigException](Config.loadOrThrow(system.settings.config))

exception.getMessage should (
include(
"Invalid value at 'topic-loader.idle-timeout': Could not parse duration number '9999999999999999999999'"
) and include("Invalid value at 'topic-loader.buffer-size': -1 is not a positive Int")
)
}
}

}

0 comments on commit 8219c2f

Please sign in to comment.