Fs2 kms EventSubscriber (#375)
* created SchedulePublisher class and added in relevant dependencies

* added relevant dependency files and their corresponding unit tests - WIP

* fixed compilation error

* removed json-related changes

* removed SchedulePublisher changes

* reintroduced json files

* added EventSubscriber

* updated application.conf with avro and json topics

* added eventSubscriber to Scheduler

* updated from PR comments

* fmt

---------

Co-authored-by: susanabrahamtharakan <[email protected]>
Co-authored-by: tba32 <[email protected]>
3 people authored Feb 21, 2025
1 parent 1ecf4d1 commit 162b9af
Showing 6 changed files with 165 additions and 7 deletions.
6 changes: 5 additions & 1 deletion project/Dependencies.scala
@@ -39,6 +39,7 @@ object Dependencies {
   private lazy val kafkaVersion = "3.6.0"

   lazy val core = "co.fs2" %% "fs2-core" % version
+  lazy val io = "co.fs2" %% "fs2-io" % version
   lazy val kafka = "com.github.fd4s" %% "fs2-kafka" % kafkaVersion
 }

@@ -117,6 +118,7 @@ object Dependencies {
   val avro4s = "com.sksamuel.avro4s" %% "avro4s-core" % "4.1.2"
   val chimney = "io.scalaland" %% "chimney" % "1.5.0"
   val kafkaTopicLoader = "uk.sky" %% "kafka-topic-loader" % "1.5.6"
+  val fs2TopicLoader = "uk.sky" %% "fs2-kafka-topic-loader" % "0.1.0"
   val monix = "io.monix" %% "monix-execution" % "3.4.1"
   val mouse = "org.typelevel" %% "mouse" % "1.3.2"
   val scalaLogging = "com.typesafe.scala-logging" %% "scala-logging" % "3.9.5"
@@ -169,6 +171,7 @@ object Dependencies {
     Cats.log4catsSlf4j,
     Fs2.core,
     Fs2.kafka,
+    Fs2.io,
     Monocle.core,
     Vulcan.core,
     Vulcan.generic,
@@ -183,6 +186,7 @@
     mouse,
     chimney,
     Circe.generic,
-    Circe.parser
+    Circe.parser,
+    fs2TopicLoader
   )
 }
4 changes: 4 additions & 0 deletions scheduler-3/src/main/resources/application.conf
@@ -1,5 +1,9 @@
 scheduler.reader {
   schedule-topics = [${?SCHEDULE_TOPICS}]
+  topics {
+    avro = [${?AVRO_SCHEDULE_TOPICS}]
+    json = [${?JSON_SCHEDULE_TOPICS}]
+  }
   kafka-brokers = "localhost:9092"
   kafka-brokers = ${?KAFKA_BROKERS}
 }
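Note on the substitution syntax: `${?VAR}` only contributes a value when the environment variable is set, so each topic list defaults to empty. A hypothetical environment for illustration (these values are not part of the change):

    # hypothetical values, for illustration only
    AVRO_SCHEDULE_TOPICS=avro-schedules   # scheduler.reader.topics.avro resolves to ["avro-schedules"]
    JSON_SCHEDULE_TOPICS=json-schedules   # scheduler.reader.topics.json resolves to ["json-schedules"]

With neither variable set, both lists resolve to [] and the TopicConfig validation introduced in Config.scala below rejects the configuration.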
137 changes: 136 additions & 1 deletion scheduler-3/src/main/scala/uk/sky/scheduler/EventSubscriber.scala
@@ -1,10 +1,145 @@
 package uk.sky.scheduler

-import fs2.Stream
+import cats.effect.Resource.ExitCase
+import cats.effect.{Async, Deferred, Ref, Resource}
+import cats.syntax.all.*
+import cats.{Monad, Parallel, Show}
+import fs2.*
+import fs2.kafka.*
+import mouse.all.*
+import org.typelevel.log4cats.LoggerFactory
+import org.typelevel.otel4s.metrics.Meter
+import org.typelevel.otel4s.{Attribute, Attributes}
+import uk.sky.fs2.kafka.topicloader.TopicLoader
+import uk.sky.scheduler.circe.jsonScheduleDecoder
+import uk.sky.scheduler.config.KafkaConfig
+import uk.sky.scheduler.converters.all.*
+import uk.sky.scheduler.domain.ScheduleEvent
+import uk.sky.scheduler.error.ScheduleError
+import uk.sky.scheduler.kafka.avro.{avroBinaryDeserializer, avroScheduleCodec, AvroSchedule}
+import uk.sky.scheduler.kafka.json.{jsonDeserializer, JsonSchedule}
+import uk.sky.scheduler.message.Message

 trait EventSubscriber[F[_]] {
   def messages: Stream[F, Message[Either[ScheduleError, Option[ScheduleEvent]]]]
 }

 object EventSubscriber {
+  private type Output = Either[ScheduleError, Option[ScheduleEvent]]
+
+  def kafka[F[_] : Async : Parallel : LoggerFactory](
+      config: KafkaConfig,
+      loaded: Deferred[F, Unit]
+  ): F[EventSubscriber[F]] = {
+
+    val avroConsumerSettings: ConsumerSettings[F, String, Either[ScheduleError, Option[AvroSchedule]]] = {
+      given Resource[F, Deserializer[F, Either[ScheduleError, Option[AvroSchedule]]]] =
+        avroBinaryDeserializer[F, AvroSchedule].map(_.option.map(_.sequence))
+
+      config.consumerSettings[F, String, Either[ScheduleError, Option[AvroSchedule]]]
+    }
+
+    val jsonConsumerSettings: ConsumerSettings[F, String, Either[ScheduleError, Option[JsonSchedule]]] = {
+      given Deserializer[F, Either[ScheduleError, Option[JsonSchedule]]] =
+        jsonDeserializer[F, JsonSchedule].option.map(_.sequence)
+
+      config.consumerSettings[F, String, Either[ScheduleError, Option[JsonSchedule]]]
+    }
+
+    for {
+      avroLoadedRef <- Ref.of[F, Boolean](false)
+      jsonLoadedRef <- Ref.of[F, Boolean](false)
+    } yield new EventSubscriber[F] {
+
+      /** If both topics have finished loading, complete the Deferred to allow queueing schedules.
+        */
+      private def onLoadCompare(exitCase: ExitCase): F[Unit] =
+        exitCase match {
+          case ExitCase.Succeeded =>
+            for {
+              avroLoaded <- avroLoadedRef.get
+              jsonLoaded <- jsonLoadedRef.get
+              _          <- Async[F].whenA(avroLoaded && jsonLoaded)(loaded.complete(()))
+            } yield ()
+          case ExitCase.Errored(_) | ExitCase.Canceled => Async[F].unit
+        }
+
+      private val avroStream: Stream[F, ConsumerRecord[String, Either[ScheduleError, Option[AvroSchedule]]]] =
+        config.topics.avro.toNel
+          .fold(Stream.exec(avroLoadedRef.set(true) *> onLoadCompare(ExitCase.Succeeded)))(
+            TopicLoader.loadAndRun(_, avroConsumerSettings) { exitCase =>
+              avroLoadedRef.set(true) *> onLoadCompare(exitCase)
+            }
+          )
+
+      private val jsonStream: Stream[F, ConsumerRecord[String, Either[ScheduleError, Option[JsonSchedule]]]] =
+        config.topics.json.toNel
+          .fold(Stream.exec(jsonLoadedRef.set(true) *> onLoadCompare(ExitCase.Succeeded)))(
+            TopicLoader.loadAndRun(_, jsonConsumerSettings) { exitCase =>
+              jsonLoadedRef.set(true) *> onLoadCompare(exitCase)
+            }
+          )
+
+      override def messages: Stream[F, Message[Output]] =
+        avroStream.merge(jsonStream).map(_.toMessage)
+    }
+  }
+
+  def observed[F[_] : Monad : Parallel : LoggerFactory : Meter](delegate: EventSubscriber[F]): F[EventSubscriber[F]] = {
+    given Show[ScheduleError] = {
+      case _: ScheduleError.InvalidAvroError    => "invalid-avro"
+      case _: ScheduleError.NotJsonError        => "not-json"
+      case _: ScheduleError.InvalidJsonError    => "invalid-json"
+      case _: ScheduleError.DecodeError         => "decode"
+      case _: ScheduleError.TransformationError => "transformation"
+    }
+
+    def updateAttributes(source: String) = Attributes(
+      Attribute("message.type", "update"),
+      Attribute("message.source", source)
+    )
+
+    def deleteAttributes(source: String, deleteType: String) = Attributes(
+      Attribute("message.type", "delete"),
+      Attribute("message.source", source),
+      Attribute("message.delete.type", deleteType)
+    )
+
+    def errorAttributes(source: String, error: ScheduleError) = Attributes(
+      Attribute("message.type", "error"),
+      Attribute("message.source", source),
+      Attribute("message.error.type", error.show)
+    )
+
+    for {
+      counter <- Meter[F].counter[Long]("event-subscriber").create
+      logger  <- LoggerFactory[F].create
+    } yield new EventSubscriber[F] {
+      override def messages: Stream[F, Message[Output]] =
+        delegate.messages.evalTapChunk { case Message(key, source, value, metadata) =>
+          val logCtx = Map("key" -> key, "source" -> source)
+
+          value match {
+            case Right(Some(_)) =>
+              logger.info(logCtx)(show"Decoded UPDATE for [$key] from $source") &>
+                counter.inc(updateAttributes(source))
+
+            case Right(None) =>
+              lazy val deleteType = metadata.isExpired.fold("expired", "canceled")
+              logger.info(logCtx)(show"Decoded DELETE type=[$deleteType] for [$key] from $source") &>
+                counter.inc(deleteAttributes(source, deleteType))
+
+            case Left(error) =>
+              logger.error(logCtx, error)(show"Error decoding [$key] from $source") &>
+                counter.inc(errorAttributes(source, error))
+          }
+        }
+    }
+  }
+
+  def live[F[_] : Async : Parallel : LoggerFactory : Meter](
+      config: KafkaConfig,
+      loaded: Deferred[F, Unit]
+  ): F[EventSubscriber[F]] =
+    kafka[F](config, loaded).flatMap(observed)
 }
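For orientation, a minimal wiring sketch for the new subscriber. It is not part of the commit, and it assumes IO, a no-op Meter, an slf4j-backed LoggerFactory, a `kafkaConfig` value already in scope, and a `key` accessor on this repo's Message type:

    import cats.effect.{Deferred, IO}
    import org.typelevel.log4cats.LoggerFactory
    import org.typelevel.log4cats.slf4j.Slf4jFactory
    import org.typelevel.otel4s.metrics.Meter

    given LoggerFactory[IO] = Slf4jFactory.create[IO]
    given Meter[IO]         = Meter.noop[IO]

    // Build the subscriber, then drain its merged Avro/JSON stream, printing each key.
    def consume(kafkaConfig: KafkaConfig): IO[Unit] =
      for {
        allowEnqueue <- Deferred[IO, Unit] // completed once both topics finish loading
        subscriber   <- EventSubscriber.live[IO](kafkaConfig, allowEnqueue)
        _            <- subscriber.messages.evalMap(m => IO.println(m.key)).compile.drain
      } yield ()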
11 changes: 7 additions & 4 deletions scheduler-3/src/main/scala/uk/sky/scheduler/Scheduler.scala
@@ -3,6 +3,8 @@ package uk.sky.scheduler
 import cats.Parallel
 import cats.data.Reader
 import cats.effect.*
+import cats.effect.syntax.all.*
+import cats.effect.{Async, Concurrent, Deferred, Resource}
 import fs2.Stream
 import org.typelevel.log4cats.LoggerFactory
 import org.typelevel.otel4s.metrics.Meter
@@ -33,9 +35,10 @@ object Scheduler {
   def live[F[_] : Async : Parallel : LoggerFactory : Meter]: Reader[Config, Resource[F, Scheduler[F, Unit]]] = Reader {
     config =>
       for {
-        eventSubscriber <- Resource.pure(??? : EventSubscriber[F])
-        scheduleQueue <- Resource.pure(??? : ScheduleQueue[F])
-        schedulePublisher = SchedulePublisher.live[F](config.kafka)
-      } yield Scheduler[F, Unit](eventSubscriber, scheduleQueue, schedulePublisher)
+        allowEnqueue <- Deferred[F, Unit].toResource
+        eventSubscriber <- EventSubscriber.live[F](config.kafka, allowEnqueue).toResource
+        scheduleQueue <- Resource.pure(??? : ScheduleQueue[F])
+        schedulePublisher <- Resource.pure(??? : SchedulePublisher[F, Unit])
+      } yield Scheduler(eventSubscriber, scheduleQueue, schedulePublisher)
   }
 }
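For context, `allowEnqueue` is the hand-off between topic loading and scheduling: EventSubscriber completes it once both topics have loaded (via onLoadCompare above), so the queue side can block until then. A sketch of that gating pattern, hypothetical and not from this commit:

    import cats.effect.{Deferred, IO}

    // Hypothetical helper: defer any enqueue until the subscriber signals both topics are loaded.
    def gatedEnqueue[A](allowEnqueue: Deferred[IO, Unit], enqueue: A => IO[Unit])(a: A): IO[Unit] =
      allowEnqueue.get >> enqueue(a)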
13 changes: 13 additions & 0 deletions scheduler-3/src/main/scala/uk/sky/scheduler/config/Config.scala
@@ -18,6 +18,7 @@ object Config {
 }

 final case class KafkaConfig(
+    topics: TopicConfig,
     consumer: ConsumerProducerConfig,
     producer: ConsumerProducerConfig,
     commit: CommitConfig
@@ -62,3 +63,15 @@ final case class ReaderConfig(
     scheduleTopics: List[String],
     kafkaBrokers: String
 ) derives ConfigReader
+
+final case class TopicConfig(avro: List[String], json: List[String])
+
+object TopicConfig {
+  given topicConfigReader: ConfigReader[TopicConfig] =
+    ConfigReader
+      .forProduct2[TopicConfig, List[String], List[String]]("avro", "json")(TopicConfig.apply)
+      .ensure(
+        config => config.avro.nonEmpty || config.json.nonEmpty,
+        message = _ => "both Avro and JSON topics were empty"
+      )
+}
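A quick illustration of the `.ensure` guard, using pureconfig's ConfigSource with hypothetical inline configs (not part of the change):

    import pureconfig.ConfigSource

    // Left(...) - fails with "both Avro and JSON topics were empty"
    ConfigSource.string("""{ avro = [], json = [] }""").load[TopicConfig]

    // Right(TopicConfig(List("avro-schedules"), Nil)) - one non-empty list suffices
    ConfigSource.string("""{ avro = ["avro-schedules"], json = [] }""").load[TopicConfig]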
1 change: 0 additions & 1 deletion
@@ -58,5 +58,4 @@ class ScheduleEventConverterSpec
       scheduleEvent.toTombstone should equalProducerRecord(tombstone)
     }
   }
-
 }
