Init
pondzix committed Jul 28, 2023
1 parent d9ee385 commit 9f45a44
Showing 8 changed files with 144 additions and 89 deletions.
18 changes: 15 additions & 3 deletions build.sbt
@@ -90,6 +90,7 @@ lazy val buildSettings = Seq(
name := "snowplow-stream-collector",
description := "Scala Stream Collector for Snowplow raw events",
scalaVersion := "2.12.10",
scalacOptions ++= Seq("-Ypartial-unification"),
javacOptions := Seq("-source", "11", "-target", "11"),
resolvers ++= Dependencies.resolutionRepos
)
@@ -109,7 +110,7 @@ lazy val allSettings = buildSettings ++
lazy val root = project
.in(file("."))
.settings(buildSettings ++ dynVerSettings)
.aggregate(core, kinesis, pubsub, kafka, nsq, stdout, sqs, rabbitmq)
.aggregate(core, kinesis, pubsub, kafka, nsq, stdout, sqs, rabbitmq, http4s)

lazy val core = project
.settings(moduleName := "snowplow-stream-collector-core")
@@ -119,6 +120,17 @@ lazy val core = project
.settings(Defaults.itSettings)
.configs(IntegrationTest)

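// Shared http4s-based collector core; the stdout modules below depend on it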
lazy val http4s = project
.settings(moduleName := "snowplow-stream-collector-http4s-core")
.settings(buildSettings ++ BuildSettings.sbtAssemblySettings)
.settings(
libraryDependencies ++= Seq(
Dependencies.Libraries.http4sDsl,
Dependencies.Libraries.http4sServer,
Dependencies.Libraries.specs2
)
)

lazy val kinesisSettings =
allSettings ++ buildInfoSettings ++ Defaults.itSettings ++ scalifiedSettings ++ Seq(
moduleName := "snowplow-stream-collector-kinesis",
@@ -251,14 +263,14 @@ lazy val stdoutSettings =
lazy val stdout = project
.settings(stdoutSettings)
.enablePlugins(JavaAppPackaging, SnowplowDockerPlugin, BuildInfoPlugin)
.dependsOn(core % "test->test;compile->compile")
.dependsOn(http4s % "test->test;compile->compile")

lazy val stdoutDistroless = project
.in(file("distroless/stdout"))
.settings(sourceDirectory := (stdout / sourceDirectory).value)
.settings(stdoutSettings)
.enablePlugins(JavaAppPackaging, SnowplowDistrolessDockerPlugin, BuildInfoPlugin)
.dependsOn(core % "test->test;compile->compile")
.dependsOn(http4s % "test->test;compile->compile")

lazy val rabbitmqSettings =
allSettings ++ buildInfoSettings ++ Seq(
@@ -0,0 +1,19 @@
package com.snowplowanalytics.snowplow.collectors.scalastream

import cats.effect.{ExitCode, IO}
import com.comcast.ip4s.IpLiteralSyntax
import org.http4s.ember.server.EmberServerBuilder

object CollectorApp {

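// Start the HTTP server and block until the process is terminated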
def run(): IO[ExitCode] =
buildHttpServer().use(_ => IO.never).as(ExitCode.Success)

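// Ember server bound to 0.0.0.0:8080, serving the collector routes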
private def buildHttpServer() =
EmberServerBuilder
.default[IO]
.withHost(ipv4"0.0.0.0")
.withPort(port"8080")
.withHttpApp(new CollectorRoutes[IO].value)
.build
}
@@ -0,0 +1,15 @@
package com.snowplowanalytics.snowplow.collectors.scalastream

import cats.effect.Sync
import org.http4s.{HttpApp, HttpRoutes}
import org.http4s.dsl.Http4sDsl

class CollectorRoutes[F[_]: Sync]() extends Http4sDsl[F] {

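// GET /health returns 200 OK; all other requests fall through to 404 Not Found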
lazy val value: HttpApp[F] = HttpRoutes
.of[F] {
case GET -> Root / "health" =>
Ok("OK")
}
.orNotFound
}
@@ -0,0 +1,21 @@
package com.snowplowanalytics.snowplow.collectors.scalastream

import cats.effect.IO
import cats.effect.unsafe.implicits.global
import org.http4s.implicits.http4sLiteralsSyntax
import org.http4s.{Method, Request, Status}
import org.specs2.mutable.Specification

class CollectorRoutesSpec extends Specification {

"Health endpoint" should {
"return OK always because collector always works" in {
val request = Request[IO](method = Method.GET, uri = uri"/health")
val response = new CollectorRoutes[IO].value.run(request).unsafeRunSync()

response.status must beEqualTo(Status.Ok)
response.as[String].unsafeRunSync() must beEqualTo("OK")
}
}

}
5 changes: 5 additions & 0 deletions project/Dependencies.scala
@@ -87,6 +87,11 @@ object Dependencies {
val pureconfig = "com.github.pureconfig" %% "pureconfig" % V.pureconfig
val akkaHttpMetrics = "fr.davit" %% "akka-http-metrics-datadog" % V.akkaHttpMetrics


// http4s
val http4sDsl = "org.http4s" %% "http4s-dsl" % "0.23.23"
val http4sServer = "org.http4s" %% "http4s-ember-server" % "0.23.23"

// Scala (test only)
val specs2 = "org.specs2" %% "specs2-core" % V.specs2 % Test
val specs2It = "org.specs2" %% "specs2-core" % V.specs2 % IntegrationTest
@@ -14,27 +14,10 @@
*/
package com.snowplowanalytics.snowplow.collectors.scalastream

import com.snowplowanalytics.snowplow.collectors.scalastream.generated.BuildInfo
import com.snowplowanalytics.snowplow.collectors.scalastream.model._
import com.snowplowanalytics.snowplow.collectors.scalastream.sinks.StdoutSink
import com.snowplowanalytics.snowplow.collectors.scalastream.telemetry.TelemetryAkkaService
import cats.effect.{ExitCode, IO, IOApp}

object StdoutCollector extends Collector {
object StdoutCollector extends IOApp {

def appName = BuildInfo.shortName
def appVersion = BuildInfo.version
def scalaVersion = BuildInfo.scalaVersion

def main(args: Array[String]): Unit = {
val (collectorConf, akkaConf) = parseConfig(args)
val telemetry = TelemetryAkkaService.initWithCollector(collectorConf, BuildInfo.moduleName, appVersion)
val sinks = {
val (good, bad) = collectorConf.streams.sink match {
case s: Stdout => (new StdoutSink(s.maxBytes, "out"), new StdoutSink(s.maxBytes, "err"))
case _ => throw new IllegalArgumentException("Configured sink is not stdout")
}
CollectorSinks(good, bad)
}
run(collectorConf, akkaConf, sinks, telemetry)
}
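// IOApp entry point: delegate to the shared http4s CollectorApp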
def run(args: List[String]): IO[ExitCode] =
CollectorApp.run()
}
@@ -1,40 +1,40 @@
/*
* Copyright (c) 2013-2022 Snowplow Analytics Ltd.
* All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache
* License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at
* http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied.
*
* See the Apache License Version 2.0 for the specific language
* governing permissions and limitations there under.
*/
package com.snowplowanalytics.snowplow.collectors.scalastream
package sinks

import org.apache.commons.codec.binary.Base64

class StdoutSink(val maxBytes: Int, streamName: String) extends Sink {

// Print a Base64-encoded event.
override def storeRawEvents(events: List[Array[Byte]], key: String): Unit =
streamName match {
case "out" =>
events.foreach { e =>
println(Base64.encodeBase64String(e))
}
case "err" =>
events.foreach { e =>
Console.err.println(Base64.encodeBase64String(e))
}
}

override def shutdown(): Unit = ()
}
///*
// * Copyright (c) 2013-2022 Snowplow Analytics Ltd.
// * All rights reserved.
// *
// * This program is licensed to you under the Apache License Version 2.0,
// * and you may not use this file except in compliance with the Apache
// * License Version 2.0.
// * You may obtain a copy of the Apache License Version 2.0 at
// * http://www.apache.org/licenses/LICENSE-2.0.
// *
// * Unless required by applicable law or agreed to in writing,
// * software distributed under the Apache License Version 2.0 is distributed
// * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// * either express or implied.
// *
// * See the Apache License Version 2.0 for the specific language
// * governing permissions and limitations there under.
// */
//package com.snowplowanalytics.snowplow.collectors.scalastream
//package sinks
//
//import org.apache.commons.codec.binary.Base64
//
//class StdoutSink(val maxBytes: Int, streamName: String) extends Sink {
//
// // Print a Base64-encoded event.
// override def storeRawEvents(events: List[Array[Byte]], key: String): Unit =
// streamName match {
// case "out" =>
// events.foreach { e =>
// println(Base64.encodeBase64String(e))
// }
// case "err" =>
// events.foreach { e =>
// Console.err.println(Base64.encodeBase64String(e))
// }
// }
//
// override def shutdown(): Unit = ()
//}
@@ -1,25 +1,25 @@
/**
* Copyright (c) 2014-2022 Snowplow Analytics Ltd.
* All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache
* License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at
* http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied.
*
* See the Apache License Version 2.0 for the specific language
* governing permissions and limitations there under.
*/
package com.snowplowanalytics.snowplow.collectors.scalastream

import com.snowplowanalytics.snowplow.collectors.scalastream.config.ConfigSpec

class StdoutConfigSpec extends ConfigSpec {
makeConfigTest("stdout", "", "")
}
///**
// * Copyright (c) 2014-2022 Snowplow Analytics Ltd.
// * All rights reserved.
// *
// * This program is licensed to you under the Apache License Version 2.0,
// * and you may not use this file except in compliance with the Apache
// * License Version 2.0.
// * You may obtain a copy of the Apache License Version 2.0 at
// * http://www.apache.org/licenses/LICENSE-2.0.
// *
// * Unless required by applicable law or agreed to in writing,
// * software distributed under the Apache License Version 2.0 is distributed
// * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// * either express or implied.
// *
// * See the Apache License Version 2.0 for the specific language
// * governing permissions and limitations there under.
// */
//package com.snowplowanalytics.snowplow.collectors.scalastream
//
//import com.snowplowanalytics.snowplow.collectors.scalastream.config.ConfigSpec
//
//class StdoutConfigSpec extends ConfigSpec {
// makeConfigTest("stdout", "", "")
//}
