diff --git a/.gitignore b/.gitignore
index 45770fc1..ea73961d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,7 +1,45 @@
-# SBT
+# bloop and metals
+.bloop
+.bsp
+
+# metals
+project/metals.sbt
+.metals
+
+# vs code
+.vscode
+
+# scala 3
+.tasty
+
+# sbt
+project/project/
+project/target/
 target/
 
-# IDE
+# eclipse
+build/
+.classpath
+.project
+.settings
+.worksheet
+bin/
+.cache
+
+# intellij idea
+*.log
+*.iml
+*.ipr
+*.iws
 .idea
-.pairs
+
+# mac
+.DS_Store
+
+# other?
+.history
+.scala_dependencies
+.cache-main
+
+#general
+*.class
diff --git a/.scalafix.conf b/.scalafix.conf
new file mode 100644
index 00000000..63629caa
--- /dev/null
+++ b/.scalafix.conf
@@ -0,0 +1,21 @@
+rules = [
+  RemoveUnused,
+  NoAutoTupling,
+  DisableSyntax,
+  LeakingImplicitClassVal,
+  NoValInForComprehension,
+  ProcedureSyntax
+]
+
+OrganizeImports {
+  coalesceToWildcardImportThreshold = 8
+  expandRelative = true
+  groupedImports = Merge
+  importSelectorsOrder = SymbolsFirst
+  removeUnused = false
+  groups = [
+    "java."
+    "*"
+    "scala."
+  ]
+}
diff --git a/.scalafmt.conf b/.scalafmt.conf
index 3857c45b..431d34fd 100644
--- a/.scalafmt.conf
+++ b/.scalafmt.conf
@@ -1,15 +1,11 @@
-style = defaultWithAlign
-
-align = most
-
-maxColumn = 120
+version = 3.4.3
+align.preset = most
 includeCurlyBraceInSelectChains = false
-
+maxColumn = 120
 newlines.sometimesBeforeColonInMethodReturnType = false
-
-rewrite.rules = [SortImports, RedundantBraces, RedundantParens, PreferCurlyFors]
-
 project.git = true
-
-spaces.beforeContextBoundColon = true
+rewrite.rules = [SortImports, RedundantBraces, RedundantParens, PreferCurlyFors]
+runner.dialect = scala213
+spaces.beforeContextBoundColon = Always
+style = defaultWithAlign
diff --git a/.travis.yml b/.travis.yml
index d4390a54..b09b90e0 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,9 +1,9 @@
 language: scala
 scala:
-  - 2.12.10
+  - 2.13.8
 
 jdk:
-  - openjdk12
+  - openjdk17
 
 # Avoid triggering a duplicate build for PRs
 branches:
@@ -13,11 +13,14 @@ branches:
 # https://www.scala-sbt.org/1.x/docs/Travis-CI-with-sbt.html
 cache:
   directories:
+    - $HOME/.cache/coursier
     - $HOME/.ivy2/cache
-    - $HOME/.sbt/boot/
+    - $HOME/.sbt
+
 before_cache:
-  - find $HOME/.ivy2 -name "ivydata-*.properties" -delete
-  - find $HOME/.sbt -name "*.lock" -delete
+  - rm -fv $HOME/.ivy2/.sbt.ivy.lock
+  - find $HOME/.ivy2/cache -name "ivydata-*.properties" -print -delete
+  - find $HOME/.sbt -name "*.lock" -print -delete
 
 script:
   - sbt ++$TRAVIS_SCALA_VERSION ciBuild
diff --git a/LICENSE b/LICENSE
index 3d36aca4..eb31e348 100644
--- a/LICENSE
+++ b/LICENSE
@@ -1,4 +1,4 @@
-Copyright (c) 2017-2019, Sky CP Ltd
+Copyright (c) 2017-2022, Sky CP Ltd
 All rights reserved.
 
 Redistribution and use in source and binary forms, with or without
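The new `.scalafix.conf` above enables six rewrite rules whose effects show up throughout the source changes later in this diff. As a rough illustration (plain Scala 2.13, not code from this repository), this is the kind of rewrite that `ProcedureSyntax` and `NoValInForComprehension` perform:

```scala
object ScalafixSketch {
  // ProcedureSyntax: `def log(msg: String) { ... }` (brace-only body) is
  // rewritten to carry an explicit `: Unit =` return type.
  def log(msg: String): Unit = println(msg)

  // NoValInForComprehension: `val y = x + 1` inside a for-comprehension
  // loses the deprecated `val` keyword.
  val ys: List[Int] = for {
    x <- List(1, 2, 3)
    y = x + 1
  } yield y
}
```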
diff --git a/build.sbt b/build.sbt
index 33d5c8b9..b826d5a9 100644
--- a/build.sbt
+++ b/build.sbt
@@ -2,110 +2,68 @@ import com.typesafe.sbt.packager.docker.Cmd
 import Aliases._
 import Release._
 
-scalafmtVersion in ThisBuild := "1.2.0"
-scalafmtOnCompile in ThisBuild := true
+ThisBuild / scalafmtOnCompile := true
+ThisBuild / semanticdbEnabled := true
+ThisBuild / semanticdbVersion := scalafixSemanticdb.revision
+ThisBuild / scalafixDependencies += "com.github.liancheng" %% "organize-imports" % "0.6.0"
 
-val kafkaVersion = "2.6.0"
-val akkaVersion = "2.5.23"
-val catsVersion = "1.6.1"
-val refinedVersion = "0.9.8"
-val pureConfigVersion = "0.11.1"
-val kamonVersion = "2.4.2"
+Global / onChangedBuildSource := ReloadOnSourceChanges
 
-val dependencies = Seq(
-  "uk.sky" %% "kafka-topic-loader" % "1.5.3",
-  "com.typesafe.akka" %% "akka-actor" % akkaVersion,
-  "com.typesafe.akka" %% "akka-stream" % akkaVersion,
-  "com.typesafe.akka" %% "akka-slf4j" % akkaVersion,
-  "com.typesafe.akka" %% "akka-stream-kafka" % "1.0.4",
-  "com.typesafe.akka" %% "akka-stream-contrib" % "0.10",
-  "io.monix" %% "monix-execution" % "3.0.0-RC3",
-  "com.typesafe.scala-logging" %% "scala-logging" % "3.9.2",
-  "org.typelevel" %% "cats-core" % catsVersion,
-  "org.typelevel" %% "cats-testkit" % catsVersion,
-  "ch.qos.logback" % "logback-classic" % "1.2.3" % Runtime,
-  "net.logstash.logback" % "logstash-logback-encoder" % "6.1" % Runtime,
-  "org.codehaus.janino" % "janino" % "3.0.13" % Runtime,
-  "com.github.pureconfig" %% "pureconfig" % pureConfigVersion,
-  "com.github.pureconfig" %% "pureconfig-cats" % pureConfigVersion,
-  "io.kamon" %% "kamon-core" % kamonVersion,
-  "io.kamon" %% "kamon-akka" % kamonVersion,
-  "io.kamon" %% "kamon-prometheus" % kamonVersion,
-  "eu.timepit" %% "refined" % refinedVersion,
-  "eu.timepit" %% "refined-pureconfig" % refinedVersion,
-  "eu.timepit" %% "refined-scalacheck" % refinedVersion,
-  "org.apache.kafka" %% "kafka" % kafkaVersion % Test,
-  "org.scalatest" %% "scalatest" % "3.0.8" % Test,
-  "com.typesafe.akka" %% "akka-testkit" % akkaVersion % Test,
-  "com.typesafe.akka" %% "akka-stream-testkit" % akkaVersion % Test,
-  "com.danielasfregola" %% "random-data-generator" % "2.7" % Test,
-  "com.47deg" %% "scalacheck-toolbox-datetime" % "0.2.5" % Test,
-  "org.mockito" % "mockito-all" % "1.10.19" % Test,
-  "org.zalando" %% "grafter" % "2.6.1" % Test,
-  "io.github.embeddedkafka" %% "embedded-kafka" % "2.6.0" % Test
-)
+Test / testOptions += Tests.Argument(TestFrameworks.ScalaTest, "-oF")
 
 val commonSettings = Seq(
   organization := "com.sky",
-  scalaVersion := "2.12.10",
-  libraryDependencies += "com.sksamuel.avro4s" %% "avro4s-core" % "2.0.2"
+  scalaVersion := "2.13.8"
+)
+
+val compilerSettings = Seq(
+  // Compiler option not provided by sbt-tpolecat
+  scalacOptions += "-Ymacro-annotations",
+  tpolecatScalacOptions ~= { opts =>
+    opts.filterNot(Set(ScalacOptions.warnValueDiscard))
+  }
 )
 
 lazy val dockerSettings = Seq(
-  packageName in Docker := "kafka-message-scheduler",
-  dockerBaseImage := "alpine:3.13.2",
-  dockerRepository := Some("skyuk"),
-  dockerLabels := Map("maintainer" -> "Sky"),
-  dockerUpdateLatest := true,
+  Docker / packageName := "kafka-message-scheduler",
+  dockerBaseImage      := "eclipse-temurin:17-jdk-alpine",
+  dockerRepository     := Some("skyuk"),
+  dockerLabels         := Map("maintainer" -> "Sky"),
+  dockerUpdateLatest   := true,
   dockerCommands ++= Seq(
     Cmd("USER", "root"),
-    Cmd("RUN", "apk add --no-cache bash eudev openjdk11-jre")
+    Cmd("RUN", "apk add --no-cache bash")
   )
 )
 
 val buildInfoSettings = Seq(
-  buildInfoKeys := Seq[BuildInfoKey](name, version, scalaVersion, sbtVersion),
+  buildInfoKeys    := Seq[BuildInfoKey](name, version, scalaVersion, sbtVersion),
   buildInfoPackage := "com.sky"
 )
 
 lazy val scheduler = (project in file("scheduler"))
   .enablePlugins(BuildInfoPlugin, JavaAppPackaging, UniversalDeployPlugin, JavaAgent, DockerPlugin)
   .settings(commonSettings)
+  .settings(compilerSettings)
   .settings(
-    libraryDependencies ++= dependencies,
-    dependencyOverrides ++= Seq("org.apache.kafka" % "kafka-clients" % kafkaVersion,
-                                "org.scalacheck" %% "scalacheck" % "1.13.5"),
-    resolvers ++= Seq(
-      "jitpack" at "https://jitpack.io",
-      Resolver.bintrayRepo("cakesolutions", "maven")
-    ),
-    addCompilerPlugin("org.scalamacros" % "paradise" % "2.1.1" cross CrossVersion.full),
-    addCompilerPlugin("org.spire-math" %% "kind-projector" % "0.9.10"),
-    scalacOptions ++= Seq(
-      "-language:implicitConversions",
-      "-language:postfixOps",
-      "-Xfatal-warnings",
-      "-Ywarn-dead-code",
-      "-Ywarn-unused",
-      "-deprecation",
-      "-Ypartial-unification",
-      "-encoding",
-      "utf-8"
-    ),
-    fork in run := true,
-    fork in Test := true,
-    javaAgents += "io.kamon" % "kanela-agent" % "1.0.7",
+    libraryDependencies ++= Dependencies.all,
+    addCompilerPlugin("org.typelevel" % "kind-projector" % "0.13.2" cross CrossVersion.full),
+    run / fork  := true,
+    Test / fork := true,
+    javaAgents += "io.kamon" % "kanela-agent" % "1.0.14",
    buildInfoSettings,
     dockerSettings,
     releaseSettings,
-    parallelExecution in Test := false
+    Test / parallelExecution := false
   )
 
 val schema = inputKey[Unit]("Generate the Avro schema file for the Schedule schema.")
 
 lazy val avro = (project in file("avro"))
   .settings(commonSettings)
-  .settings(schema := (run in Compile).toTask("").value)
+  .settings(compilerSettings)
+  .settings(libraryDependencies += Dependencies.avro4s)
+  .settings(schema := (Compile / run).toTask("").value)
   .dependsOn(scheduler % "compile->compile")
   .disablePlugins(ReleasePlugin)
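Most of the build.sbt churn above is the mechanical sbt 1.x migration from `key in Scope` to the unified `Scope / key` slash syntax. A small before/after sketch, using settings taken from this diff (build.sbt fragment):

```scala
// sbt 0.13 style (removed above):
//   packageName in Docker      := "kafka-message-scheduler"
//   fork in Test               := true
//   parallelExecution in Test  := false

// sbt 1.x unified slash syntax (added above):
Docker / packageName     := "kafka-message-scheduler"
Test / fork              := true
Test / parallelExecution := false
```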
openjdk11-jre") + Cmd("RUN", "apk add --no-cache bash") ) ) val buildInfoSettings = Seq( - buildInfoKeys := Seq[BuildInfoKey](name, version, scalaVersion, sbtVersion), + buildInfoKeys := Seq[BuildInfoKey](name, version, scalaVersion, sbtVersion), buildInfoPackage := "com.sky" ) lazy val scheduler = (project in file("scheduler")) .enablePlugins(BuildInfoPlugin, JavaAppPackaging, UniversalDeployPlugin, JavaAgent, DockerPlugin) .settings(commonSettings) + .settings(compilerSettings) .settings( - libraryDependencies ++= dependencies, - dependencyOverrides ++= Seq("org.apache.kafka" % "kafka-clients" % kafkaVersion, - "org.scalacheck" %% "scalacheck" % "1.13.5"), - resolvers ++= Seq( - "jitpack" at "https://jitpack.io", - Resolver.bintrayRepo("cakesolutions", "maven") - ), - addCompilerPlugin("org.scalamacros" % "paradise" % "2.1.1" cross CrossVersion.full), - addCompilerPlugin("org.spire-math" %% "kind-projector" % "0.9.10"), - scalacOptions ++= Seq( - "-language:implicitConversions", - "-language:postfixOps", - "-Xfatal-warnings", - "-Ywarn-dead-code", - "-Ywarn-unused", - "-deprecation", - "-Ypartial-unification", - "-encoding", - "utf-8" - ), - fork in run := true, - fork in Test := true, - javaAgents += "io.kamon" % "kanela-agent" % "1.0.7", + libraryDependencies ++= Dependencies.all, + addCompilerPlugin("org.typelevel" % "kind-projector" % "0.13.2" cross CrossVersion.full), + run / fork := true, + Test / fork := true, + javaAgents += "io.kamon" % "kanela-agent" % "1.0.14", buildInfoSettings, dockerSettings, releaseSettings, - parallelExecution in Test := false + Test / parallelExecution := false ) val schema = inputKey[Unit]("Generate the Avro schema file for the Schedule schema.") lazy val avro = (project in file("avro")) .settings(commonSettings) - .settings(schema := (run in Compile).toTask("").value) + .settings(compilerSettings) + .settings(libraryDependencies += Dependencies.avro4s) + .settings(schema := (Compile / run).toTask("").value) .dependsOn(scheduler % "compile->compile") .disablePlugins(ReleasePlugin) diff --git a/docker-compose.yml b/docker-compose.yml deleted file mode 100644 index 6c0c4033..00000000 --- a/docker-compose.yml +++ /dev/null @@ -1,41 +0,0 @@ -version: '3.3' -services: - - kafka: - image: confluentinc/cp-kafka:3.3.0 - ports: - - "9092:9092" - depends_on: - - zookeeper - environment: - KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: '1' - KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka:9092' - - zookeeper: - image: confluentinc/cp-zookeeper:3.3.0 - environment: - ZOOKEEPER_CLIENT_PORT: '2181' - - schema-registry: - image: confluentinc/cp-schema-registry:3.3.0 - hostname: schema-registry - depends_on: - - kafka - - zookeeper - ports: - - "8082:8081" - environment: - SCHEMA_REGISTRY_HOST_NAME: schema-registry - SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper:2181 - SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8082 - - kms: - image: skyuk/kafka-message-scheduler - depends_on: - - kafka - environment: - KAFKA_BROKERS: kafka:9092 - SCHEDULE_TOPIC: schedules - ports: - - "9095:9095" diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 44bbb3a2..f531aac0 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -1,38 +1,40 @@ -version: '3.3' -services: - - zookeeper: - image: confluentinc/cp-zookeeper:3.3.0 - environment: - ZOOKEEPER_CLIENT_PORT: '2181' - - kafka: - image: confluentinc/cp-kafka:3.3.0 - ports: - - "9093:9093" - depends_on: - - zookeeper - environment: - KAFKA_ZOOKEEPER_CONNECT: 
diff --git a/project/Aliases.scala b/project/Aliases.scala
index 318880b9..160ce5a8 100644
--- a/project/Aliases.scala
+++ b/project/Aliases.scala
@@ -2,10 +2,12 @@ import sbt._
 
 object Aliases {
 
-  lazy val defineCommandAliases = {
-    addCommandAlias("ciBuild", ";clean; test; schema") ++
-      addCommandAlias("ciRelease", ";clean; schema; project scheduler; release with-defaults") ++
-      addCommandAlias("checkFmt", ";scalafmt::test; test:scalafmt::test; sbt:scalafmt::test") ++
-      addCommandAlias("runFmt", ";scalafmt; test:scalafmt; sbt:scalafmt")
-  }
+  lazy val defineCommandAliases =
+    addCommandAlias("ciBuild", "checkFmt; checkFix; test; schema") ++
+      addCommandAlias("ciRelease", "clean; schema; project scheduler; release with-defaults") ++
+      addCommandAlias("checkFix", "scalafixAll --check OrganizeImports; scalafixAll --check") ++
+      addCommandAlias("runFix", "scalafixAll OrganizeImports; scalafixAll") ++
+      addCommandAlias("checkFmt", "scalafmtCheckAll; scalafmtSbtCheck") ++
+      addCommandAlias("runFmt", "scalafmtAll; scalafmtSbt")
+
 }
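The reworked aliases fold the formatting and scalafix checks into `ciBuild`, which is what `.travis.yml` invokes. A sketch of how they are expected to be wired and used, assuming `defineCommandAliases` is referenced from build.sbt (the actual call site is outside this diff, though `import Aliases._` is shown above):

```scala
// Hypothetical build.sbt wiring: addCommandAlias produces build settings,
// so referencing the Seq at the top level registers every alias.
import Aliases._

defineCommandAliases

// sbt shell usage once wired in:
//   sbt> ciBuild   // expands to: checkFmt; checkFix; test; schema
//   sbt> runFix    // applies OrganizeImports first, then the remaining rules
```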
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
new file mode 100644
index 00000000..5791afb1
--- /dev/null
+++ b/project/Dependencies.scala
@@ -0,0 +1,96 @@
+import sbt._
+
+object Dependencies {
+
+  object Akka {
+    private val version = "2.6.19"
+    val actor = "com.typesafe.akka" %% "akka-actor" % version
+    val stream = "com.typesafe.akka" %% "akka-stream" % version
+    val streamKafka = "com.typesafe.akka" %% "akka-stream-kafka" % "3.0.0"
+    val slf4j = "com.typesafe.akka" %% "akka-slf4j" % version
+    val testKit = "com.typesafe.akka" %% "akka-testkit" % version % Test
+    val streamTestKit = "com.typesafe.akka" %% "akka-stream-testkit" % version % Test
+    val base = Seq(actor, stream, streamKafka, slf4j)
+    val test = Seq(testKit, streamTestKit)
+  }
+
+  object Cats {
+    private val version = "2.7.0"
+    val core = "org.typelevel" %% "cats-core" % version
+    val testKit = "org.typelevel" %% "cats-testkit" % version % Test
+    val scalatest = "com.ironcorelabs" %% "cats-scalatest" % "3.1.1" % Test
+    val base = Seq(core)
+    val test = Seq(testKit, scalatest)
+  }
+
+  object Kafka {
+    private val version = "3.1.0"
+    val kafkaClients = "org.apache.kafka" % "kafka-clients" % version
+    val kafka = "org.apache.kafka" %% "kafka" % version % Test
+    val base = Seq(kafkaClients)
+    val test = Seq(kafka)
+  }
+
+  object Kamon {
+    private val version = "2.5.1"
+    val core = "io.kamon" %% "kamon-core" % version
+    val akka = "io.kamon" %% "kamon-akka" % version
+    val prometheus = "io.kamon" %% "kamon-prometheus" % version
+    val all = Seq(core, akka, prometheus)
+  }
+
+  object PureConfig {
+    private val version = "0.17.1"
+    val pureconfig = "com.github.pureconfig" %% "pureconfig" % version
+    val cats = "com.github.pureconfig" %% "pureconfig-cats" % version
+    val all = Seq(pureconfig, cats)
+  }
+
+  object Refined {
+    private val version = "0.9.28"
+    val refined = "eu.timepit" %% "refined" % version
+    val pureconfig = "eu.timepit" %% "refined-pureconfig" % version
+    val scalaCheck = "eu.timepit" %% "refined-scalacheck" % version % Test
+    val base = Seq(refined, pureconfig)
+    val test = Seq(scalaCheck)
+  }
+
+  val avro4s = "com.sksamuel.avro4s" %% "avro4s-core" % "4.0.12"
+  val kafkaTopicLoader = "uk.sky" %% "kafka-topic-loader" % "1.5.6"
+  val monix = "io.monix" %% "monix-execution" % "3.4.0"
+  val scalaLogging = "com.typesafe.scala-logging" %% "scala-logging" % "3.9.4"
+
+  val janino = "org.codehaus.janino" % "janino" % "3.1.6" % Runtime
+  val logbackClassic = "ch.qos.logback" % "logback-classic" % "1.2.11" % Runtime
+  val logbackEncoder = "net.logstash.logback" % "logstash-logback-encoder" % "7.0.1" % Runtime
+
+  val embeddedKafka = "io.github.embeddedkafka" %% "embedded-kafka" % "3.1.0" % Test
+  val mockito = "org.mockito" % "mockito-core" % "4.4.0" % Test
+  val randomDataGenerator = "com.danielasfregola" %% "random-data-generator" % "2.9" % Test
+  val scalaCheck = "org.scalacheck" %% "scalacheck" % "1.15.4" % Test
+  val scalaCheckDatetime = "com.47deg" %% "scalacheck-toolbox-datetime" % "0.6.0" % Test
+  val scalaTest = "org.scalatest" %% "scalatest" % "3.2.11" % Test
+  val scalaTestPlusMockito = "org.scalatestplus" %% "mockito-3-12" % "3.2.10.0" % Test
+
+  val core: Seq[ModuleID] = Akka.base ++ Cats.base ++ Kafka.base ++ Kamon.all ++ PureConfig.all ++ Refined.base ++ Seq(
+    avro4s,
+    kafkaTopicLoader,
+    monix,
+    scalaLogging
+  )
+  val runtime: Seq[ModuleID] = Seq(
+    janino,
+    logbackClassic,
+    logbackEncoder
+  )
+  val test: Seq[ModuleID] = Akka.test ++ Cats.test ++ Kafka.test ++ Refined.test ++ Seq(
+    embeddedKafka,
+    mockito,
+    randomDataGenerator,
+    scalaCheck,
+    scalaCheckDatetime,
+    scalaTest,
+    scalaTestPlusMockito
+  )
+  val all: Seq[sbt.ModuleID] = core ++ runtime ++ test
+}
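The `Dependencies` object replaces the flat `val dependencies = Seq(...)` that build.sbt previously held. Because scopes (`% Test`, `% Runtime`) travel with each module, the project definition shrinks to `libraryDependencies ++= Dependencies.all`. A sketch of extending the pattern with a hypothetical module group (not part of this build), following the same shape as `Akka` or `Cats` above:

```scala
import sbt._

object DependenciesSketch {
  // Hypothetical group: one private version, one val per artifact,
  // and an aggregate Seq for the project definition to consume.
  object Circe {
    private val version = "0.14.1"
    val core   = "io.circe" %% "circe-core"   % version
    val parser = "io.circe" %% "circe-parser" % version
    val all    = Seq(core, parser)
  }
}
```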
diff --git a/project/Release.scala b/project/Release.scala
index 691676fd..bf0a3d3f 100644
--- a/project/Release.scala
+++ b/project/Release.scala
@@ -9,15 +9,15 @@ import sbtrelease._
 object Release {
 
   // Useful tasks to show what versions would be used if a release was performed.
   private val showReleaseVersion = taskKey[String]("the future version once releaseNextVersion has been applied to it")
-  private val showNextVersion = taskKey[String]("the future version once releaseNextVersion has been applied to it")
+  private val showNextVersion    = taskKey[String]("the future version once releaseNextVersion has been applied to it")
 
   lazy val releaseSettings = Seq(
-    releaseUseGlobalVersion := false,
-    releaseVersionBump := sbtrelease.Version.Bump.Minor,
-    releaseTagName := s"${name.value}-${version.value}",
-    releaseTagComment := s"Releasing ${version.value} of module: ${name.value}",
-    releasePublishArtifactsAction:= (publish in Universal).value,
-    releaseProcess := Seq[ReleaseStep](
+    releaseUseGlobalVersion       := false,
+    releaseVersionBump            := sbtrelease.Version.Bump.Minor,
+    releaseTagName                := s"${name.value}-${version.value}",
+    releaseTagComment             := s"Releasing ${version.value} of module: ${name.value}",
+    releasePublishArtifactsAction := (Universal / publish).value,
+    releaseProcess                := Seq[ReleaseStep](
       runClean,
       checkSnapshotDependencies,
       releaseStepCommand(ExtraReleaseCommands.initialVcsChecksCommand),
@@ -26,12 +26,12 @@ object Release {
       runTest,
       commitReleaseVersion,
       tagRelease,
-      ReleaseStep(releaseStepTask(publish in docker)),
+      ReleaseStep(releaseStepTask(docker / publish)),
       setNextVersion,
       commitNextVersion,
       pushChanges
     ),
-    showReleaseVersion := { val rV = releaseVersion.value.apply(version.value); println(rV); rV },
-    showNextVersion := { val nV = releaseNextVersion.value.apply(version.value); println(nV); nV }
+    showReleaseVersion := { val rV = releaseVersion.value.apply(version.value); println(rV); rV },
+    showNextVersion    := { val nV = releaseNextVersion.value.apply(version.value); println(nV); nV }
   )
 }
diff --git a/project/build.properties b/project/build.properties
index 92debe03..4ff6415f 100644
--- a/project/build.properties
+++ b/project/build.properties
@@ -1 +1 @@
-sbt.version=1.3.1
\ No newline at end of file
+sbt.version=1.6.2
\ No newline at end of file
diff --git a/project/plugins.sbt b/project/plugins.sbt
index 9418e4c8..4e7c8e7f 100644
--- a/project/plugins.sbt
+++ b/project/plugins.sbt
@@ -1,7 +1,9 @@
-addSbtPlugin("com.eed3si9n" % "sbt-buildinfo" % "0.9.0")
-addSbtPlugin("com.github.gseitz" % "sbt-release" % "1.0.9")
-addSbtPlugin("com.lightbend.sbt" % "sbt-javaagent" % "0.1.6")
-addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.3.10")
-addSbtPlugin("com.tapad" % "sbt-docker-compose" % "1.0.34")
-addSbtPlugin("com.lucidchart" % "sbt-scalafmt" % "1.15")
-addSbtPlugin("io.kamon" % "sbt-kanela-runner" % "2.0.9")
+addSbtPlugin("com.eed3si9n" % "sbt-buildinfo" % "0.11.0")
+addSbtPlugin("com.github.gseitz" % "sbt-release" % "1.0.13")
+addSbtPlugin("com.lightbend.sbt" % "sbt-javaagent" % "0.1.6")
+addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.8.1")
+addSbtPlugin("com.tapad" % "sbt-docker-compose" % "1.0.35")
+addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.6")
+addSbtPlugin("ch.epfl.scala" % "sbt-scalafix" % "0.9.34")
+addSbtPlugin("io.kamon" % "sbt-kanela-runner" % "2.0.12")
+addSbtPlugin("io.github.davidgregory084" % "sbt-tpolecat" % "0.3.0")
diff --git a/scheduler/lib/kamon-jmx-collector_2.12-1.0.0-RC1.jar b/scheduler/lib/kamon-jmx-collector_2.12-1.0.0-RC1.jar
deleted file mode 100644
index 894ff31a..00000000
Binary files a/scheduler/lib/kamon-jmx-collector_2.12-1.0.0-RC1.jar and /dev/null differ
diff --git a/scheduler/lib/kamon-jmx-collector_2.13-1.0.0-RC1.jar b/scheduler/lib/kamon-jmx-collector_2.13-1.0.0-RC1.jar
new file mode 100644
index 00000000..5d5f9983
Binary files /dev/null and b/scheduler/lib/kamon-jmx-collector_2.13-1.0.0-RC1.jar differ
diff --git a/scheduler/src/main/resources/application.conf b/scheduler/src/main/resources/application.conf
index b644a6f5..58dc98a7 100644
--- a/scheduler/src/main/resources/application.conf
+++ b/scheduler/src/main/resources/application.conf
@@ -52,6 +52,9 @@ akka {
 
 kamon {
   environment.service = "kafka-message-scheduler"
+
+  host-metrics.enabled = no
+
   instrumentation.akka.filters.actors.track {
     includes = ["kafka-message-scheduler/user/scheduling-actor", "kafka-message-scheduler/user/publisher-actor"]
     excludes = ["kafka-message-scheduler/system/**"]
diff --git a/scheduler/src/main/scala/com/sky/kms/AkkaComponents.scala b/scheduler/src/main/scala/com/sky/kms/AkkaComponents.scala
index 7c8d83a0..b6edaeef 100644
--- a/scheduler/src/main/scala/com/sky/kms/AkkaComponents.scala
+++ b/scheduler/src/main/scala/com/sky/kms/AkkaComponents.scala
@@ -1,17 +1,10 @@
 package com.sky.kms
 
 import akka.actor.ActorSystem
-import akka.stream.Supervision.Stop
-import akka.stream.{ActorMaterializer, ActorMaterializerSettings}
 import com.typesafe.scalalogging.LazyLogging
 
 trait AkkaComponents extends LazyLogging {
 
-  implicit lazy val system = ActorSystem("kafka-message-scheduler")
-
-  implicit lazy val materializer = ActorMaterializer(ActorMaterializerSettings(system).withSupervisionStrategy(t => {
-    logger.error("Exception caught by stream supervisor", t)
-    Stop
-  }))
+  implicit lazy val system: ActorSystem = ActorSystem("kafka-message-scheduler")
 }
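The `AkkaComponents` change above is the Akka 2.6 pattern: a `Materializer` is now provided by the `ActorSystem` itself, so the explicit `ActorMaterializer` (and its system-wide supervision strategy) is dropped. A minimal sketch, assuming Akka 2.6.x on the classpath, of running a stream with the implicit system-wide materializer and attaching a stop-on-error strategy per stream via attributes:

```scala
import akka.actor.ActorSystem
import akka.stream.ActorAttributes.supervisionStrategy
import akka.stream.Supervision
import akka.stream.scaladsl.{Sink, Source}

object MaterializerSketch extends App {
  // Akka 2.6: this implicit ActorSystem is enough to materialize streams.
  implicit val system: ActorSystem = ActorSystem("sketch")

  // Supervision moves from the materializer to per-stream attributes.
  Source(List(1, 2, 3))
    .withAttributes(supervisionStrategy(Supervision.stoppingDecider))
    .runWith(Sink.foreach(println))
}
```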
diff --git a/scheduler/src/main/scala/com/sky/kms/Main.scala b/scheduler/src/main/scala/com/sky/kms/Main.scala
index dfe91e21..1f31fa12 100644
--- a/scheduler/src/main/scala/com/sky/kms/Main.scala
+++ b/scheduler/src/main/scala/com/sky/kms/Main.scala
@@ -1,23 +1,25 @@
 package com.sky.kms
 
+import cats.implicits.toShow
 import com.sky.BuildInfo
 import com.sky.kms.config.AppConfig
 import com.typesafe.scalalogging.LazyLogging
 import eu.timepit.refined.pureconfig._
+import pureconfig.ConfigSource.default.loadOrThrow
 import pureconfig.generic.auto._
-import pureconfig.loadConfigOrThrow
 import pureconfig.module.cats._
 
 object Main extends App with LazyLogging with AkkaComponents {
 
   logger.info(s"Kafka Message Scheduler ${BuildInfo.name} ${BuildInfo.version} starting up...")
 
-  val conf = loadConfigOrThrow[AppConfig]
+  val conf: AppConfig = loadOrThrow[AppConfig]
 
-  SchedulerApp.metricsInit
+  logger.info(s"Loaded Config ${conf.show}")
 
-  val app = SchedulerApp.configure apply conf
+  SchedulerApp.metricsInit
 
-  val runningApp = SchedulerApp.run apply app
+  val app        = SchedulerApp.configure.apply(conf)
+  val runningApp = SchedulerApp.run.apply(app)
 
   logger.info("Kafka Message Scheduler initialisation complete.")
 }
diff --git a/scheduler/src/main/scala/com/sky/kms/SchedulerApp.scala b/scheduler/src/main/scala/com/sky/kms/SchedulerApp.scala
index f22e0a1f..de65e2f1 100644
--- a/scheduler/src/main/scala/com/sky/kms/SchedulerApp.scala
+++ b/scheduler/src/main/scala/com/sky/kms/SchedulerApp.scala
@@ -2,7 +2,6 @@ package com.sky.kms
 
 import akka.actor.{ActorRef, ActorSystem}
 import akka.kafka.scaladsl.Consumer.Control
-import akka.stream.ActorMaterializer
 import com.sky.kms.actors._
 import com.sky.kms.config.Configured
 import com.sky.kms.streams.{ScheduleReader, ScheduledMessagePublisher}
@@ -11,9 +10,11 @@ import kamon.jmx.collector.KamonJmxMetricCollector
 
 import scala.concurrent.Future
 
-case class SchedulerApp(reader: ScheduleReader[Future[Control]],
-                        publisher: ScheduledMessagePublisher,
-                        publisherActor: ActorRef)
+case class SchedulerApp(
+    reader: ScheduleReader[Future[Control]],
+    publisher: ScheduledMessagePublisher,
+    publisherActor: ActorRef
+)
 
 object SchedulerApp {
 
@@ -30,13 +31,13 @@ object SchedulerApp {
     } yield SchedulerApp(scheduleReader, publisher, publisherActor)
   }
 
-  def run(implicit system: ActorSystem, mat: ActorMaterializer): Start[Running] =
+  def run(implicit system: ActorSystem): Start[Running] =
     for {
       publisher     <- ScheduledMessagePublisher.run
      _             <- PublisherActor.init(publisher.materializedSource)
       runningReader <- ScheduleReader.run
-      running = Running(runningReader, publisher)
-      _ = ShutdownTasks.forScheduler(running)
+      running        = Running(runningReader, publisher)
+      _              = ShutdownTasks.forScheduler(running)
     } yield running
 
   def metricsInit(implicit system: ActorSystem): Unit = {
diff --git a/scheduler/src/main/scala/com/sky/kms/actors/PublisherActor.scala b/scheduler/src/main/scala/com/sky/kms/actors/PublisherActor.scala
index 2e267e94..dc23171a 100644
--- a/scheduler/src/main/scala/com/sky/kms/actors/PublisherActor.scala
+++ b/scheduler/src/main/scala/com/sky/kms/actors/PublisherActor.scala
@@ -17,29 +17,26 @@ class PublisherActor extends Actor with ActorLogging {
 
   override def receive: Receive = waitForQueue
 
-  private def waitForQueue: Receive = {
-    case Init(queue) =>
-      queue.watchCompletion().failed.foreach(t => self ! DownstreamFailure(t))
-      context become (receiveWithQueue(queue) orElse stopOnError)
+  private def waitForQueue: Receive = { case Init(queue) =>
+    queue.watchCompletion().failed.foreach(t => self ! DownstreamFailure(t))
+    context become (receiveWithQueue(queue) orElse stopOnError)
   }
 
-  private def receiveWithQueue(queue: ScheduleQueue): Receive = {
-    case Trigger(scheduleId, schedule) =>
-      queue.offer((scheduleId, messageFrom(schedule))) onComplete {
-        case Success(QueueOfferResult.Enqueued) =>
-          log.debug(ScheduleQueueOfferResult(scheduleId, QueueOfferResult.Enqueued).show)
-        case Success(res) =>
-          log.warning(ScheduleQueueOfferResult(scheduleId, res).show)
-        case Failure(t) =>
-          log.error(t, s"Failed to enqueue $scheduleId")
-          self ! DownstreamFailure(t)
-      }
+  private def receiveWithQueue(queue: ScheduleQueue): Receive = { case Trigger(scheduleId, schedule) =>
+    queue.offer((scheduleId, messageFrom(schedule))) onComplete {
+      case Success(QueueOfferResult.Enqueued) =>
+        log.debug(ScheduleQueueOfferResult(scheduleId, QueueOfferResult.Enqueued).show)
+      case Success(res) =>
+        log.warning(ScheduleQueueOfferResult(scheduleId, res).show)
+      case Failure(t) =>
+        log.error(t, s"Failed to enqueue $scheduleId")
+        self ! DownstreamFailure(t)
+    }
   }
 
-  private def stopOnError: Receive = {
-    case DownstreamFailure(t) =>
-      log.error(t, "Publisher stream has died")
-      context stop self
+  private def stopOnError: Receive = { case DownstreamFailure(t) =>
+    log.error(t, "Publisher stream has died")
+    context stop self
   }
 
   private def messageFrom(schedule: ScheduleEvent) =
@@ -57,7 +54,7 @@ object PublisherActor {
   case class DownstreamFailure(t: Throwable)
 
   def create(implicit system: ActorSystem): ActorRef =
-    system.actorOf(Props[PublisherActor], "publisher-actor")
+    system.actorOf(Props[PublisherActor](), "publisher-actor")
 
   def init(queue: ScheduleQueue): Start[Unit] =
     Start(_.publisherActor ! Init(queue))
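In Main.scala above, PureConfig 0.17's entry point replaces the removed `loadConfigOrThrow`: loading now goes through an explicit `ConfigSource`. A minimal sketch of the new call shape, using a trivial stand-in config class rather than this project's `AppConfig`:

```scala
import pureconfig._
import pureconfig.generic.auto._ // Scala 2 derivation of ConfigReader

// Hypothetical config shape, standing in for AppConfig.
final case class DemoConfig(name: String, port: Int)

object LoadSketch {
  // PureConfig 0.17.x: `ConfigSource.default` reads application.conf.
  // The diff imports `pureconfig.ConfigSource.default.loadOrThrow` directly,
  // which is why Main.scala can call a bare `loadOrThrow[AppConfig]`.
  val conf: DemoConfig = ConfigSource.default.loadOrThrow[DemoConfig]
}
```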
diff --git a/scheduler/src/main/scala/com/sky/kms/actors/SchedulingActor.scala b/scheduler/src/main/scala/com/sky/kms/actors/SchedulingActor.scala
index 254a455d..c9e69fbe 100644
--- a/scheduler/src/main/scala/com/sky/kms/actors/SchedulingActor.scala
+++ b/scheduler/src/main/scala/com/sky/kms/actors/SchedulingActor.scala
@@ -26,20 +26,18 @@ class SchedulingActor(publisher: ActorRef, monixScheduler: MonixScheduler, monit
       schedules -= scheduleId
     }
 
-    val finishInitialisation: Receive = {
-      case Initialised =>
-        log.debug("State initialised - scheduling stored schedules")
-        val scheduled = schedules.map {
-          case (scheduleId, schedule) =>
-            monitoring.scheduleReceived()
-            scheduleId -> scheduleOnce(scheduleId, schedule)
-        }
-        log.info("Reloaded state has been scheduled")
-        context become receiveWithSchedules(scheduled)
+    val finishInitialisation: Receive = { case Initialised =>
+      log.debug("State initialised - scheduling stored schedules")
+      val scheduled = schedules.map { case (scheduleId, schedule) =>
+        monitoring.scheduleReceived()
+        scheduleId -> scheduleOnce(scheduleId, schedule)
+      }
+      log.info("Reloaded state has been scheduled")
+      context become receiveWithSchedules(scheduled)
     }
 
     streamStartedOrFailed orElse {
-      (handleSchedulingMessage orElse finishInitialisation) andThen (_ => sender ! Ack)
+      (handleSchedulingMessage orElse finishInitialisation) andThen (_ => sender() ! Ack)
     }
   }
 
@@ -50,7 +48,8 @@ class SchedulingActor(publisher: ActorRef, monixScheduler: MonixScheduler, monit
       scheduled.get(scheduleId).foreach(_.cancel())
       val cancellable = scheduleOnce(scheduleId, schedule)
       log.info(
-        s"Scheduled $scheduleId from ${schedule.inputTopic} to ${schedule.outputTopic} in ${schedule.delay.toMillis} millis")
+        s"Scheduled $scheduleId from ${schedule.inputTopic} to ${schedule.outputTopic} in ${schedule.delay.toMillis} millis"
+      )
       monitoring.scheduleReceived()
       scheduled += (scheduleId -> cancellable)
 
@@ -65,7 +64,7 @@ class SchedulingActor(publisher: ActorRef, monixScheduler: MonixScheduler, monit
     }
 
     streamStartedOrFailed orElse {
-      handleSchedulingMessage andThen (_ => sender ! Ack)
+      handleSchedulingMessage andThen (_ => sender() ! Ack)
     }
   }
 
@@ -78,8 +77,8 @@ class SchedulingActor(publisher: ActorRef, monixScheduler: MonixScheduler, monit
     case UpstreamFailure(t) =>
       log.error(t, "Reader stream has died")
      context stop self
-    case StreamStarted =>
-      sender ! Ack
+    case StreamStarted =>
+      sender() ! Ack
   }
 }
 
@@ -100,6 +99,8 @@ object SchedulingActor {
   case class UpstreamFailure(t: Throwable)
 
   def create(publisherActor: ActorRef)(implicit system: ActorSystem): ActorRef =
-    system.actorOf(Props(new SchedulingActor(publisherActor, MonixScheduler(system.dispatcher), new KamonMonitoring())),
-                   "scheduling-actor")
+    system.actorOf(
+      Props(new SchedulingActor(publisherActor, MonixScheduler(system.dispatcher), new KamonMonitoring())),
+      "scheduling-actor"
+    )
 }
diff --git a/scheduler/src/main/scala/com/sky/kms/actors/TerminatorActor.scala b/scheduler/src/main/scala/com/sky/kms/actors/TerminatorActor.scala
index 9ecfdb5f..8d93b790 100644
--- a/scheduler/src/main/scala/com/sky/kms/actors/TerminatorActor.scala
+++ b/scheduler/src/main/scala/com/sky/kms/actors/TerminatorActor.scala
@@ -10,14 +10,13 @@ import scala.concurrent.Future
 class TerminatorActor(terminate: Eval[Future[Done]], actorsToWatch: ActorRef*) extends Actor with ActorLogging {
 
   override def preStart(): Unit =
-    actorsToWatch foreach (context watch)
-
-  override def receive: Receive = {
-    case Terminated(ref) =>
-      log.error(s"$ref stopped. Shutting down")
-      actorsToWatch.filterNot(_ == ref).foreach(context stop)
-      terminate.value
-      context stop self
+    actorsToWatch foreach (context.watch)
+
+  override def receive: Receive = { case Terminated(ref) =>
+    log.error(s"$ref stopped. Shutting down")
+    actorsToWatch.filterNot(_ == ref).foreach(context.stop)
+    terminate.value
+    context.stop(self)
   }
 }
 
@@ -27,5 +26,6 @@ object TerminatorActor {
 
   def create(actors: ActorRef*)(implicit system: ActorSystem): ActorRef =
     system.actorOf(
-      Props(new TerminatorActor(Eval.later(CoordinatedShutdown(system).run(StreamActorsTerminated)), actors: _*)))
+      Props(new TerminatorActor(Eval.later(CoordinatedShutdown(system).run(StreamActorsTerminated)), actors: _*))
+    )
 }
diff --git a/scheduler/src/main/scala/com/sky/kms/avro/package.scala b/scheduler/src/main/scala/com/sky/kms/avro/package.scala
index ef58974a..dacc3d0b 100644
--- a/scheduler/src/main/scala/com/sky/kms/avro/package.scala
+++ b/scheduler/src/main/scala/com/sky/kms/avro/package.scala
@@ -7,16 +7,19 @@ import org.apache.avro.Schema
 
 package object avro {
 
-  implicit object DateTimeSchemaFor extends SchemaFor[OffsetDateTime] {
-    override val schema: Schema = Schema.create(Schema.Type.LONG)
-  }
+  implicit val dateTimeSchemaFor = SchemaFor[OffsetDateTime](Schema.create(Schema.Type.LONG))
 
   implicit object DateTimeEncoder extends Encoder[OffsetDateTime] {
-    override def encode(value: OffsetDateTime, schema: Schema): java.lang.Long = value.toInstant.toEpochMilli
+    override def encode(value: OffsetDateTime): java.lang.Long = value.toInstant.toEpochMilli
+
+    override def schemaFor: SchemaFor[OffsetDateTime] = SchemaFor[OffsetDateTime]
   }
 
   implicit object DateTimeDecoder extends Decoder[OffsetDateTime] {
-    override def decode(value: Any, schema: Schema): OffsetDateTime =
+    override def decode(value: Any): OffsetDateTime =
       Instant.ofEpochMilli(value.toString.toLong).atZone(ZoneOffset.UTC).toOffsetDateTime
+
+    override def schemaFor: SchemaFor[OffsetDateTime] = SchemaFor[OffsetDateTime]
   }
+
 }
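The avro/package.scala rewrite above tracks the avro4s 4.x API: `SchemaFor` became a constructible value, and `Encoder`/`Decoder` expose their own `schemaFor` instead of receiving a `Schema` argument in `encode`/`decode` as they did in 2.x. A minimal sketch of the same pattern for a hypothetical epoch-millis `Instant` type (not part of this codebase), mirroring the shape shown in the diff:

```scala
import java.time.Instant

import com.sksamuel.avro4s.{Decoder, Encoder, SchemaFor}
import org.apache.avro.Schema

object AvroInstancesSketch {
  // avro4s 4.x: SchemaFor is a value built from a raw Avro schema.
  implicit val instantSchemaFor: SchemaFor[Instant] =
    SchemaFor[Instant](Schema.create(Schema.Type.LONG))

  // encode/decode no longer take a Schema parameter; the schema is
  // supplied by the instance's own schemaFor.
  implicit object InstantEncoder extends Encoder[Instant] {
    override def encode(value: Instant): java.lang.Long = value.toEpochMilli
    override def schemaFor: SchemaFor[Instant]          = SchemaFor[Instant]
  }

  implicit object InstantDecoder extends Decoder[Instant] {
    override def decode(value: Any): Instant   = Instant.ofEpochMilli(value.toString.toLong)
    override def schemaFor: SchemaFor[Instant] = SchemaFor[Instant]
  }
}
```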
diff --git a/scheduler/src/main/scala/com/sky/kms/config/config.scala b/scheduler/src/main/scala/com/sky/kms/config/config.scala
index 78aa9a89..8044e1c5 100644
--- a/scheduler/src/main/scala/com/sky/kms/config/config.scala
+++ b/scheduler/src/main/scala/com/sky/kms/config/config.scala
@@ -1,5 +1,6 @@
 package com.sky.kms.config
 
+import cats.Show
 import cats.data.{NonEmptyList, Reader}
 import com.sky.kms.kafka.Topic
 
@@ -7,6 +8,10 @@ import scala.concurrent.duration.FiniteDuration
 
 case class AppConfig(scheduler: SchedulerConfig)
 
+object AppConfig {
+  implicit val show: Show[AppConfig] = Show.show(_.toString)
+}
+
 case class SchedulerConfig(reader: ReaderConfig, publisher: PublisherConfig)
 
 object SchedulerConfig {
diff --git a/scheduler/src/main/scala/com/sky/kms/domain/ApplicationError.scala b/scheduler/src/main/scala/com/sky/kms/domain/ApplicationError.scala
index eabef705..92687f5d 100644
--- a/scheduler/src/main/scala/com/sky/kms/domain/ApplicationError.scala
+++ b/scheduler/src/main/scala/com/sky/kms/domain/ApplicationError.scala
@@ -22,10 +22,10 @@ object ApplicationError extends LazyLogging {
   final case class InvalidTimeError(key: String, time: OffsetDateTime) extends ApplicationError
 
   implicit val showError: Show[ApplicationError] = show {
-    case error: InvalidSchemaError => s"Invalid schema used to produce message with key ${error.key}"
+    case error: InvalidSchemaError     => s"Invalid schema used to produce message with key ${error.key}"
     case error: AvroMessageFormatError =>
       s"Error when processing message with key ${error.key}. ${error.cause.getMessage}"
-    case error: InvalidTimeError => s"Time between now and ${error.time} is not within 292 years"
+    case error: InvalidTimeError       => s"Time between now and ${error.time} is not within 292 years"
   }
 
   def extractError[T]: Flow[Either[ApplicationError, T], ApplicationError, NotUsed] =
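The `Show[AppConfig]` instance added above is what lets Main.scala log `conf.show` via the `cats.implicits.toShow` import. A tiny self-contained sketch of the same mechanism with a stand-in type:

```scala
import cats.Show
import cats.implicits.toShow

final case class Demo(name: String)

object Demo {
  // Same pattern as AppConfig.show above: delegate to toString.
  implicit val show: Show[Demo] = Show.show(_.toString)
}

object ShowSketch extends App {
  println(Demo("scheduler").show) // prints: Demo(scheduler)
}
```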
diff --git a/scheduler/src/main/scala/com/sky/kms/domain/PublishableMessage.scala b/scheduler/src/main/scala/com/sky/kms/domain/PublishableMessage.scala
index c7f62525..3af4bfdf 100644
--- a/scheduler/src/main/scala/com/sky/kms/domain/PublishableMessage.scala
+++ b/scheduler/src/main/scala/com/sky/kms/domain/PublishableMessage.scala
@@ -4,12 +4,13 @@ sealed trait PublishableMessage extends Product with Serializable
 
 object PublishableMessage {
 
-  final case class ScheduledMessage(inputTopic: String,
-                                    outputTopic: String,
-                                    key: Array[Byte],
-                                    value: Option[Array[Byte]],
-                                    headers: Map[String, Array[Byte]])
-      extends PublishableMessage
+  final case class ScheduledMessage(
+      inputTopic: String,
+      outputTopic: String,
+      key: Array[Byte],
+      value: Option[Array[Byte]],
+      headers: Map[String, Array[Byte]]
+  ) extends PublishableMessage
 
   final case class ScheduleDeletion(scheduleId: ScheduleId, scheduleTopic: String, headers: Map[String, Array[Byte]])
       extends PublishableMessage
diff --git a/scheduler/src/main/scala/com/sky/kms/domain/Schedule.scala b/scheduler/src/main/scala/com/sky/kms/domain/Schedule.scala
index 91ffeb87..3261d14a 100644
--- a/scheduler/src/main/scala/com/sky/kms/domain/Schedule.scala
+++ b/scheduler/src/main/scala/com/sky/kms/domain/Schedule.scala
@@ -1,6 +1,7 @@
 package com.sky.kms.domain
 
 import java.time.OffsetDateTime
+
 import scala.concurrent.duration.FiniteDuration
 
 sealed trait Schedule extends Product with Serializable
@@ -10,16 +11,18 @@ object Schedule {
   final case class ScheduleNoHeaders(time: OffsetDateTime, topic: String, key: Array[Byte], value: Option[Array[Byte]])
       extends Schedule
 
-  final case class ScheduleWithHeaders(time: OffsetDateTime,
-                                       topic: String,
-                                       key: Array[Byte],
-                                       value: Option[Array[Byte]],
-                                       headers: Map[String, Array[Byte]])
-      extends Schedule
-
-  implicit class ScheduleOps(val s: Schedule) extends AnyVal {
-    def getTime = Schedule.getTime(s)
-    def toScheduleEvent(delay: FiniteDuration, inputTopic: String) = Schedule.toScheduleEvent(delay, inputTopic, s)
+  final case class ScheduleWithHeaders(
+      time: OffsetDateTime,
+      topic: String,
+      key: Array[Byte],
+      value: Option[Array[Byte]],
+      headers: Map[String, Array[Byte]]
+  ) extends Schedule
+
+  implicit class ScheduleOps(private val s: Schedule) extends AnyVal {
+    def getTime: OffsetDateTime = Schedule.getTime(s)
+    def toScheduleEvent(delay: FiniteDuration, inputTopic: String): ScheduleEvent =
+      Schedule.toScheduleEvent(delay, inputTopic, s)
   }
 
   private def getTime(schedule: Schedule): OffsetDateTime = schedule match {
@@ -29,7 +32,7 @@ object Schedule {
 
   private def toScheduleEvent(delay: FiniteDuration, inputTopic: String, schedule: Schedule): ScheduleEvent =
     schedule match {
-      case ScheduleNoHeaders(_, outputTopic, key, value) =>
+      case ScheduleNoHeaders(_, outputTopic, key, value)            =>
         ScheduleEvent(delay, inputTopic, outputTopic, key, value, Map.empty)
       case ScheduleWithHeaders(_, outputTopic, key, value, headers) =>
         ScheduleEvent(delay, inputTopic, outputTopic, key, value, headers)
diff --git a/scheduler/src/main/scala/com/sky/kms/domain/ScheduleEvent.scala b/scheduler/src/main/scala/com/sky/kms/domain/ScheduleEvent.scala
new file mode 100644
index 00000000..99d8ddc0
--- /dev/null
+++ b/scheduler/src/main/scala/com/sky/kms/domain/ScheduleEvent.scala
@@ -0,0 +1,12 @@
+package com.sky.kms.domain
+
+import scala.concurrent.duration.FiniteDuration
+
+case class ScheduleEvent(
+    delay: FiniteDuration,
+    inputTopic: String,
+    outputTopic: String,
+    key: Array[Byte],
+    value: Option[Array[Byte]],
+    headers: Map[String, Array[Byte]]
+)
diff --git a/scheduler/src/main/scala/com/sky/kms/domain/ScheduleQueueOfferResult.scala b/scheduler/src/main/scala/com/sky/kms/domain/ScheduleQueueOfferResult.scala
index e7c695e6..62980812 100644
--- a/scheduler/src/main/scala/com/sky/kms/domain/ScheduleQueueOfferResult.scala
+++ b/scheduler/src/main/scala/com/sky/kms/domain/ScheduleQueueOfferResult.scala
@@ -10,13 +10,13 @@ case class ScheduleQueueOfferResult(scheduleId: ScheduleId, queueOfferResult: Qu
 object ScheduleQueueOfferResult {
 
   implicit val queueOfferResultShow: Show[ScheduleQueueOfferResult] = show {
-    case ScheduleQueueOfferResult(scheduleId, Enqueued) =>
+    case ScheduleQueueOfferResult(scheduleId, Enqueued)                    =>
       s"$scheduleId enqueued successfully"
-    case ScheduleQueueOfferResult(scheduleId, Dropped) =>
+    case ScheduleQueueOfferResult(scheduleId, Dropped)                     =>
       s"$scheduleId was dropped from queue, check the chosen overflow strategy"
     case ScheduleQueueOfferResult(scheduleId, QueueOfferResult.Failure(t)) =>
       s"An error occurred when attempting to enqueue the $scheduleId: ${t.getMessage}"
-    case ScheduleQueueOfferResult(scheduleId, QueueClosed) =>
+    case ScheduleQueueOfferResult(scheduleId, QueueClosed)                 =>
       s"Unable to enqueue $scheduleId because the downstream source queue has been completed/closed"
   }
diff --git a/scheduler/src/main/scala/com/sky/kms/domain/package.scala b/scheduler/src/main/scala/com/sky/kms/domain/package.scala
index ed33e54e..37df08ba 100644
--- a/scheduler/src/main/scala/com/sky/kms/domain/package.scala
+++ b/scheduler/src/main/scala/com/sky/kms/domain/package.scala
@@ -5,21 +5,13 @@ import java.lang
 import org.apache.kafka.common.header.Header
 import org.apache.kafka.common.header.internals.RecordHeader
 
-import scala.collection.JavaConverters._
-import scala.concurrent.duration.FiniteDuration
+import scala.jdk.CollectionConverters._
 
 package object domain {
 
   type ScheduleId = String
 
-  case class ScheduleEvent(delay: FiniteDuration,
-                           inputTopic: String,
-                           outputTopic: String,
-                           key: Array[Byte],
-                           value: Option[Array[Byte]],
-                           headers: Map[String, Array[Byte]])
-
-  implicit class HeadersOps(val headers: Map[String, Array[Byte]]) extends AnyVal {
+  implicit class HeadersOps(private val headers: Map[String, Array[Byte]]) extends AnyVal {
     def asKafkaHeaders: lang.Iterable[Header] = headers.map { case (k, v) => new RecordHeader(k, v): Header }.asJava
   }
diff --git a/scheduler/src/main/scala/com/sky/kms/monitoring/Monitoring.scala b/scheduler/src/main/scala/com/sky/kms/monitoring/Monitoring.scala
index d48e8cda..8219b3ab 100644
--- a/scheduler/src/main/scala/com/sky/kms/monitoring/Monitoring.scala
+++ b/scheduler/src/main/scala/com/sky/kms/monitoring/Monitoring.scala
@@ -2,7 +2,7 @@ package com.sky.kms.monitoring
 
 trait Monitoring {
 
-  def scheduleReceived()
+  def scheduleReceived(): Unit
 
-  def scheduleDone()
+  def scheduleDone(): Unit
 }
diff --git a/scheduler/src/main/scala/com/sky/kms/package.scala b/scheduler/src/main/scala/com/sky/kms/package.scala
index 8b6f120d..179f7fcd 100644
--- a/scheduler/src/main/scala/com/sky/kms/package.scala
+++ b/scheduler/src/main/scala/com/sky/kms/package.scala
@@ -24,15 +24,17 @@ package object kms {
       scheduleEvent(cr, decoder[ScheduleWithHeaders])
         .orElse(scheduleEvent(cr, decoder[ScheduleNoHeaders]))
 
-  private def scheduleEvent[A <: Schedule](cr: ConsumerRecord[String, Array[Byte]],
-                                           decode: Array[Byte] => Option[Try[A]]): ScheduleReader.In =
+  private def scheduleEvent[A <: Schedule](
+      cr: ConsumerRecord[String, Array[Byte]],
+      decode: Array[Byte] => Option[Try[A]]
+  ): ScheduleReader.In =
     Option(cr.value).fold[ScheduleReader.In]((cr.key, None).asRight[ApplicationError]) { bytes =>
       for {
         scheduleTry  <- Either.fromOption(decode(bytes), InvalidSchemaError(cr.key))
         avroSchedule <- scheduleTry.toEither.leftMap(AvroMessageFormatError(cr.key, _))
-        delay <- Either
-          .catchNonFatal(MILLIS.between(OffsetDateTime.now, avroSchedule.getTime).millis)
-          .leftMap(_ => InvalidTimeError(cr.key, avroSchedule.getTime))
+        delay        <- Either
+                          .catchNonFatal(MILLIS.between(OffsetDateTime.now, avroSchedule.getTime).millis)
+                          .leftMap(_ => InvalidTimeError(cr.key, avroSchedule.getTime))
       } yield cr.key -> avroSchedule.toScheduleEvent(delay, cr.topic).some
     }
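Several files above swap `scala.collection.JavaConverters._` (deprecated in 2.13) for `scala.jdk.CollectionConverters._`; the `asJava`/`asScala` extension methods keep the same names, so only the import moves. A minimal sketch mirroring the `asKafkaHeaders` conversion above, minus the Kafka types:

```scala
import scala.jdk.CollectionConverters._

object ConvertersSketch {
  val headers: Map[String, Array[Byte]] = Map("traceId" -> Array[Byte](1, 2, 3))

  // Same asJava syntax as before; the import now lives under scala.jdk.
  val javaIterable: java.lang.Iterable[String] = headers.keys.asJava
}
```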
diff --git a/scheduler/src/main/scala/com/sky/kms/streams/ScheduleReader.scala b/scheduler/src/main/scala/com/sky/kms/streams/ScheduleReader.scala
index e5bfc831..a66d62a6 100644
--- a/scheduler/src/main/scala/com/sky/kms/streams/ScheduleReader.scala
+++ b/scheduler/src/main/scala/com/sky/kms/streams/ScheduleReader.scala
@@ -4,7 +4,6 @@ import akka.Done
 import akka.actor.{ActorRef, ActorSystem, PoisonPill}
 import akka.kafka.scaladsl.Consumer.Control
 import akka.pattern.ask
-import akka.stream._
 import akka.stream.scaladsl._
 import cats.Eval
 import com.sky.kafka.topicloader._
@@ -19,19 +18,20 @@ import org.apache.kafka.common.serialization.{ByteArrayDeserializer, Deserialize
 
 import scala.concurrent.Future
 
-/**
-  * Provides stream from the schedule source to the scheduling actor.
+/** Provides stream from the schedule source to the scheduling actor.
   */
-case class ScheduleReader[Mat](scheduleSource: Eval[Source[In, (Future[Done], Mat)]],
-                               schedulingActor: ActorRef,
-                               errorHandler: Sink[ApplicationError, Future[Done]],
-                               timeouts: ReaderConfig.TimeoutConfig)(implicit system: ActorSystem) {
+case class ScheduleReader[Mat](
+    scheduleSource: Eval[Source[In, (Future[Done], Mat)]],
+    schedulingActor: ActorRef,
+    errorHandler: Sink[ApplicationError, Future[Done]],
+    timeouts: ReaderConfig.TimeoutConfig
+)(implicit system: ActorSystem) {
 
   import system.dispatcher
 
   private def initSchedulingActorWhenReady(f: Future[Done]): Future[Any] =
-    f.flatMap(_ => (schedulingActor ? Initialised)(timeouts.initialisation)).recover {
-      case t => schedulingActor ! UpstreamFailure(t)
+    f.flatMap(_ => (schedulingActor ? Initialised)(timeouts.initialisation)).recover { case t =>
+      schedulingActor ! UpstreamFailure(t)
     }
 
   def stream: RunnableGraph[Mat] =
@@ -39,7 +39,7 @@ case class ScheduleReader[Mat](scheduleSource: Eval[Source[In, (Future[Done], Ma
       .map(ScheduleReader.toSchedulingMessage)
      .alsoTo(extractError.to(errorHandler))
       .collect { case Right(msg) => msg }
-      .to(Sink.actorRefWithAck(schedulingActor, StreamStarted, Ack, PoisonPill, UpstreamFailure))
+      .to(Sink.actorRefWithBackpressure(schedulingActor, StreamStarted, Ack, PoisonPill, UpstreamFailure))
 }
 
 object ScheduleReader extends LazyLogging {
@@ -50,9 +50,8 @@ object ScheduleReader extends LazyLogging {
   type LoadSchedule = SchedulingMessage => Future[Ack.type]
 
   def toSchedulingMessage(readResult: In): Either[ApplicationError, SchedulingMessage] =
-    readResult.map {
-      case (scheduleId, scheduleOpt) =>
-        scheduleOpt.fold[SchedulingMessage](Cancel(scheduleId))(CreateOrUpdate(scheduleId, _))
+    readResult.map { case (scheduleId, scheduleOpt) =>
+      scheduleOpt.fold[SchedulingMessage](Cancel(scheduleId))(CreateOrUpdate(scheduleId, _))
     }
 
   def configure(actorRef: ActorRef)(implicit system: ActorSystem): Configured[ScheduleReader[Future[Control]]] =
@@ -72,7 +71,7 @@ object ScheduleReader extends LazyLogging {
     )
   }
 
-  def run(implicit mat: ActorMaterializer): Start[Running[Future[Control]]] =
+  def run(implicit system: ActorSystem): Start[Running[Future[Control]]] =
     Start { app =>
       Running(app.reader.stream.run())
     }
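`Sink.actorRefWithAck` was renamed `Sink.actorRefWithBackpressure` in Akka 2.6; the ack protocol itself is unchanged: the sink sends the init message, waits for the ack before emitting each element, and sends a completion or failure message at the end — which is why `SchedulingActor` above replies `sender() ! Ack`. A minimal self-contained sketch of the protocol (the message names here are hypothetical, not the scheduler's):

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.stream.scaladsl.{Sink, Source}

object BackpressureSketch extends App {
  case object Init
  case object Ack
  case object Complete
  final case class Failed(t: Throwable)

  class Printer extends Actor {
    def receive: Receive = {
      case Init      => sender() ! Ack // stream may start
      case Complete  => context.stop(self)
      case Failed(_) => context.stop(self)
      case msg       =>
        println(msg)
        sender() ! Ack // signal demand for the next element
    }
  }

  implicit val system: ActorSystem = ActorSystem("sketch")
  val printer = system.actorOf(Props(new Printer), "printer")

  Source(1 to 3).runWith(
    Sink.actorRefWithBackpressure(printer, Init, Ack, Complete, Failed.apply)
  )
}
```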
diff --git a/scheduler/src/main/scala/com/sky/kms/streams/ScheduledMessagePublisher.scala b/scheduler/src/main/scala/com/sky/kms/streams/ScheduledMessagePublisher.scala
index 297a007f..6157111e 100644
--- a/scheduler/src/main/scala/com/sky/kms/streams/ScheduledMessagePublisher.scala
+++ b/scheduler/src/main/scala/com/sky/kms/streams/ScheduledMessagePublisher.scala
@@ -4,8 +4,8 @@ import akka.Done
 import akka.actor.ActorSystem
 import akka.kafka.ProducerSettings
 import akka.kafka.scaladsl.Producer
+import akka.stream.OverflowStrategy
 import akka.stream.scaladsl._
-import akka.stream.{ActorMaterializer, OverflowStrategy}
 import cats.Eval
 import com.sky.kms.Start
 import com.sky.kms.actors.PublisherActor.ScheduleQueue
@@ -19,10 +19,8 @@ import org.apache.kafka.common.serialization.ByteArraySerializer
 
 import scala.concurrent.Future
 
-/**
-  * Provides a stream that consumes from the queue of triggered messages,
-  * writes the scheduled messages to the specified Kafka topics and then deletes the schedules
-  * from the scheduling Kafka topic to mark completion
+/** Provides a stream that consumes from the queue of triggered messages, writes the scheduled messages to the specified
+  * Kafka topics and then deletes the schedules from the scheduling Kafka topic to mark completion
   */
 case class ScheduledMessagePublisher(queueBufferSize: Int, publisherSink: Eval[Sink[SinkIn, SinkMat]])
     extends LazyLogging {
@@ -33,11 +31,11 @@ case class ScheduledMessagePublisher(queueBufferSize: Int, publisherSink: Eval[S
       .mapConcat(splitToMessageAndDeletion)
       .toMat(publisherSink.value)(Keep.both)
 
-  val splitToMessageAndDeletion: In => List[PublishableMessage] = {
-    case (scheduleId, scheduledMessage) =>
-      logger.info(
-        s"Publishing scheduled message $scheduleId to ${scheduledMessage.outputTopic} and deleting it from ${scheduledMessage.inputTopic}")
-      List(scheduledMessage, ScheduleDeletion(scheduleId, scheduledMessage.inputTopic, scheduledMessage.headers))
+  val splitToMessageAndDeletion: In => List[PublishableMessage] = { case (scheduleId, scheduledMessage) =>
+    logger.info(
+      s"Publishing scheduled message $scheduleId to ${scheduledMessage.outputTopic} and deleting it from ${scheduledMessage.inputTopic}"
+    )
+    List(scheduledMessage, ScheduleDeletion(scheduleId, scheduledMessage.inputTopic, scheduledMessage.headers))
   }
 }
 
@@ -57,7 +55,9 @@ object ScheduledMessagePublisher {
         Flow[PublishableMessage]
          .map(toProducerRecord)
           .toMat(Producer.plainSink(ProducerSettings(system, new ByteArraySerializer, new ByteArraySerializer)))(
-            Keep.right))
+            Keep.right
+          )
+      )
 
     PublisherConfig.configure.map(c => ScheduledMessagePublisher(c.queueBufferSize, writeMsgToKafka))
   }
@@ -65,11 +65,11 @@ object ScheduledMessagePublisher {
   val toProducerRecord: PublishableMessage => ProducerRecord[Array[Byte], Array[Byte]] = {
     case ScheduledMessage(_, outputTopic, key, value, headers) =>
       new ProducerRecord(outputTopic, null, key, value.orNull, headers.asKafkaHeaders)
-    case ScheduleDeletion(id, outputTopic, headers) =>
+    case ScheduleDeletion(id, outputTopic, headers)            =>
       new ProducerRecord(outputTopic, null, id.getBytes, null, headers.asKafkaHeaders)
   }
 
-  def run(implicit mat: ActorMaterializer): Start[Running] =
+  def run(implicit system: ActorSystem): Start[Running] =
     Start { app =>
       val (sourceMat, sinkMat) = app.publisher.stream.run()
       Running(sourceMat, sinkMat)
diff --git a/scheduler/src/test/resources/logback-test.xml b/scheduler/src/test/resources/logback-test.xml
index ba76427d..21050f06 100644
--- a/scheduler/src/test/resources/logback-test.xml
+++ b/scheduler/src/test/resources/logback-test.xml
@@ -10,4 +10,4 @@
 
 
 
-</configuration>
\ No newline at end of file
+</configuration>
diff --git a/scheduler/src/test/scala/com/sky/kms/base/AkkaKafkaSpecBase.scala b/scheduler/src/test/scala/com/sky/kms/base/AkkaKafkaSpecBase.scala
index b0242726..5d2feb30 100644
--- a/scheduler/src/test/scala/com/sky/kms/base/AkkaKafkaSpecBase.scala
+++ b/scheduler/src/test/scala/com/sky/kms/base/AkkaKafkaSpecBase.scala
@@ -1,7 +1,8 @@
 package com.sky.kms.base
 
+import akka.actor.ActorSystem
 import com.sky.kms.utils.TestActorSystem
 
-trait AkkaKafkaSpecBase extends AkkaStreamSpecBase with KafkaIntSpecBase {
-  override implicit lazy val system = TestActorSystem(kafkaConfig.kafkaPort)
+trait AkkaKafkaSpecBase extends AkkaSpecBase with KafkaIntSpecBase {
+  override implicit lazy val system: ActorSystem = TestActorSystem(kafkaConfig.kafkaPort)
 }
diff --git a/scheduler/src/test/scala/com/sky/kms/base/AkkaSpecBase.scala b/scheduler/src/test/scala/com/sky/kms/base/AkkaSpecBase.scala
index b87fa64e..fe237323 100644
--- a/scheduler/src/test/scala/com/sky/kms/base/AkkaSpecBase.scala
+++ b/scheduler/src/test/scala/com/sky/kms/base/AkkaSpecBase.scala
@@ -1,11 +1,12 @@
 package com.sky.kms.base
 
+import akka.actor.ActorSystem
 import akka.testkit.{TestKit, TestKitBase}
 import com.sky.kms.utils.TestActorSystem
 
-abstract class AkkaSpecBase extends TestKitBase with SpecBase {
+abstract class AkkaSpecBase extends SpecBase with TestKitBase {
 
-  override implicit lazy val system = TestActorSystem()
+  override implicit lazy val system: ActorSystem = TestActorSystem()
 
   override def afterAll(): Unit = {
     super.afterAll()
diff --git a/scheduler/src/test/scala/com/sky/kms/base/AkkaStreamSpecBase.scala b/scheduler/src/test/scala/com/sky/kms/base/AkkaStreamSpecBase.scala
deleted file mode 100644
index 7b4918ce..00000000
--- a/scheduler/src/test/scala/com/sky/kms/base/AkkaStreamSpecBase.scala
+++ /dev/null
@@ -1,13 +0,0 @@
-package com.sky.kms.base
-
-import akka.stream.ActorMaterializer
-
-abstract class AkkaStreamSpecBase extends AkkaSpecBase {
-
-  implicit val materializer = ActorMaterializer()
-
-  override def afterAll(): Unit = {
-    super.afterAll()
-    materializer.shutdown()
-  }
-}
diff --git a/scheduler/src/test/scala/com/sky/kms/base/KafkaIntSpecBase.scala b/scheduler/src/test/scala/com/sky/kms/base/KafkaIntSpecBase.scala
index 1df9dc8a..1752af9a 100644
--- a/scheduler/src/test/scala/com/sky/kms/base/KafkaIntSpecBase.scala
+++ b/scheduler/src/test/scala/com/sky/kms/base/KafkaIntSpecBase.scala
@@ -3,23 +3,27 @@ package com.sky.kms.base
 import com.sky.kms.kafka.Topic
 import com.sky.kms.utils.RandomPort.randomPort
 import eu.timepit.refined.auto._
-import net.manub.embeddedkafka.Codecs.{nullDeserializer, stringDeserializer}
-import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
+import io.github.embeddedkafka.Codecs.{nullDeserializer, stringDeserializer}
+import io.github.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
 import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}
 import org.apache.kafka.common.serialization.Deserializer
-import org.scalatest.WordSpecLike
+import org.scalatest.wordspec.AnyWordSpec
 
-import collection.JavaConverters._
-import scala.concurrent.duration._
 import scala.compat.java8.DurationConverters._
+import scala.concurrent.duration._
+import scala.jdk.CollectionConverters._
 
-trait KafkaIntSpecBase extends EmbeddedKafka with WordSpecLike {
+trait KafkaIntSpecBase extends AnyWordSpec with EmbeddedKafka {
 
-  implicit lazy val kafkaConfig = EmbeddedKafkaConfig(kafkaPort = randomPort(), zooKeeperPort = randomPort())
+  implicit lazy val kafkaConfig: EmbeddedKafkaConfig =
+    EmbeddedKafkaConfig(
+      kafkaPort = randomPort(),
+      zooKeeperPort = randomPort()
+    )
 
   val scheduleTopic: Topic      = "scheduleTopic"
   val extraScheduleTopic: Topic = "extraScheduleTopic"
 
-  def kafkaConsumerTimeout: FiniteDuration = 60 seconds
+  def kafkaConsumerTimeout: FiniteDuration = 60.seconds
 
   private def subscribeAndPoll[K, V](topic: String): KafkaConsumer[K, V] => Iterator[ConsumerRecord[K, V]] = { cr =>
     cr.subscribe(List(topic).asJavaCollection)
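The test bases above migrate from ScalaTest 3.0's flat `org.scalatest.{WordSpecLike, Matchers}` imports to the 3.1+ split packages. A minimal sketch of a spec against the new import locations:

```scala
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec

// ScalaTest 3.1+: each style lives in a dedicated package
// (wordspec.AnyWordSpec, matchers.should.Matchers); behaviour is unchanged.
class RenameSketchSpec extends AnyWordSpec with Matchers {
  "the renamed style traits" should {
    "behave like the 3.0 equivalents" in {
      (1 + 1) shouldBe 2
    }
  }
}
```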
diff --git a/scheduler/src/test/scala/com/sky/kms/base/SchedulerIntSpecBase.scala b/scheduler/src/test/scala/com/sky/kms/base/SchedulerIntSpecBase.scala
index 0864aa5e..589b324f 100644
--- a/scheduler/src/test/scala/com/sky/kms/base/SchedulerIntSpecBase.scala
+++ b/scheduler/src/test/scala/com/sky/kms/base/SchedulerIntSpecBase.scala
@@ -1,10 +1,13 @@
 package com.sky.kms.base
 
 import cats.data.NonEmptyList
+import com.sky.kms.config.SchedulerConfig
 import com.sky.kms.utils.TestConfig
+
 import scala.concurrent.duration._
 
 abstract class SchedulerIntSpecBase extends AkkaKafkaSpecBase {
-  implicit val conf = TestConfig(NonEmptyList.of(scheduleTopic, extraScheduleTopic))
-  val tolerance = 1300 milliseconds
+  implicit val conf: SchedulerConfig = TestConfig(NonEmptyList.of(scheduleTopic, extraScheduleTopic))
+  val tolerance: FiniteDuration      = 1300.milliseconds
+
+  override implicit val patienceConfig: PatienceConfig = PatienceConfig(60.seconds, 1.second)
 }
diff --git a/scheduler/src/test/scala/com/sky/kms/base/SpecBase.scala b/scheduler/src/test/scala/com/sky/kms/base/SpecBase.scala
index 72b22864..b17b3212 100644
--- a/scheduler/src/test/scala/com/sky/kms/base/SpecBase.scala
+++ b/scheduler/src/test/scala/com/sky/kms/base/SpecBase.scala
@@ -1,7 +1,17 @@
 package com.sky.kms.base
 
+import cats.scalatest.EitherValues
 import com.danielasfregola.randomdatagenerator.RandomDataGenerator
-import org.scalatest.concurrent.ScalaFutures
-import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.concurrent.{Eventually, ScalaFutures}
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.wordspec.AnyWordSpec
 
-trait SpecBase extends WordSpecLike with Matchers with BeforeAndAfterAll with RandomDataGenerator with ScalaFutures
+trait SpecBase
+    extends AnyWordSpec
+    with Matchers
+    with EitherValues
+    with BeforeAndAfterAll
+    with RandomDataGenerator
+    with ScalaFutures
+    with Eventually
diff --git a/scheduler/src/test/scala/com/sky/kms/e2e/SchedulerDeleteIntSpec.scala b/scheduler/src/test/scala/com/sky/kms/e2e/SchedulerDeleteIntSpec.scala
index 617f0973..1825ba5a 100644
--- a/scheduler/src/test/scala/com/sky/kms/e2e/SchedulerDeleteIntSpec.scala
+++ b/scheduler/src/test/scala/com/sky/kms/e2e/SchedulerDeleteIntSpec.scala
@@ -1,10 +1,10 @@
 package com.sky.kms.e2e
 
 import com.sky.kms.base.SchedulerIntSpecBase
-import com.sky.kms.utils.TestDataUtils._
 import com.sky.kms.domain._
+import com.sky.kms.utils.TestDataUtils._
 import eu.timepit.refined.auto._
-import net.manub.embeddedkafka.Codecs.{stringSerializer, stringDeserializer, nullSerializer => arrayByteSerializer}
+import io.github.embeddedkafka.Codecs.{nullSerializer => arrayByteSerializer, stringDeserializer, stringSerializer}
 
 class SchedulerDeleteIntSpec extends SchedulerIntSpecBase {
diff --git a/scheduler/src/test/scala/com/sky/kms/e2e/SchedulerIntSpec.scala b/scheduler/src/test/scala/com/sky/kms/e2e/SchedulerIntSpec.scala
index 001c05af..91caa995 100644
--- a/scheduler/src/test/scala/com/sky/kms/e2e/SchedulerIntSpec.scala
+++ b/scheduler/src/test/scala/com/sky/kms/e2e/SchedulerIntSpec.scala
@@ -3,10 +3,10 @@ package com.sky.kms.e2e
 import java.time.OffsetDateTime
 
 import com.sky.kms.base.SchedulerIntSpecBase
-import com.sky.kms.utils.TestDataUtils._
 import com.sky.kms.domain._
+import com.sky.kms.utils.TestDataUtils._
 import eu.timepit.refined.auto._
-import net.manub.embeddedkafka.Codecs._
+import io.github.embeddedkafka.Codecs._
 
 class SchedulerIntSpec extends SchedulerIntSpecBase {
 
@@ -28,44 +28,40 @@ class SchedulerIntSpec extends SchedulerIntSpecBase {
   private class TestContext {
 
     def createSchedules(numSchedules: Int, forTopics: List[String]): List[(ScheduleId, ScheduleEvent)] =
       random[(ScheduleId, ScheduleEvent)](numSchedules).toList
-        .zip(Stream.continually(forTopics.toStream).flatten.take(numSchedules).toList)
-        .map {
-          case ((id, schedule), topic) =>
-            id -> schedule.copy(inputTopic = topic).secondsFromNow(4)
+        .zip(LazyList.continually(forTopics.to(LazyList)).flatten.take(numSchedules).toList)
+        .map { case ((id, schedule), topic) =>
+          id -> schedule.copy(inputTopic = topic).secondsFromNow(4)
         }
 
-    def publish: List[(ScheduleId, ScheduleEvent)] => List[OffsetDateTime] = _.map {
-      case (id, scheduleEvent) =>
-        val schedule = scheduleEvent.toSchedule
-        publishToKafka(scheduleEvent.inputTopic, id, schedule.toAvro)
-        schedule.time
+    def publish: List[(ScheduleId, ScheduleEvent)] => List[OffsetDateTime] = _.map { case (id, scheduleEvent) =>
+      val schedule = scheduleEvent.toSchedule
+      publishToKafka(scheduleEvent.inputTopic, id, schedule.toAvro)
+      schedule.time
     }
 
     def assertMessagesWrittenFrom(time: OffsetDateTime, schedules: List[(ScheduleId, ScheduleEvent)]): Unit =
-      schedules.foreach {
-        case (_, schedule) =>
-          val cr = consumeFirstFrom[Array[Byte]](schedule.outputTopic)
+      schedules.foreach { case (_, schedule) =>
+        val cr = consumeFirstFrom[Array[Byte]](schedule.outputTopic)
 
-          cr.key should contain theSameElementsInOrderAs schedule.key
+        cr.key should contain theSameElementsInOrderAs schedule.key
 
-          schedule.value match {
-            case Some(value) => cr.value should contain theSameElementsAs value
-            case None        => cr.value shouldBe null
-          }
+        schedule.value match {
+          case Some(value) => cr.value should contain theSameElementsAs value
+          case None        => cr.value shouldBe null
+        }
 
-          cr.timestamp shouldBe time.toInstant.toEpochMilli +- tolerance.toMillis
-          cr.headers().toArray.map(h => h.key() -> h.value().toList) should contain theSameElementsAs
-            schedule.headers.map {
-              case (k, v) => (k, v.toList)
-            }
+        cr.timestamp shouldBe time.toInstant.toEpochMilli +- tolerance.toMillis
+        cr.headers().toArray.map(h => h.key() -> h.value().toList) should contain theSameElementsAs
+          schedule.headers.map { case (k, v) =>
+            (k, v.toList)
+          }
       }
 
     def assertTombstoned(schedules: List[(ScheduleId, ScheduleEvent)]): Unit =
-      schedules.groupBy(_._2.inputTopic).foreach {
-        case (topic, schedulesByInputTopic) =>
-          val tombstones = consumeSomeFrom[String](topic, schedulesByInputTopic.size * 2).filter(_.value == null)
-          tombstones.size shouldBe schedulesByInputTopic.size
-          tombstones.map(_.key) shouldBe schedulesByInputTopic.map(_._1).distinct
+      schedules.groupBy(_._2.inputTopic).foreach { case (topic, schedulesByInputTopic) =>
+        val tombstones = consumeSomeFrom[String](topic, schedulesByInputTopic.size * 2).filter(_.value == null)
+        tombstones.size shouldBe schedulesByInputTopic.size
+        tombstones.map(_.key) shouldBe schedulesByInputTopic.map(_._1).distinct
       }
   }
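The createSchedules change above swaps 2.12's deprecated Stream for 2.13's LazyList and uses the new .to(Collection) conversion. A standalone sketch of the same round-robin idiom, with hypothetical topic names:

    // Cycle the topics lazily and zip them against the items being distributed.
    val topics = List("topic-a", "topic-b")
    val ids    = List(1, 2, 3, 4, 5)

    // LazyList.continually(...).flatten repeats the topic list forever without
    // materialising it; take/zip keeps only as many entries as there are ids.
    val assigned: List[(Int, String)] =
      ids.zip(LazyList.continually(topics.to(LazyList)).flatten.take(ids.size).toList)
    // => List((1,topic-a), (2,topic-b), (3,topic-a), (4,topic-b), (5,topic-a))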
diff --git a/scheduler/src/test/scala/com/sky/kms/e2e/SchedulerResiliencySpec.scala b/scheduler/src/test/scala/com/sky/kms/e2e/SchedulerResiliencySpec.scala
index 402487a0..516cbb81 100644
--- a/scheduler/src/test/scala/com/sky/kms/e2e/SchedulerResiliencySpec.scala
+++ b/scheduler/src/test/scala/com/sky/kms/e2e/SchedulerResiliencySpec.scala
@@ -10,11 +10,12 @@ import akka.testkit.TestProbe
 import cats.data.NonEmptyList
 import cats.syntax.either._
 import cats.syntax.option._
+import com.sky.kms.actors.SchedulingActor.UpstreamFailure
 import com.sky.kms.base.SpecBase
 import com.sky.kms.config._
 import com.sky.kms.domain.{ApplicationError, ScheduleEvent}
 import com.sky.kms.kafka.Topic
-import com.sky.kms.streams.{ScheduleReader, ScheduledMessagePublisher}
+import com.sky.kms.streams.ScheduleReader
 import com.sky.kms.utils.TestDataUtils._
 import com.sky.kms.utils.{StubControl, TestConfig}
 import com.sky.kms.{AkkaComponents, SchedulerApp}
@@ -25,7 +26,7 @@ import scala.concurrent.{Await, Future}
 
 class SchedulerResiliencySpec extends SpecBase {
 
-  override implicit val patienceConfig = PatienceConfig(10 seconds, 500 millis)
+  override implicit val patienceConfig = PatienceConfig(10.seconds, 500.millis)
 
   "KMS" should {
     "terminate when the reader stream fails" in new TestContext with FailingSource with AkkaComponents {
@@ -51,10 +52,10 @@ class SchedulerResiliencySpec extends SpecBase {
     }
 
     "terminate when the queue buffer becomes full" in new TestContext with IteratingSource with AkkaComponents {
-      val sameTimeSchedules = random[ScheduleEvent](n = 20).map(_.secondsFromNow(2))
-      val probe             = TestProbe()
+      val sameTimeSchedules           = random[ScheduleEvent](n = 20).map(_.secondsFromNow(2))
+      val probe                       = TestProbe()
       val sinkThatWillNotSignalDemand = Sink
-        .actorRefWithAck[ScheduledMessagePublisher.SinkIn](probe.ref, "", "", "")
+        .actorRefWithBackpressure(probe.ref, "", "", "", UpstreamFailure)
         .mapMaterializedValue(_ => Future.never)
 
       val app =
@@ -76,7 +77,7 @@ class SchedulerResiliencySpec extends SpecBase {
       SchedulerApp.configure apply AppConfig(config)
 
     def hasActorSystemTerminated(implicit system: ActorSystem): Boolean =
-      Await.ready(system.whenTerminated, 10 seconds).isCompleted
+      Await.ready(system.whenTerminated, 10.seconds).isCompleted
   }
 
   private trait FailingSource {
@@ -85,7 +86,8 @@ class SchedulerResiliencySpec extends SpecBase {
     val sourceThatWillFail: Source[ScheduleReader.In, (Future[Done], Future[Control])] =
       Source
        .fromIterator(() =>
-          Iterator(("someId" -> none[ScheduleEvent]).asRight[ApplicationError]) ++ (throw new Exception("boom!")))
+          Iterator(("someId" -> none[ScheduleEvent]).asRight[ApplicationError]) ++ (throw new Exception("boom!"))
+        )
         .mapMaterializedValue(_ => Future.successful(Done) -> Future.successful(StubControl()))
   }
 
@@ -95,7 +97,7 @@ class SchedulerResiliencySpec extends SpecBase {
 
     def sourceWith(schedules: Seq[ScheduleEvent]): Source[ScheduleReader.In, (Future[Done], Future[Control])] = {
      val scheduleIds = List.fill(schedules.size)(UUID.randomUUID().toString)
-      val elements    = (scheduleIds, schedules.map(_.some)).zipped.toIterator.map(_.asRight[ApplicationError]).toList
+      val elements    = scheduleIds.zip(schedules.map(_.some)).map(_.asRight[ApplicationError])
 
       Source(elements).mapMaterializedValue(_ => Future.successful(Done) -> Future.successful(StubControl()))
     }
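The resiliency spec above replaces Sink.actorRefWithAck with Sink.actorRefWithBackpressure, which in Akka 2.6-era streams also takes an onFailureMessage. A sketch of that sink's ack protocol, assuming Akka 2.6+; the actor and message names here are illustrative, not this repo's:

    import akka.actor.{Actor, ActorSystem, Props}
    import akka.stream.scaladsl.{Sink, Source}

    case object Init
    case object Ack
    case object Complete
    final case class Failed(t: Throwable)

    class Printer extends Actor {
      def receive: Receive = {
        case Init      => sender() ! Ack // signal initial demand
        case Complete  => context.stop(self)
        case Failed(_) => context.stop(self)
        case msg       => println(msg); sender() ! Ack // ack each element to pull the next
      }
    }

    object BackpressureDemo extends App {
      implicit val system: ActorSystem = ActorSystem("demo")
      val ref = system.actorOf(Props[Printer]())
      // The sink only emits downstream demand once the actor replies with Ack,
      // which is exactly what the test exploits by never acking.
      Source(1 to 3).runWith(Sink.actorRefWithBackpressure(ref, Init, Ack, Complete, Failed))
    }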
diff --git a/scheduler/src/test/scala/com/sky/kms/e2e/SchedulerSchemaEvolutionSpec.scala b/scheduler/src/test/scala/com/sky/kms/e2e/SchedulerSchemaEvolutionSpec.scala
index d59cecdd..05f04b51 100644
--- a/scheduler/src/test/scala/com/sky/kms/e2e/SchedulerSchemaEvolutionSpec.scala
+++ b/scheduler/src/test/scala/com/sky/kms/e2e/SchedulerSchemaEvolutionSpec.scala
@@ -1,16 +1,16 @@
 package com.sky.kms.e2e
 
-import com.danielasfregola.randomdatagenerator.RandomDataGenerator
 import cats.syntax.option._
+import com.danielasfregola.randomdatagenerator.RandomDataGenerator
 import com.sky.kms._
 import com.sky.kms.base.SchedulerIntSpecBase
 import com.sky.kms.domain.ScheduleEvent
 import com.sky.kms.streams.ScheduleReader
 import com.sky.kms.utils.TestDataUtils._
-import net.manub.embeddedkafka.Codecs.{
-  stringSerializer,
+import io.github.embeddedkafka.Codecs.{
   nullDeserializer => arrayByteDeserializer,
-  nullSerializer => arrayByteSerializer
+  nullSerializer => arrayByteSerializer,
+  stringSerializer
 }
 
 class SchedulerSchemaEvolutionSpec extends SchedulerIntSpecBase with RandomDataGenerator {
 
@@ -47,7 +47,7 @@ class SchedulerSchemaEvolutionSpec extends SchedulerIntSpecBase with RandomDataG
   trait TestContext {
 
     val inputTopic = "cupcat"
-    val delay      = 4
+    val delay      = 4L
 
     def publishAndGetDecoded(inputTopic: String, schedule: Array[Byte]) = {
       publishToKafka(inputTopic, inputTopic, schedule)
@@ -62,10 +62,13 @@ class SchedulerSchemaEvolutionSpec extends SchedulerIntSpecBase with RandomDataG
    }
 
    implicit class HeaderOps(val schedule: ScheduleReader.In) {
-      def headers =
-        schedule.fold(_ => none[Map[String, Array[Byte]]], {
-          case (_, ose) => ose.fold(none[Map[String, Array[Byte]]])(_.headers.some)
-        })
+      def headers      =
+        schedule.fold(
+          _ => none[Map[String, Array[Byte]]],
+          { case (_, ose) =>
+            ose.fold(none[Map[String, Array[Byte]]])(_.headers.some)
+          }
+        )
      def headerKeys   = headers.map(_.keys.toList)
      def headerValues = headers.map(_.values.toList.map(_.toList))
    }
diff --git a/scheduler/src/test/scala/com/sky/kms/e2e/package.scala b/scheduler/src/test/scala/com/sky/kms/e2e/package.scala
index d2bb6020..39127528 100644
--- a/scheduler/src/test/scala/com/sky/kms/e2e/package.scala
+++ b/scheduler/src/test/scala/com/sky/kms/e2e/package.scala
@@ -1,25 +1,25 @@
 package com.sky.kms
 
-import akka.actor.{ActorSystem, CoordinatedShutdown}
 import akka.actor.CoordinatedShutdown.UnknownReason
-import akka.stream.ActorMaterializer
+import akka.actor.{ActorSystem, CoordinatedShutdown}
 import com.sky.kms.config.{AppConfig, SchedulerConfig}
 
 package object e2e {
 
   def withSchedulerApp[T](
-      scenario: => T)(implicit conf: SchedulerConfig, system: ActorSystem, mat: ActorMaterializer): T =
+      scenario: => T
+  )(implicit conf: SchedulerConfig, system: ActorSystem): T =
     withRunningScheduler(SchedulerApp.configure apply AppConfig(conf))(_ => scenario)
 
-  def withRunningScheduler[T](schedulerApp: SchedulerApp)(
-      scenario: SchedulerApp.Running => T)(implicit system: ActorSystem, mat: ActorMaterializer): T = {
+  def withRunningScheduler[T](
+      schedulerApp: SchedulerApp
+  )(scenario: SchedulerApp.Running => T)(implicit system: ActorSystem): T = {
     val runningApp = SchedulerApp.run apply schedulerApp
-    try {
+    try
       scenario(runningApp)
-    } finally {
+    finally
       CoordinatedShutdown(system).run(UnknownReason)
-    }
   }
 }
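Dropping ActorMaterializer from the e2e helpers above reflects Akka 2.6, where streams materialise from an implicit ActorSystem and the classic ActorMaterializer is deprecated. A minimal sketch, assuming Akka 2.6+:

    import akka.actor.ActorSystem
    import akka.stream.scaladsl.{Sink, Source}
    import scala.concurrent.Await
    import scala.concurrent.duration._

    object MaterializerDemo extends App {
      implicit val system: ActorSystem = ActorSystem("demo") // the system now provides the materializer

      val done = Source(1 to 3)
        .map(_ * 2)
        .runWith(Sink.foreach(println)) // no ActorMaterializer in scope, and none needed

      Await.result(done, 3.seconds)
      system.terminate()
    }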
diff --git a/scheduler/src/test/scala/com/sky/kms/integration/ScheduleReaderIntSpec.scala b/scheduler/src/test/scala/com/sky/kms/integration/ScheduleReaderIntSpec.scala
index d2e32f73..3ac0ef06 100644
--- a/scheduler/src/test/scala/com/sky/kms/integration/ScheduleReaderIntSpec.scala
+++ b/scheduler/src/test/scala/com/sky/kms/integration/ScheduleReaderIntSpec.scala
@@ -2,8 +2,8 @@ package com.sky.kms.integration
 
 import java.util.UUID
 
+import akka.actor.ActorSystem
 import akka.testkit.{TestActor, TestProbe}
-import cats.instances.tuple._
 import cats.syntax.functor._
 import com.sky.kms.actors.SchedulingActor._
 import com.sky.kms.base.SchedulerIntSpecBase
@@ -13,19 +13,32 @@ import com.sky.kms.streams.ScheduleReader
 import com.sky.kms.utils.TestActorSystem
 import com.sky.kms.utils.TestDataUtils._
 import eu.timepit.refined.auto._
-import net.manub.embeddedkafka.Codecs.{stringSerializer, nullSerializer => arrayByteSerializer}
-import org.scalatest.concurrent.Eventually
+import io.github.embeddedkafka.Codecs.{nullSerializer => arrayByteSerializer, stringSerializer}
 
 import scala.concurrent.Await
 import scala.concurrent.duration._
 
-class ScheduleReaderIntSpec extends SchedulerIntSpecBase with Eventually {
+class ScheduleReaderIntSpec extends SchedulerIntSpecBase {
 
-  override implicit lazy val system = TestActorSystem(kafkaConfig.kafkaPort, akkaExpectDuration = 20.seconds)
+  override implicit lazy val system: ActorSystem =
+    TestActorSystem(kafkaConfig.kafkaPort, akkaExpectDuration = 20.seconds)
 
   val numSchedules = 3
 
   "stream" should {
+    "continue processing when Kafka becomes available" in withRunningScheduleReader { probe =>
+      withRunningKafka {
+        probe.expectMsg(StreamStarted)
+        probe.expectMsg(Initialised)
+        scheduleShouldFlow(probe)
+      }
+      // Wait 5 seconds. Embedded Kafka causes issues if you restart too quickly on the same ports.
+      Thread.sleep(5000)
+      withRunningKafka {
+        scheduleShouldFlow(probe)
+      }
+    }
+
     "not schedule messages that have been deleted but not compacted on startup" in withRunningKafka {
       val schedules @ firstSchedule :: _ = List.fill(numSchedules)(generateSchedule)
       writeSchedulesToKafka(schedules: _*)
@@ -40,17 +53,6 @@ class ScheduleReaderIntSpec extends SchedulerIntSpecBase with Eventually {
         probe.expectMsg(Initialised)
       }
     }
-
-    "continue processing when Kafka becomes available" in withRunningScheduleReader { probe =>
-      withRunningKafka {
-        probe.expectMsg(StreamStarted)
-        probe.expectMsg(Initialised)
-        scheduleShouldFlow(probe)
-      }
-      withRunningKafka {
-        scheduleShouldFlow(probe)
-      }
-    }
   }
 
   private def generateSchedule: (ScheduleId, ScheduleEvent) = UUID.randomUUID().toString -> random[ScheduleEvent]
@@ -63,7 +65,8 @@ class ScheduleReaderIntSpec extends SchedulerIntSpecBase with Eventually {
        case _ =>
          sender ! Ack
          TestActor.KeepRunning
-      })
+      }
+    )
    p
  }
 
@@ -73,17 +76,19 @@ class ScheduleReaderIntSpec extends SchedulerIntSpecBase with Eventually {
      .stream
      .run()
 
-    try {
+    try
      scenario(probe)
-    } finally {
-      Await.ready(controlF.flatMap(_.shutdown())(system.dispatcher), 5 seconds)
-    }
+    finally
+      Await.ready(controlF.flatMap(_.shutdown())(system.dispatcher), 5.seconds)
  }
 
  private def writeSchedulesToKafka(schedules: (ScheduleId, ScheduleEvent)*): Unit =
-    publishToKafka(scheduleTopic, schedules.map {
-      case (scheduleId, scheduleEvent) => (scheduleId, scheduleEvent.toSchedule.toAvro)
-    })
+    publishToKafka(
+      scheduleTopic,
+      schedules.map { case (scheduleId, scheduleEvent) =>
+        (scheduleId, scheduleEvent.toSchedule.toAvro)
+      }
+    )
 
  private def scheduleShouldFlow(probe: TestProbe): SchedulingMessage = {
    writeSchedulesToKafka(generateSchedule)
diff --git a/scheduler/src/test/scala/com/sky/kms/unit/AvroSpec.scala b/scheduler/src/test/scala/com/sky/kms/unit/AvroSpec.scala
index fa09919b..1f9dc806 100644
--- a/scheduler/src/test/scala/com/sky/kms/unit/AvroSpec.scala
+++ b/scheduler/src/test/scala/com/sky/kms/unit/AvroSpec.scala
@@ -3,8 +3,8 @@ package com.sky.kms.unit
 import java.time.OffsetDateTime
 
 import com.sksamuel.avro4s._
-import com.sky.kms.base.SpecBase
 import com.sky.kms.avro._
+import com.sky.kms.base.SpecBase
 import org.apache.avro.Schema
 import org.apache.avro.generic.GenericData
diff --git a/scheduler/src/test/scala/com/sky/kms/unit/PublisherActorSpec.scala b/scheduler/src/test/scala/com/sky/kms/unit/PublisherActorSpec.scala
index aaf04a06..f8ffeb75 100644
--- a/scheduler/src/test/scala/com/sky/kms/unit/PublisherActorSpec.scala
+++ b/scheduler/src/test/scala/com/sky/kms/unit/PublisherActorSpec.scala
@@ -6,8 +6,8 @@ import akka.testkit.TestActorRef
 import com.sky.kms.actors.PublisherActor
 import com.sky.kms.actors.PublisherActor._
 import com.sky.kms.base.AkkaSpecBase
-import com.sky.kms.utils.TestDataUtils._
 import com.sky.kms.domain.{ScheduleEvent, ScheduleId}
+import com.sky.kms.utils.TestDataUtils._
 import org.mockito.Mockito._
 import org.scalatestplus.mockito.MockitoSugar
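The reader integration spec above drives its assertions through a TestProbe whose auto-pilot acks every message while still recording it for expectMsg. A generic sketch of that akka-testkit pattern (helper name and Ack message are illustrative):

    import akka.actor.{ActorRef, ActorSystem}
    import akka.testkit.{TestActor, TestProbe}

    case object Ack

    def ackingProbe()(implicit system: ActorSystem): TestProbe = {
      val p = TestProbe()
      p.setAutoPilot(new TestActor.AutoPilot {
        def run(sender: ActorRef, msg: Any): TestActor.AutoPilot = {
          sender ! Ack          // unblock the sender immediately
          TestActor.KeepRunning // keep the auto-pilot installed for later messages
        }
      })
      p
    }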
diff --git a/scheduler/src/test/scala/com/sky/kms/unit/ScheduleReaderSpec.scala b/scheduler/src/test/scala/com/sky/kms/unit/ScheduleReaderSpec.scala
index 9626f591..46c8130e 100644
--- a/scheduler/src/test/scala/com/sky/kms/unit/ScheduleReaderSpec.scala
+++ b/scheduler/src/test/scala/com/sky/kms/unit/ScheduleReaderSpec.scala
@@ -8,18 +8,17 @@ import cats.Eval
 import cats.syntax.either._
 import com.sky.kms.actors.SchedulingActor
 import com.sky.kms.actors.SchedulingActor._
-import com.sky.kms.base.AkkaStreamSpecBase
+import com.sky.kms.base.{AkkaSpecBase, SpecBase}
 import com.sky.kms.config.ReaderConfig
 import com.sky.kms.domain._
 import com.sky.kms.streams.ScheduleReader
 import com.sky.kms.streams.ScheduleReader.In
 import com.sky.kms.utils.TestDataUtils._
-import org.scalatest.concurrent.Eventually
 
 import scala.concurrent.duration._
 import scala.concurrent.{Future, Promise}
 
-class ScheduleReaderSpec extends AkkaStreamSpecBase with Eventually {
+class ScheduleReaderSpec extends AkkaSpecBase with SpecBase {
 
   override implicit val patienceConfig = PatienceConfig(500.millis, 20.millis)
 
@@ -97,25 +96,30 @@ class ScheduleReaderSpec extends AkkaStreamSpecBase with Eventually {
        case _ =>
          sender ! Ack
          TestActor.KeepRunning
-      })
+      }
+    )
      p
    }
 
    def delayedSource = Source.tick(100.millis, 100.millis, msg).mapMaterializedValue(_ => NotUsed)
 
-    def runReader(in: Source[In, NotUsed],
-                  errorHandler: Sink[ApplicationError, Future[Done]] = Sink.ignore,
-                  sourceMatFuture: Future[Done] = Future.never): NotUsed =
-      ScheduleReader(Eval.now(in.mapMaterializedValue(nu => sourceMatFuture -> nu)),
-                     probe.ref,
-                     errorHandler,
-                     ReaderConfig.TimeoutConfig(100.millis, 100.millis)).stream.run()
+    def runReader(
+        in: Source[In, NotUsed],
+        errorHandler: Sink[ApplicationError, Future[Done]] = Sink.ignore,
+        sourceMatFuture: Future[Done] = Future.never
+    ): NotUsed =
+      ScheduleReader(
+        Eval.now(in.mapMaterializedValue(nu => sourceMatFuture -> nu)),
+        probe.ref,
+        errorHandler,
+        ReaderConfig.TimeoutConfig(100.millis, 100.millis)
+      ).stream.run()
  }
 
  private trait ErrorHandler { this: TestContext =>
-    val awaitingError = Promise[ApplicationError]
+    val awaitingError = Promise[ApplicationError]()
    val errorHandler  = Sink.foreach[ApplicationError](awaitingError.trySuccess)
  }
diff --git a/scheduler/src/test/scala/com/sky/kms/unit/ScheduledMessagePublisherSpec.scala b/scheduler/src/test/scala/com/sky/kms/unit/ScheduledMessagePublisherSpec.scala
index 41d3516f..7268c380 100644
--- a/scheduler/src/test/scala/com/sky/kms/unit/ScheduledMessagePublisherSpec.scala
+++ b/scheduler/src/test/scala/com/sky/kms/unit/ScheduledMessagePublisherSpec.scala
@@ -26,7 +26,7 @@ class ScheduledMessagePublisherSpec extends SpecBase {
 
     "convert schedule deletion to a delete record" in {
       val List(scheduleId, topic) = random[String](2).toList
-      val record = ScheduledMessagePublisher.toProducerRecord(
+      val record                  = ScheduledMessagePublisher.toProducerRecord(
         ScheduleDeletion(scheduleId, topic, Map.empty)
       )
 
@@ -46,7 +46,7 @@ class ScheduledMessagePublisherSpec extends SpecBase {
 
     "convert scheduled message with headers to producer record with headers" in {
       val schedule = random[ScheduleEvent]
-      val headers = ScheduledMessagePublisher
+      val headers  = ScheduledMessagePublisher
         .toProducerRecord(schedule.toScheduledMessage)
         .headers()
         .toArray
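The ErrorHandler trait above funnels the first error out of the stream through a Promise; note also the explicit () now required on Promise.apply, since Scala 2.13 deprecates auto-application. A generic sketch of the capture pattern, with String standing in for the error type:

    import akka.actor.ActorSystem
    import akka.stream.scaladsl.{Sink, Source}
    import scala.concurrent.{Await, Promise}
    import scala.concurrent.duration._

    object ErrorCaptureDemo extends App {
      implicit val system: ActorSystem = ActorSystem("demo")

      val awaitingError = Promise[String]()                              // Promise[String] without () is deprecated in 2.13
      val errorHandler  = Sink.foreach[String](awaitingError.trySuccess) // first element to arrive wins

      // Errors are modelled as elements of the error type routed into the sink.
      Source(List("boom")).runWith(errorHandler)
      println(Await.result(awaitingError.future, 1.second)) // => boom
      system.terminate()
    }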
diff --git a/scheduler/src/test/scala/com/sky/kms/unit/SchedulerSpec.scala b/scheduler/src/test/scala/com/sky/kms/unit/SchedulerSpec.scala
index c384219d..44cf0442 100644
--- a/scheduler/src/test/scala/com/sky/kms/unit/SchedulerSpec.scala
+++ b/scheduler/src/test/scala/com/sky/kms/unit/SchedulerSpec.scala
@@ -44,7 +44,7 @@ class SchedulerSpec extends SpecBase {
     }
 
     "error if message does not adhere to our schema" in {
-      val cr = new ConsumerRecord[String, Array[Byte]]("scheduleTopic", 1, 1l, ScheduleId, Array.emptyByteArray)
+      val cr = new ConsumerRecord[String, Array[Byte]]("scheduleTopic", 1, 1L, ScheduleId, Array.emptyByteArray)
 
       scheduleConsumerRecordDecoder(cr) shouldBe Left(InvalidSchemaError(ScheduleId))
     }
@@ -54,13 +54,13 @@ class SchedulerSpec extends SpecBase {
       val schedule         = TestSchedule.toSchedule.copy(time = tooDistantFuture)
       val cr               = artificialConsumerRecord(ScheduleId, schedule.toAvro)
 
-      scheduleConsumerRecordDecoder(cr).left.get shouldBe a[InvalidTimeError]
+      scheduleConsumerRecordDecoder(cr).leftValue shouldBe a[InvalidTimeError]
     }
   }
 
   private def artificialConsumerRecord(scheduleId: ScheduleId, avroBytes: Array[Byte]) =
-    new ConsumerRecord[String, Array[Byte]]("scheduleTopic", 1, 1l, scheduleId, avroBytes)
+    new ConsumerRecord[String, Array[Byte]]("scheduleTopic", 1, 1L, scheduleId, avroBytes)
 
   private def equalHeaders(x: Map[String, Array[Byte]], y: Map[String, Array[Byte]]): Boolean =
-    x.mapValues(_.toList) === y.mapValues(_.toList)
+    x.view.mapValues(_.toList).toMap === y.view.mapValues(_.toList).toMap
 }
diff --git a/scheduler/src/test/scala/com/sky/kms/unit/SchedulingActorSpec.scala b/scheduler/src/test/scala/com/sky/kms/unit/SchedulingActorSpec.scala
index 694f408c..a504b315 100644
--- a/scheduler/src/test/scala/com/sky/kms/unit/SchedulingActorSpec.scala
+++ b/scheduler/src/test/scala/com/sky/kms/unit/SchedulingActorSpec.scala
@@ -8,9 +8,9 @@ import com.sky.kms.actors.PublisherActor.Trigger
 import com.sky.kms.actors.SchedulingActor
 import com.sky.kms.actors.SchedulingActor._
 import com.sky.kms.base.AkkaSpecBase
-import com.sky.kms.utils.TestDataUtils._
 import com.sky.kms.domain._
 import com.sky.kms.utils.SimpleCounterMonitoring
+import com.sky.kms.utils.TestDataUtils._
 import monix.execution.schedulers.TestScheduler
 import org.scalatest.concurrent.Eventually
 import org.scalatestplus.mockito.MockitoSugar
@@ -19,7 +19,7 @@ import scala.concurrent.duration._
 
 class SchedulingActorSpec extends AkkaSpecBase with ImplicitSender with MockitoSugar with Eventually {
 
-  val NoMsgTimeout = 2 seconds
+  val NoMsgTimeout = 2.seconds
 
   "A scheduling actor" must {
diff --git a/scheduler/src/test/scala/com/sky/kms/utils/RandomPort.scala b/scheduler/src/test/scala/com/sky/kms/utils/RandomPort.scala
index 44933385..84cb4e65 100644
--- a/scheduler/src/test/scala/com/sky/kms/utils/RandomPort.scala
+++ b/scheduler/src/test/scala/com/sky/kms/utils/RandomPort.scala
@@ -6,7 +6,7 @@ object RandomPort {
   def randomPort(): Int = {
     val socket = new ServerSocket(0)
     socket.setReuseAddress(true)
-    val port = socket.getLocalPort
+    val port   = socket.getLocalPort
     socket.close()
     port
   }
diff --git a/scheduler/src/test/scala/com/sky/kms/utils/TestActorSystem.scala b/scheduler/src/test/scala/com/sky/kms/utils/TestActorSystem.scala
index 83f41e14..91db1406 100644
--- a/scheduler/src/test/scala/com/sky/kms/utils/TestActorSystem.scala
+++ b/scheduler/src/test/scala/com/sky/kms/utils/TestActorSystem.scala
@@ -16,6 +16,7 @@ object TestActorSystem {
        |    test.single-expect-default = $akkaExpectDuration
        |    coordinated-shutdown {
        |      terminate-actor-system = $terminateActorSystem
+       |      run-by-actor-system-terminate = off
        |      run-by-jvm-shutdown-hook = off
        |    }
        |
@@ -33,9 +34,11 @@ object TestActorSystem {
        |}
      """.stripMargin
 
-  def apply(kafkaPort: Int = 9092,
-            terminateActorSystem: Boolean = false,
-            akkaExpectDuration: Duration = 3.seconds): ActorSystem =
+  def apply(
+      kafkaPort: Int = 9092,
+      terminateActorSystem: Boolean = false,
+      akkaExpectDuration: Duration = 3.seconds
+  ): ActorSystem =
     ActorSystem(
       name = s"test-actor-system-${UUID.randomUUID().toString}",
       config = ConfigFactory
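The equalHeaders change above is the standard 2.13 collections migration: Map.mapValues is deprecated and now returns a lazy MapView, so the code goes through .view and back with .toMap to recover a strict Map. A small sketch:

    // 2.13: mapValues lives on views; convert back with .toMap for a strict Map.
    val m: Map[String, Array[Byte]] = Map("k" -> Array[Byte](1, 2))

    val strict: Map[String, List[Byte]] = m.view.mapValues(_.toList).toMap
    // Comparing Array values directly would use reference equality; converting
    // the values to List (as equalHeaders does) makes the comparison structural.
    assert(strict == Map("k" -> List[Byte](1, 2)))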
diff --git a/scheduler/src/test/scala/com/sky/kms/utils/TestConfig.scala b/scheduler/src/test/scala/com/sky/kms/utils/TestConfig.scala
index cf7a423d..c8a96a2e 100644
--- a/scheduler/src/test/scala/com/sky/kms/utils/TestConfig.scala
+++ b/scheduler/src/test/scala/com/sky/kms/utils/TestConfig.scala
@@ -10,6 +10,6 @@ object TestConfig {
   def apply(topics: NonEmptyList[Topic]): SchedulerConfig =
     SchedulerConfig(
       ReaderConfig(topics, timeouts = ReaderConfig.TimeoutConfig(100.millis, 100.millis)),
-      PublisherConfig(queueBufferSize = 100),
+      PublisherConfig(queueBufferSize = 100)
     )
 }
diff --git a/scheduler/src/test/scala/com/sky/kms/utils/TestDataUtils.scala b/scheduler/src/test/scala/com/sky/kms/utils/TestDataUtils.scala
index bc75123c..79631518 100644
--- a/scheduler/src/test/scala/com/sky/kms/utils/TestDataUtils.scala
+++ b/scheduler/src/test/scala/com/sky/kms/utils/TestDataUtils.scala
@@ -10,7 +10,7 @@ import akka.stream.scaladsl.{Sink, Source}
 import cats.Eval
 import com.fortysevendeg.scalacheck.datetime.GenDateTime.genDateTimeWithinRange
 import com.fortysevendeg.scalacheck.datetime.instances.jdk8._
-import com.sksamuel.avro4s.{AvroOutputStream, AvroSchema, Encoder, SchemaFor}
+import com.sksamuel.avro4s.{AvroOutputStream, Encoder, SchemaFor}
 import com.sky.kms.SchedulerApp
 import com.sky.kms.avro._
 import com.sky.kms.domain.PublishableMessage.ScheduledMessage
@@ -18,7 +18,6 @@ import com.sky.kms.domain.Schedule.{ScheduleNoHeaders, ScheduleWithHeaders}
 import com.sky.kms.domain.{Schedule, ScheduleEvent}
 import com.sky.kms.streams.{ScheduleReader, ScheduledMessagePublisher}
 import org.scalacheck.{Arbitrary, Gen}
-import org.zalando.grafter.syntax.rewriter._
 
 import scala.concurrent.Future
 import scala.concurrent.duration._
@@ -42,7 +41,7 @@ object TestDataUtils {
   implicit val encoderForScheduleWithHeaders = Encoder[ScheduleWithHeaders]
   implicit val encoderForScheduleNoHeaders   = Encoder[ScheduleNoHeaders]
 
-  implicit class ScheduleEventOps(val schedule: ScheduleEvent) extends AnyVal {
+  implicit class ScheduleEventOps(private val schedule: ScheduleEvent) extends AnyVal {
     def toSchedule: ScheduleWithHeaders = {
       val time = OffsetDateTime.now().toInstant.plusMillis(schedule.delay.toMillis).atOffset(ZoneOffset.UTC)
       ScheduleWithHeaders(time, schedule.outputTopic, schedule.key, schedule.value, schedule.headers)
@@ -58,7 +57,7 @@
     def headerValues = schedule.headers.values
   }
 
-  implicit class ScheduleEventNoHeadersOps(val schedule: ScheduleEventNoHeaders) extends AnyVal {
+  implicit class ScheduleEventNoHeadersOps(private val schedule: ScheduleEventNoHeaders) extends AnyVal {
     def toScheduleWithoutHeaders: ScheduleNoHeaders = {
       val time = OffsetDateTime.now().toInstant.plusMillis(schedule.delay.toMillis).atOffset(ZoneOffset.UTC)
       ScheduleNoHeaders(time, schedule.outputTopic, schedule.key, schedule.value)
@@ -71,35 +70,37 @@
       ScheduledMessage(schedule.inputTopic, schedule.outputTopic, schedule.key, schedule.value, Map.empty)
   }
 
-  implicit class ScheduleOps[T <: Schedule](val schedule: T) extends AnyVal {
-    def toAvro(implicit sf: SchemaFor[T], e: Encoder[T]): Array[Byte] = toAvroFrom(schedule)
-    def timeInMillis: Long                                            = schedule.getTime.toInstant.toEpochMilli
+  implicit class ScheduleOps[T <: Schedule](private val schedule: T) extends AnyVal {
+    def toAvro(implicit e: Encoder[T]): Array[Byte] = toAvroFrom(schedule)
+    def timeInMillis: Long                          = schedule.getTime.toInstant.toEpochMilli
   }
 
-  private def toAvroFrom[T <: Schedule : Encoder : SchemaFor](t: T) = {
+  private def toAvroFrom[T <: Schedule : Encoder](t: T) = {
     val baos   = new ByteArrayOutputStream()
-    val output = AvroOutputStream.binary[T].to(baos).build(AvroSchema[T])
+    val output = AvroOutputStream.binary[T].to(baos).build()
     output.write(t)
     output.close()
     baos.toByteArray
   }
 
-  implicit class SchedulerAppOps(val schedulerApp: SchedulerApp) extends AnyVal {
-    def withReaderSource(src: Source[ScheduleReader.In, (Future[Done], Future[Control])])(
-        implicit as: ActorSystem): SchedulerApp =
+  implicit class SchedulerAppOps(private val schedulerApp: SchedulerApp) extends AnyVal {
+    def withReaderSource(src: Source[ScheduleReader.In, (Future[Done], Future[Control])])(implicit
+        as: ActorSystem
+    ): SchedulerApp =
       schedulerApp.copy(reader = schedulerApp.reader.copy(scheduleSource = Eval.later(src)))
 
     def withPublisherSink(
-        sink: Sink[ScheduledMessagePublisher.SinkIn, ScheduledMessagePublisher.SinkMat]): SchedulerApp =
-      schedulerApp.modifyWith[Any] {
-        case pub: ScheduledMessagePublisher => pub.replace(Eval.later(sink))
-      }
+        newSink: Sink[ScheduledMessagePublisher.SinkIn, ScheduledMessagePublisher.SinkMat]
+    ): SchedulerApp =
+      schedulerApp.copy(publisher = schedulerApp.publisher.copy(publisherSink = Eval.later(newSink)))
   }
 
-  case class ScheduleEventNoHeaders(delay: FiniteDuration,
-                                    inputTopic: String,
-                                    outputTopic: String,
-                                    key: Array[Byte],
-                                    value: Option[Array[Byte]])
+  case class ScheduleEventNoHeaders(
+      delay: FiniteDuration,
+      inputTopic: String,
+      outputTopic: String,
+      key: Array[Byte],
+      value: Option[Array[Byte]]
+  )
 }
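The toAvroFrom change above tracks the newer avro4s API, where the binary output builder's build() takes no schema argument and derives the writer schema from the implicit Encoder. A hedged round-trip sketch in the avro4s 4.x style; Cupcake is a hypothetical type, and the exact builder signatures may differ across avro4s versions:

    import java.io.ByteArrayOutputStream
    import com.sksamuel.avro4s.{AvroInputStream, AvroOutputStream, AvroSchema}

    final case class Cupcake(flavour: String, iced: Boolean)

    object AvroRoundTrip extends App {
      val baos   = new ByteArrayOutputStream()
      val output = AvroOutputStream.binary[Cupcake].to(baos).build() // schema comes from the derived Encoder
      output.write(Cupcake("vanilla", iced = true))
      output.close()

      // Reading back still needs the writer schema explicitly.
      val input = AvroInputStream.binary[Cupcake].from(baos.toByteArray).build(AvroSchema[Cupcake])
      println(input.iterator.toList) // => List(Cupcake(vanilla,true))
      input.close()
    }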