diff --git a/build.sbt b/build.sbt index 25984eb2c..03a746eee 100644 --- a/build.sbt +++ b/build.sbt @@ -90,6 +90,7 @@ lazy val buildSettings = Seq( name := "snowplow-stream-collector", description := "Scala Stream Collector for Snowplow raw events", scalaVersion := "2.12.10", + scalacOptions ++= Seq("-Ypartial-unification"), javacOptions := Seq("-source", "11", "-target", "11"), resolvers ++= Dependencies.resolutionRepos ) @@ -109,7 +110,7 @@ lazy val allSettings = buildSettings ++ lazy val root = project .in(file(".")) .settings(buildSettings ++ dynVerSettings) - .aggregate(core, kinesis, pubsub, kafka, nsq, stdout, sqs, rabbitmq) + .aggregate(core, kinesis, pubsub, kafka, nsq, stdout, sqs, rabbitmq, http4s) lazy val core = project .settings(moduleName := "snowplow-stream-collector-core") @@ -119,6 +120,17 @@ lazy val core = project .settings(Defaults.itSettings) .configs(IntegrationTest) +lazy val http4s = project + .settings(moduleName := "snowplow-stream-collector-http4s-core") + .settings(buildSettings ++ BuildSettings.sbtAssemblySettings) + .settings( + libraryDependencies ++= Seq( + Dependencies.Libraries.http4sDsl, + Dependencies.Libraries.http4sServer, + Dependencies.Libraries.specs2 + ) + ) + lazy val kinesisSettings = allSettings ++ buildInfoSettings ++ Defaults.itSettings ++ scalifiedSettings ++ Seq( moduleName := "snowplow-stream-collector-kinesis", @@ -251,14 +263,14 @@ lazy val stdoutSettings = lazy val stdout = project .settings(stdoutSettings) .enablePlugins(JavaAppPackaging, SnowplowDockerPlugin, BuildInfoPlugin) - .dependsOn(core % "test->test;compile->compile") + .dependsOn(http4s % "test->test;compile->compile") lazy val stdoutDistroless = project .in(file("distroless/stdout")) .settings(sourceDirectory := (stdout / sourceDirectory).value) .settings(stdoutSettings) .enablePlugins(JavaAppPackaging, SnowplowDistrolessDockerPlugin, BuildInfoPlugin) - .dependsOn(core % "test->test;compile->compile") + .dependsOn(http4s % "test->test;compile->compile") lazy val rabbitmqSettings = allSettings ++ buildInfoSettings ++ Seq( diff --git a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorApp.scala b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorApp.scala new file mode 100644 index 000000000..13b43edef --- /dev/null +++ b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorApp.scala @@ -0,0 +1,19 @@ +package com.snowplowanalytics.snowplow.collectors.scalastream + +import cats.effect.{ExitCode, IO} +import com.comcast.ip4s.IpLiteralSyntax +import org.http4s.ember.server.EmberServerBuilder + +object CollectorApp { + + def run(): IO[ExitCode] = + buildHttpServer().use(_ => IO.never).as(ExitCode.Success) + + private def buildHttpServer() = + EmberServerBuilder + .default[IO] + .withHost(ipv4"0.0.0.0") + .withPort(port"8080") + .withHttpApp(new CollectorRoutes[IO].value) + .build +} diff --git a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorRoutes.scala b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorRoutes.scala new file mode 100644 index 000000000..407e3af70 --- /dev/null +++ b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorRoutes.scala @@ -0,0 +1,15 @@ +package com.snowplowanalytics.snowplow.collectors.scalastream + +import cats.effect.Sync +import org.http4s.{HttpApp, HttpRoutes} +import org.http4s.dsl.Http4sDsl + +class CollectorRoutes[F[_]: Sync]() extends Http4sDsl[F] { + + lazy val value: HttpApp[F] = HttpRoutes + .of[F] { + case GET -> Root / "health" => + Ok("OK") + } + .orNotFound +} diff --git a/http4s/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorRoutesSpec.scala b/http4s/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorRoutesSpec.scala new file mode 100644 index 000000000..d3c2c7b57 --- /dev/null +++ b/http4s/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorRoutesSpec.scala @@ -0,0 +1,21 @@ +package com.snowplowanalytics.snowplow.collectors.scalastream + +import cats.effect.IO +import cats.effect.unsafe.implicits.global +import org.http4s.implicits.http4sLiteralsSyntax +import org.http4s.{Method, Request, Status} +import org.specs2.mutable.Specification + +class CollectorRoutesSpec extends Specification { + + "Health endpoint" should { + "return OK always because collector always works" in { + val request = Request[IO](method = Method.GET, uri = uri"/health") + val response = new CollectorRoutes[IO].value.run(request).unsafeRunSync() + + response.status must beEqualTo(Status.Ok) + response.as[String].unsafeRunSync() must beEqualTo("OK") + } + } + +} diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 6cb214c79..b523083cb 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -87,6 +87,11 @@ object Dependencies { val pureconfig = "com.github.pureconfig" %% "pureconfig" % V.pureconfig val akkaHttpMetrics = "fr.davit" %% "akka-http-metrics-datadog" % V.akkaHttpMetrics + + //http4s + val http4sDsl = "org.http4s" %% "http4s-dsl" % "0.23.23" + val http4sServer = "org.http4s" %% "http4s-ember-server" % "0.23.23" + // Scala (test only) val specs2 = "org.specs2" %% "specs2-core" % V.specs2 % Test val specs2It = "org.specs2" %% "specs2-core" % V.specs2 % IntegrationTest diff --git a/stdout/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/StdoutCollector.scala b/stdout/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/StdoutCollector.scala index 4fbdb1f2c..caeec05f5 100644 --- a/stdout/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/StdoutCollector.scala +++ b/stdout/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/StdoutCollector.scala @@ -14,27 +14,10 @@ */ package com.snowplowanalytics.snowplow.collectors.scalastream -import com.snowplowanalytics.snowplow.collectors.scalastream.generated.BuildInfo -import com.snowplowanalytics.snowplow.collectors.scalastream.model._ -import com.snowplowanalytics.snowplow.collectors.scalastream.sinks.StdoutSink -import com.snowplowanalytics.snowplow.collectors.scalastream.telemetry.TelemetryAkkaService +import cats.effect.{ExitCode, IO, IOApp} -object StdoutCollector extends Collector { +object StdoutCollector extends IOApp { - def appName = BuildInfo.shortName - def appVersion = BuildInfo.version - def scalaVersion = BuildInfo.scalaVersion - - def main(args: Array[String]): Unit = { - val (collectorConf, akkaConf) = parseConfig(args) - val telemetry = TelemetryAkkaService.initWithCollector(collectorConf, BuildInfo.moduleName, appVersion) - val sinks = { - val (good, bad) = collectorConf.streams.sink match { - case s: Stdout => (new StdoutSink(s.maxBytes, "out"), new StdoutSink(s.maxBytes, "err")) - case _ => throw new IllegalArgumentException("Configured sink is not stdout") - } - CollectorSinks(good, bad) - } - run(collectorConf, akkaConf, sinks, telemetry) - } + def run(args: List[String]): IO[ExitCode] = + CollectorApp.run() } diff --git a/stdout/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/StdoutSink.scala b/stdout/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/StdoutSink.scala index 72a2f6c42..181645a2a 100644 --- a/stdout/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/StdoutSink.scala +++ b/stdout/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/StdoutSink.scala @@ -1,40 +1,40 @@ -/* - * Copyright (c) 2013-2022 Snowplow Analytics Ltd. - * All rights reserved. - * - * This program is licensed to you under the Apache License Version 2.0, - * and you may not use this file except in compliance with the Apache - * License Version 2.0. - * You may obtain a copy of the Apache License Version 2.0 at - * http://www.apache.org/licenses/LICENSE-2.0. - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the Apache License Version 2.0 is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. - * - * See the Apache License Version 2.0 for the specific language - * governing permissions and limitations there under. - */ -package com.snowplowanalytics.snowplow.collectors.scalastream -package sinks - -import org.apache.commons.codec.binary.Base64 - -class StdoutSink(val maxBytes: Int, streamName: String) extends Sink { - - // Print a Base64-encoded event. - override def storeRawEvents(events: List[Array[Byte]], key: String): Unit = - streamName match { - case "out" => - events.foreach { e => - println(Base64.encodeBase64String(e)) - } - case "err" => - events.foreach { e => - Console.err.println(Base64.encodeBase64String(e)) - } - } - - override def shutdown(): Unit = () -} +///* +// * Copyright (c) 2013-2022 Snowplow Analytics Ltd. +// * All rights reserved. +// * +// * This program is licensed to you under the Apache License Version 2.0, +// * and you may not use this file except in compliance with the Apache +// * License Version 2.0. +// * You may obtain a copy of the Apache License Version 2.0 at +// * http://www.apache.org/licenses/LICENSE-2.0. +// * +// * Unless required by applicable law or agreed to in writing, +// * software distributed under the Apache License Version 2.0 is distributed +// * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, +// * either express or implied. +// * +// * See the Apache License Version 2.0 for the specific language +// * governing permissions and limitations there under. +// */ +//package com.snowplowanalytics.snowplow.collectors.scalastream +//package sinks +// +//import org.apache.commons.codec.binary.Base64 +// +//class StdoutSink(val maxBytes: Int, streamName: String) extends Sink { +// +// // Print a Base64-encoded event. +// override def storeRawEvents(events: List[Array[Byte]], key: String): Unit = +// streamName match { +// case "out" => +// events.foreach { e => +// println(Base64.encodeBase64String(e)) +// } +// case "err" => +// events.foreach { e => +// Console.err.println(Base64.encodeBase64String(e)) +// } +// } +// +// override def shutdown(): Unit = () +//} diff --git a/stdout/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/StdoutConfigSpec.scala b/stdout/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/StdoutConfigSpec.scala index c25885e85..dee23d566 100644 --- a/stdout/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/StdoutConfigSpec.scala +++ b/stdout/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/StdoutConfigSpec.scala @@ -1,25 +1,25 @@ -/** - * Copyright (c) 2014-2022 Snowplow Analytics Ltd. - * All rights reserved. - * - * This program is licensed to you under the Apache License Version 2.0, - * and you may not use this file except in compliance with the Apache - * License Version 2.0. - * You may obtain a copy of the Apache License Version 2.0 at - * http://www.apache.org/licenses/LICENSE-2.0. - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the Apache License Version 2.0 is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. - * - * See the Apache License Version 2.0 for the specific language - * governing permissions and limitations there under. - */ -package com.snowplowanalytics.snowplow.collectors.scalastream - -import com.snowplowanalytics.snowplow.collectors.scalastream.config.ConfigSpec - -class StdoutConfigSpec extends ConfigSpec { - makeConfigTest("stdout", "", "") -} +///** +// * Copyright (c) 2014-2022 Snowplow Analytics Ltd. +// * All rights reserved. +// * +// * This program is licensed to you under the Apache License Version 2.0, +// * and you may not use this file except in compliance with the Apache +// * License Version 2.0. +// * You may obtain a copy of the Apache License Version 2.0 at +// * http://www.apache.org/licenses/LICENSE-2.0. +// * +// * Unless required by applicable law or agreed to in writing, +// * software distributed under the Apache License Version 2.0 is distributed +// * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, +// * either express or implied. +// * +// * See the Apache License Version 2.0 for the specific language +// * governing permissions and limitations there under. +// */ +//package com.snowplowanalytics.snowplow.collectors.scalastream +// +//import com.snowplowanalytics.snowplow.collectors.scalastream.config.ConfigSpec +// +//class StdoutConfigSpec extends ConfigSpec { +// makeConfigTest("stdout", "", "") +//}