diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index b9d53d2e..e3aa9a14 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -39,6 +39,7 @@ object Dependencies {
     private lazy val kafkaVersion = "3.6.0"
 
     lazy val core = "co.fs2" %% "fs2-core" % version
+    lazy val io = "co.fs2" %% "fs2-io" % version
     lazy val kafka = "com.github.fd4s" %% "fs2-kafka" % kafkaVersion
   }
 
@@ -117,6 +118,7 @@ object Dependencies {
   val avro4s = "com.sksamuel.avro4s" %% "avro4s-core" % "4.1.2"
   val chimney = "io.scalaland" %% "chimney" % "1.5.0"
   val kafkaTopicLoader = "uk.sky" %% "kafka-topic-loader" % "1.5.6"
+  val fs2TopicLoader = "uk.sky" %% "fs2-kafka-topic-loader" % "0.1.0"
   val monix = "io.monix" %% "monix-execution" % "3.4.1"
   val mouse = "org.typelevel" %% "mouse" % "1.3.2"
   val scalaLogging = "com.typesafe.scala-logging" %% "scala-logging" % "3.9.5"
@@ -169,6 +171,7 @@ object Dependencies {
     Cats.log4catsSlf4j,
     Fs2.core,
     Fs2.kafka,
+    Fs2.io,
     Monocle.core,
     Vulcan.core,
     Vulcan.generic,
@@ -183,6 +186,7 @@ object Dependencies {
     mouse,
     chimney,
     Circe.generic,
-    Circe.parser
+    Circe.parser,
+    fs2TopicLoader
   )
 }
diff --git a/scheduler-3/src/main/resources/application.conf b/scheduler-3/src/main/resources/application.conf
index a2eea4e6..6d87d592 100644
--- a/scheduler-3/src/main/resources/application.conf
+++ b/scheduler-3/src/main/resources/application.conf
@@ -1,5 +1,9 @@
 scheduler.reader {
   schedule-topics = [${?SCHEDULE_TOPICS}]
+  topics {
+    avro = [${?AVRO_SCHEDULE_TOPICS}]
+    json = [${?JSON_SCHEDULE_TOPICS}]
+  }
   kafka-brokers = "localhost:9092"
   kafka-brokers = ${?KAFKA_BROKERS}
 }
diff --git a/scheduler-3/src/main/scala/uk/sky/scheduler/EventSubscriber.scala b/scheduler-3/src/main/scala/uk/sky/scheduler/EventSubscriber.scala
index de1a9f7f..afa5e74e 100644
--- a/scheduler-3/src/main/scala/uk/sky/scheduler/EventSubscriber.scala
+++ b/scheduler-3/src/main/scala/uk/sky/scheduler/EventSubscriber.scala
@@ -1,10 +1,151 @@
 package uk.sky.scheduler
 
-import fs2.Stream
+import cats.effect.Resource.ExitCase
+import cats.effect.{Async, Deferred, Ref, Resource}
+import cats.syntax.all.*
+import cats.{Monad, Parallel, Show}
+import fs2.*
+import fs2.kafka.*
+import mouse.all.*
+import org.typelevel.log4cats.LoggerFactory
+import org.typelevel.otel4s.metrics.Meter
+import org.typelevel.otel4s.{Attribute, Attributes}
+import uk.sky.fs2.kafka.topicloader.TopicLoader
+import uk.sky.scheduler.circe.jsonScheduleDecoder
+import uk.sky.scheduler.config.KafkaConfig
+import uk.sky.scheduler.converters.all.*
 import uk.sky.scheduler.domain.ScheduleEvent
 import uk.sky.scheduler.error.ScheduleError
+import uk.sky.scheduler.kafka.avro.{avroBinaryDeserializer, avroScheduleCodec, AvroSchedule}
+import uk.sky.scheduler.kafka.json.{jsonDeserializer, JsonSchedule}
 import uk.sky.scheduler.message.Message
 
 trait EventSubscriber[F[_]] {
   def messages: Stream[F, Message[Either[ScheduleError, Option[ScheduleEvent]]]]
 }
+
+object EventSubscriber {
+  private type Output = Either[ScheduleError, Option[ScheduleEvent]]
+
+  def kafka[F[_] : Async : Parallel : LoggerFactory](
+      config: KafkaConfig,
+      loaded: Deferred[F, Unit]
+  ): F[EventSubscriber[F]] = {
+
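+    // Lifting a deserializer with `.option.map(_.sequence)` turns a null record
+    // value (a tombstone) into Right(None) rather than a decode failure, while
+    // genuine decode errors still surface as Left(ScheduleError).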
+    val avroConsumerSettings: ConsumerSettings[F, String, Either[ScheduleError, Option[AvroSchedule]]] = {
+      given Resource[F, Deserializer[F, Either[ScheduleError, Option[AvroSchedule]]]] =
+        avroBinaryDeserializer[F, AvroSchedule].map(_.option.map(_.sequence))
+
+      config.consumerSettings[F, String, Either[ScheduleError, Option[AvroSchedule]]]
+    }
+
+    val jsonConsumerSettings: ConsumerSettings[F, String, Either[ScheduleError, Option[JsonSchedule]]] = {
+      given Deserializer[F, Either[ScheduleError, Option[JsonSchedule]]] =
+        jsonDeserializer[F, JsonSchedule].option.map(_.sequence)
+
+      config.consumerSettings[F, String, Either[ScheduleError, Option[JsonSchedule]]]
+    }
+
+    for {
+      avroLoadedRef <- Ref.of[F, Boolean](false)
+      jsonLoadedRef <- Ref.of[F, Boolean](false)
+    } yield new EventSubscriber[F] {
+
+      /** If both topics have finished loading, complete the Deferred to allow queueing of schedules.
+        */
+      private def onLoadCompare(exitCase: ExitCase): F[Unit] =
+        exitCase match {
+          case ExitCase.Succeeded =>
+            for {
+              avroLoaded <- avroLoadedRef.get
+              jsonLoaded <- jsonLoadedRef.get
+              _ <- Async[F].whenA(avroLoaded && jsonLoaded)(loaded.complete(()))
+            } yield ()
+          case ExitCase.Errored(_) | ExitCase.Canceled => Async[F].unit
+        }
+
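+      // An empty topic list means there is nothing to load, so that side is marked
+      // as loaded immediately; otherwise TopicLoader signals the ExitCase of the
+      // initial load via its callback before continuing to stream live records.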
+      private val avroStream: Stream[F, ConsumerRecord[String, Either[ScheduleError, Option[AvroSchedule]]]] =
+        config.topics.avro.toNel
+          .fold(Stream.exec(avroLoadedRef.set(true) *> onLoadCompare(ExitCase.Succeeded)))(
+            TopicLoader.loadAndRun(_, avroConsumerSettings) { exitCase =>
+              avroLoadedRef.set(true) *> onLoadCompare(exitCase)
+            }
+          )
+
+      private val jsonStream: Stream[F, ConsumerRecord[String, Either[ScheduleError, Option[JsonSchedule]]]] =
+        config.topics.json.toNel
+          .fold(Stream.exec(jsonLoadedRef.set(true) *> onLoadCompare(ExitCase.Succeeded)))(
+            TopicLoader.loadAndRun(_, jsonConsumerSettings) { exitCase =>
+              jsonLoadedRef.set(true) *> onLoadCompare(exitCase)
+            }
+          )
+
+      override def messages: Stream[F, Message[Output]] =
+        avroStream.merge(jsonStream).map(_.toMessage)
+    }
+  }
+
+  def observed[F[_] : Monad : Parallel : LoggerFactory : Meter](delegate: EventSubscriber[F]): F[EventSubscriber[F]] = {
+    given Show[ScheduleError] = {
+      case _: ScheduleError.InvalidAvroError => "invalid-avro"
+      case _: ScheduleError.NotJsonError => "not-json"
+      case _: ScheduleError.InvalidJsonError => "invalid-json"
+      case _: ScheduleError.DecodeError => "decode"
+      case _: ScheduleError.TransformationError => "transformation"
+    }
+
+    def updateAttributes(source: String) = Attributes(
+      Attribute("message.type", "update"),
+      Attribute("message.source", source)
+    )
+
+    def deleteAttributes(source: String, deleteType: String) = Attributes(
+      Attribute("message.type", "delete"),
+      Attribute("message.source", source),
+      Attribute("message.delete.type", deleteType)
+    )
+
+    def errorAttributes(source: String, error: ScheduleError) = Attributes(
+      Attribute("message.type", "error"),
+      Attribute("message.source", source),
+      Attribute("message.error.type", error.show)
+    )
+
+    for {
+      counter <- Meter[F].counter[Long]("event-subscriber").create
+      logger <- LoggerFactory[F].create
+    } yield new EventSubscriber[F] {
+      override def messages: Stream[F, Message[Output]] =
+        delegate.messages.evalTapChunk { case Message(key, source, value, metadata) =>
+          val logCtx = Map("key" -> key, "source" -> source)
+
+          value match {
+            case Right(Some(_)) =>
+              logger.info(logCtx)(show"Decoded UPDATE for [$key] from $source") &>
+                counter.inc(updateAttributes(source))
+
+            case Right(None) =>
+              lazy val deleteType = metadata.isExpired.fold("expired", "canceled")
+              logger.info(logCtx)(show"Decoded DELETE type=[$deleteType] for [$key] from $source") &>
+                counter.inc(deleteAttributes(source, deleteType))
+
+            case Left(error) =>
+              logger.error(logCtx, error)(show"Error decoding [$key] from $source") &>
+                counter.inc(errorAttributes(source, error))
+          }
+        }
+    }
+  }
+
+  def live[F[_] : Async : Parallel : LoggerFactory : Meter](
+      config: KafkaConfig,
+      loaded: Deferred[F, Unit]
+  ): F[EventSubscriber[F]] =
+    kafka[F](config, loaded).flatMap(observed)
+}
diff --git a/scheduler-3/src/main/scala/uk/sky/scheduler/Scheduler.scala b/scheduler-3/src/main/scala/uk/sky/scheduler/Scheduler.scala
index 68c264f5..30667e0a 100644
--- a/scheduler-3/src/main/scala/uk/sky/scheduler/Scheduler.scala
+++ b/scheduler-3/src/main/scala/uk/sky/scheduler/Scheduler.scala
@@ -3,6 +3,8 @@ package uk.sky.scheduler
 import cats.Parallel
 import cats.data.Reader
 import cats.effect.*
+import cats.effect.syntax.all.*
+import cats.effect.{Async, Concurrent, Deferred, Resource}
 import fs2.Stream
 import org.typelevel.log4cats.LoggerFactory
 import org.typelevel.otel4s.metrics.Meter
@@ -33,9 +35,10 @@ object Scheduler {
   def live[F[_] : Async : Parallel : LoggerFactory : Meter]: Reader[Config, Resource[F, Scheduler[F, Unit]]] =
     Reader { config =>
       for {
-        eventSubscriber <- Resource.pure(??? : EventSubscriber[F])
-        scheduleQueue <- Resource.pure(??? : ScheduleQueue[F])
-        schedulePublisher = SchedulePublisher.live[F](config.kafka)
-      } yield Scheduler[F, Unit](eventSubscriber, scheduleQueue, schedulePublisher)
+        allowEnqueue <- Deferred[F, Unit].toResource
+        eventSubscriber <- EventSubscriber.live[F](config.kafka, allowEnqueue).toResource
+        scheduleQueue <- Resource.pure(??? : ScheduleQueue[F])
+        schedulePublisher <- Resource.pure(??? : SchedulePublisher[F, Unit])
+      } yield Scheduler(eventSubscriber, scheduleQueue, schedulePublisher)
     }
 }
diff --git a/scheduler-3/src/main/scala/uk/sky/scheduler/config/Config.scala b/scheduler-3/src/main/scala/uk/sky/scheduler/config/Config.scala
index 20fd6b30..ad3cb00c 100644
--- a/scheduler-3/src/main/scala/uk/sky/scheduler/config/Config.scala
+++ b/scheduler-3/src/main/scala/uk/sky/scheduler/config/Config.scala
@@ -18,6 +18,7 @@ object Config {
 }
 
 final case class KafkaConfig(
+    topics: TopicConfig,
     consumer: ConsumerProducerConfig,
     producer: ConsumerProducerConfig,
     commit: CommitConfig
@@ -62,3 +63,15 @@ final case class ReaderConfig(
     scheduleTopics: List[String],
     kafkaBrokers: String
 ) derives ConfigReader
+
+final case class TopicConfig(avro: List[String], json: List[String])
+
+object TopicConfig {
+  given topicConfigReader: ConfigReader[TopicConfig] =
+    ConfigReader
+      .forProduct2[TopicConfig, List[String], List[String]]("avro", "json")(TopicConfig.apply)
+      .ensure(
+        config => config.avro.nonEmpty || config.json.nonEmpty,
+        message = _ => "both Avro and JSON topics were empty"
+      )
+}
diff --git a/scheduler-3/src/test/scala/uk/sky/scheduler/converters/ScheduleEventConverterSpec.scala b/scheduler-3/src/test/scala/uk/sky/scheduler/converters/ScheduleEventConverterSpec.scala
index 2b43165e..6654d60e 100644
--- a/scheduler-3/src/test/scala/uk/sky/scheduler/converters/ScheduleEventConverterSpec.scala
+++ b/scheduler-3/src/test/scala/uk/sky/scheduler/converters/ScheduleEventConverterSpec.scala
@@ -58,5 +58,4 @@ class ScheduleEventConverterSpec
       scheduleEvent.toTombstone should equalProducerRecord(tombstone)
     }
   }
-
 }