Skip to content

Commit

Permalink
Akka migration: config (#366)
Browse files Browse the repository at this point in the history
---------

Co-authored-by: Daniel Smedley <[email protected]>
  • Loading branch information
btanyab and DSmedleySky authored Feb 6, 2025
1 parent c1c8242 commit a082ec5
Show file tree
Hide file tree
Showing 6 changed files with 156 additions and 19 deletions.
15 changes: 8 additions & 7 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ ThisBuild / semanticdbVersion := scalafixSemanticdb.revision
Global / onChangedBuildSource := ReloadOnSourceChanges

val scala2Settings = Seq(
scalaVersion := "2.13.10",
scalaVersion := "2.13.15",
tpolecatScalacOptions ++= Set(
ScalacOptions.other("-Ymacro-annotations"),
ScalacOptions.source3
Expand All @@ -37,10 +37,11 @@ val scala3Settings = Seq(
Test / parallelExecution := false
)

val buildInfoSettings = Seq(
buildInfoKeys := Seq[BuildInfoKey](name, version, scalaVersion, sbtVersion),
buildInfoPackage := "com.sky"
)
val buildInfoSettings = (pkg: String) =>
Seq(
buildInfoKeys := Seq[BuildInfoKey](name, version, scalaVersion, sbtVersion),
buildInfoPackage := pkg
)

lazy val scheduler = (project in file("scheduler"))
.enablePlugins(BuildInfoPlugin, JavaAppPackaging, UniversalDeployPlugin, JavaAgent, DockerPlugin)
Expand All @@ -49,7 +50,7 @@ lazy val scheduler = (project in file("scheduler"))
libraryDependencies ++= Dependencies.scheduler,
addCompilerPlugin("org.typelevel" % "kind-projector" % "0.13.3" cross CrossVersion.full),
javaAgents += "io.kamon" % "kanela-agent" % "1.0.18",
buildInfoSettings,
buildInfoSettings("com.sky"),
dockerSettings,
releaseSettings
)
Expand All @@ -59,7 +60,7 @@ lazy val scheduler3 = (project in file("scheduler-3"))
.settings(scala3Settings)
.settings(
libraryDependencies ++= Dependencies.scheduler3,
buildInfoSettings,
buildInfoSettings("uk.sky"),
scalafixConfig := Some((ThisBuild / baseDirectory).value / ".scalafix3.conf"),
scalafmtConfig := (ThisBuild / baseDirectory).value / ".scalafmt3.conf"
)
Expand Down
23 changes: 17 additions & 6 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ object Dependencies {
object Cats {
private val version = "2.7.0"
private val catsEffectVersion = "3.5.7"
private val log4catsVersion = "2.7.0"

lazy val effectTestKit = "org.typelevel" %% "cats-effect-testkit" % catsEffectVersion % Test
lazy val effectTesting = "org.typelevel" %% "cats-effect-testing-scalatest" % "1.6.0" % Test
Expand All @@ -25,6 +26,8 @@ object Dependencies {
lazy val caseInsensitiveTesting = "org.typelevel" %% "case-insensitive-testing" % "1.4.2"
lazy val core = "org.typelevel" %% "cats-core" % version
lazy val effect = "org.typelevel" %% "cats-effect" % catsEffectVersion
lazy val log4cats = "org.typelevel" %% "log4cats-core" % log4catsVersion
lazy val log4catsSlf4j = "org.typelevel" %% "log4cats-slf4j" % log4catsVersion
lazy val scalatest = "com.ironcorelabs" %% "cats-scalatest" % "3.1.1" % Test
lazy val testKit = "org.typelevel" %% "cats-testkit" % version % Test
lazy val base = Seq(core)
Expand Down Expand Up @@ -60,10 +63,14 @@ object Dependencies {
}

object PureConfig {
private val version = "0.17.1"
val pureconfig = "com.github.pureconfig" %% "pureconfig" % version
val cats = "com.github.pureconfig" %% "pureconfig-cats" % version
val all = Seq(pureconfig, cats)
private val version = "0.17.8"
// TODO: Remove if not needed in Scala3
val pureconfig = "com.github.pureconfig" %% "pureconfig" % version
val core = "com.github.pureconfig" %% "pureconfig-core" % version
// TODO: Remove if not needed in Scala3
val cats = "com.github.pureconfig" %% "pureconfig-cats" % version
val catsEffect = "com.github.pureconfig" %% "pureconfig-cats-effect" % version
val allScala2 = Seq(pureconfig, cats)
}

object Refined {
Expand Down Expand Up @@ -99,7 +106,7 @@ object Dependencies {
val scalaTest = "org.scalatest" %% "scalatest" % "3.2.18" % Test
val scalaTestPlusMockito = "org.scalatestplus" %% "mockito-3-12" % "3.2.10.0" % Test

val core: Seq[ModuleID] = Akka.base ++ Cats.base ++ Kafka.base ++ Kamon.all ++ PureConfig.all ++ Refined.base ++ Seq(
val core: Seq[ModuleID] = Akka.base ++ Cats.base ++ Kafka.base ++ Kamon.all ++ PureConfig.allScala2 ++ Refined.base ++ Seq(
avro4s,
kafkaTopicLoader,
monix,
Expand Down Expand Up @@ -130,10 +137,14 @@ object Dependencies {
Cats.effectTestKit,
Cats.effectTesting,
Cats.effectTestkitScalatest,
Cats.log4cats,
Cats.log4catsSlf4j,
Fs2.core,
Fs2.kafka,
Monocle.core,
Vulcan.core,
Vulcan.generic
Vulcan.generic,
PureConfig.core,
PureConfig.catsEffect
)
}
44 changes: 44 additions & 0 deletions scheduler-3/src/main/resources/application.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
scheduler.reader {
schedule-topics = [${?SCHEDULE_TOPICS}]
kafka-brokers = "localhost:9092"
kafka-brokers = ${?KAFKA_BROKERS}
}

kafka {
consumer {
bootstrap-servers = ${scheduler.reader.kafka-brokers}
properties = {
"group.id": "com.sky.kafka.scheduler"
"group.id": ${?CONSUMER_GROUP_ID}
"auto.offset.reset": "earliest"
"security.protocol": PLAINTEXT
"security.protocol": ${?SECURITY_PROTOCOL}
"ssl.keystore.location": ${?KEYSTORE_LOCATION}
"ssl.keystore.password": ${?KEYSTORE_PASSWORD}
"ssl.truststore.location": ${?TRUSTSTORE_LOCATION}
"ssl.truststore.password": ${?TRUSTSTORE_PASSWORD}
"ssl.endpoint.identification.algorithm": ""
}
}

producer {
bootstrap-servers = ${scheduler.reader.kafka-brokers}
properties = {
"buffer.memory": 80000000
"batch.size": 500000
"linger.ms": 100
"security.protocol": PLAINTEXT
"security.protocol": ${?SECURITY_PROTOCOL}
"ssl.keystore.location": ${?KEYSTORE_LOCATION}
"ssl.keystore.password": ${?KEYSTORE_PASSWORD}
"ssl.truststore.location": ${?TRUSTSTORE_LOCATION}
"ssl.truststore.password": ${?TRUSTSTORE_PASSWORD}
"ssl.endpoint.identification.algorithm": ""
}
}

commit {
max-batch = 250
max-interval = 10 seconds
}
}
24 changes: 19 additions & 5 deletions scheduler-3/src/main/scala/uk/sky/scheduler/Main.scala
Original file line number Diff line number Diff line change
@@ -1,16 +1,30 @@
package uk.sky.scheduler

import cats.data.Reader
import cats.effect.*
import fs2.Stream
import org.typelevel
import org.typelevel.log4cats.LoggerFactory
import org.typelevel.log4cats.slf4j.Slf4jFactory
import pureconfig.ConfigSource
import pureconfig.module.catseffect.syntax.*
import uk.sky.scheduler.config.Config

object Main extends IOApp.Simple {

def stream[IO[_] : Concurrent]: Stream[IO, Unit] =
def stream: Reader[Config, Stream[IO, Unit]] =
for {
scheduler <- Stream.resource(Scheduler.live[IO])
message <- scheduler.stream
} yield message
scheduler <- Scheduler.live[IO]
stream <- Reader[Config, Stream[IO, Scheduler[IO, Unit]]](_ => Stream.resource(scheduler))
} yield stream.flatMap(_.stream)

override def run: IO[Unit] = stream[IO].compile.drain
override def run: IO[Unit] = for {
given LoggerFactory[IO] <- IO(Slf4jFactory.create[IO])
logger <- LoggerFactory[IO].create
config <- ConfigSource.default.loadF[IO, Config]()
_ <- logger.info(s"Running ${Config.metadata.appName} with version ${Config.metadata.version}")
_ <- logger.info(s"Loaded config: $config")
_ <- stream(config).compile.drain
} yield ()

}
5 changes: 4 additions & 1 deletion scheduler-3/src/main/scala/uk/sky/scheduler/Scheduler.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package uk.sky.scheduler

import cats.data.Reader
import cats.effect.*
import fs2.Stream
import uk.sky.scheduler.config.Config
import uk.sky.scheduler.domain.ScheduleEvent
import uk.sky.scheduler.message.Message
import uk.sky.scheduler.message.Metadata.*
Expand All @@ -25,11 +27,12 @@ class Scheduler[F[_] : Concurrent, O](
}

object Scheduler {
def live[F[_] : Concurrent]: Resource[F, Scheduler[F, Unit]] =
def live[F[_] : Concurrent]: Reader[Config, Resource[F, Scheduler[F, Unit]]] = Reader { config =>
for {
eventSubscriber <- Resource.pure(??? : EventSubscriber[F])
scheduleQueue <- Resource.pure(??? : ScheduleQueue[F])
schedulePublisher <- Resource.pure(??? : SchedulePublisher[F, Unit])
} yield Scheduler(eventSubscriber, scheduleQueue, schedulePublisher)
}

}
64 changes: 64 additions & 0 deletions scheduler-3/src/main/scala/uk/sky/scheduler/config/Config.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package uk.sky.scheduler.config

import cats.effect.{Resource, Sync}
import fs2.kafka.*
import pureconfig.ConfigReader
import uk.sky.BuildInfo

import scala.concurrent.duration.FiniteDuration

final case class Config(
kafka: KafkaConfig,
scheduler: SchedulerConfig
) derives ConfigReader

object Config {
private[config] final case class Metadata(appName: String, version: String)
val metadata: Metadata = Metadata(appName = BuildInfo.name, version = BuildInfo.version)
}

final case class KafkaConfig(
consumer: ConsumerProducerConfig,
producer: ConsumerProducerConfig,
commit: CommitConfig
) derives ConfigReader

object KafkaConfig {
extension (config: KafkaConfig) {
def consumerSettings[F[_] : Sync, K, V](using
Resource[F, KeyDeserializer[F, K]],
Resource[F, ValueDeserializer[F, V]]
): ConsumerSettings[F, K, V] =
ConsumerSettings[F, K, V]
.withBootstrapServers(config.consumer.bootstrapServers)
.withProperties(config.consumer.properties)
.withAutoOffsetReset(AutoOffsetReset.Earliest)

def producerSettings[F[_] : Sync, K, V](using
Resource[F, KeySerializer[F, K]],
Resource[F, ValueSerializer[F, V]]
): ProducerSettings[F, K, V] =
ProducerSettings[F, K, V]
.withBootstrapServers(config.producer.bootstrapServers)
.withProperties(config.producer.properties)
}
}

final case class ConsumerProducerConfig(
bootstrapServers: String,
properties: Map[String, String]
) derives ConfigReader

final case class CommitConfig(
maxBatch: Int,
maxInterval: FiniteDuration
) derives ConfigReader

final case class SchedulerConfig(
reader: ReaderConfig
) derives ConfigReader

final case class ReaderConfig(
scheduleTopics: List[String],
kafkaBrokers: String
) derives ConfigReader

0 comments on commit a082ec5

Please sign in to comment.