Commit: Add PubSub sink
pondzix committed Aug 21, 2023
1 parent bf461b5 commit f5f9e72
Showing 25 changed files with 800 additions and 434 deletions.
37 changes: 23 additions & 14 deletions build.sbt
@@ -41,9 +41,9 @@ lazy val commonDependencies = Seq(
     Dependencies.Libraries.akkaStreamTestkit,
     Dependencies.Libraries.specs2,
     // Integration tests
-    Dependencies.Libraries.testcontainersIt,
-    Dependencies.Libraries.http4sClientIt,
-    Dependencies.Libraries.catsRetryIt
+    Dependencies.Libraries.LegacyIT.testcontainers,
+    Dependencies.Libraries.LegacyIT.http4sClient,
+    Dependencies.Libraries.LegacyIT.catsRetry
   )

 lazy val commonExclusions = Seq(
@@ -142,9 +142,17 @@ lazy val http4s = project
     Dependencies.Libraries.decline,
     Dependencies.Libraries.circeGeneric,
     Dependencies.Libraries.circeConfig,
-    Dependencies.Libraries.specs2CE3
+    Dependencies.Libraries.specs2,
+
+    // Integration tests
+    Dependencies.Libraries.IT.testcontainers,
+    Dependencies.Libraries.IT.http4sClient,
+    Dependencies.Libraries.IT.catsRetry
+
     )
   )
+  .settings(Defaults.itSettings)
+  .configs(IntegrationTest)

 lazy val kinesisSettings =
   allSettings ++ buildInfoSettings ++ Defaults.itSettings ++ scalifiedSettings ++ Seq(
@@ -155,8 +163,8 @@ lazy val kinesisSettings =
     Dependencies.Libraries.sts,
     Dependencies.Libraries.sqs,
     // integration tests dependencies
-    Dependencies.Libraries.specs2It,
-    Dependencies.Libraries.specs2CEIt
+    Dependencies.Libraries.LegacyIT.specs2,
+    Dependencies.Libraries.LegacyIT.specs2CE
   ),
   IntegrationTest / test := (IntegrationTest / test).dependsOn(Docker / publishLocal).value,
   IntegrationTest / testOnly := (IntegrationTest / testOnly).dependsOn(Docker / publishLocal).evaluated
@@ -199,15 +207,16 @@ lazy val sqsDistroless = project
   .dependsOn(core % "test->test;compile->compile")

 lazy val pubsubSettings =
-  allSettings ++ buildInfoSettings ++ Defaults.itSettings ++ scalifiedSettings ++ Seq(
+  allSettings ++ buildInfoSettings ++ http4sBuildInfoSettings ++ Defaults.itSettings ++ scalifiedSettings ++ Seq(
     moduleName := "snowplow-stream-collector-google-pubsub",
     buildInfoPackage := s"com.snowplowanalytics.snowplow.collectors.scalastream",
     Docker / packageName := "scala-stream-collector-pubsub",
     libraryDependencies ++= Seq(
       Dependencies.Libraries.pubsub,
       Dependencies.Libraries.protobuf,
       Dependencies.Libraries.catsRetry,
+      Dependencies.Libraries.fs2PubSub,
       // integration tests dependencies
-      Dependencies.Libraries.specs2It,
-      Dependencies.Libraries.specs2CEIt,
+      Dependencies.Libraries.IT.specs2,
+      Dependencies.Libraries.IT.specs2CE,
     ),
   IntegrationTest / test := (IntegrationTest / test).dependsOn(Docker / publishLocal).value,
   IntegrationTest / testOnly := (IntegrationTest / testOnly).dependsOn(Docker / publishLocal).evaluated
@@ -216,15 +225,15 @@ lazy val pubsubSettings =
 lazy val pubsub = project
   .settings(pubsubSettings)
   .enablePlugins(JavaAppPackaging, SnowplowDockerPlugin, BuildInfoPlugin)
-  .dependsOn(core % "test->test;compile->compile;it->it")
+  .dependsOn(http4s % "test->test;compile->compile;it->it")
   .configs(IntegrationTest)

 lazy val pubsubDistroless = project
   .in(file("distroless/pubsub"))
   .settings(sourceDirectory := (pubsub / sourceDirectory).value)
   .settings(pubsubSettings)
   .enablePlugins(JavaAppPackaging, SnowplowDistrolessDockerPlugin, BuildInfoPlugin)
-  .dependsOn(core % "test->test;compile->compile;it->it")
+  .dependsOn(http4s % "test->test;compile->compile;it->it")
   .configs(IntegrationTest)

lazy val kafkaSettings =
@@ -272,12 +281,12 @@ lazy val nsqDistroless = project
 lazy val stdoutSettings =
   allSettings ++ buildInfoSettings ++ http4sBuildInfoSettings ++ Seq(
     moduleName := "snowplow-stream-collector-stdout",
+    buildInfoPackage := s"com.snowplowanalytics.snowplow.collector.stdout",
     Docker / packageName := "scala-stream-collector-stdout"
   )

 lazy val stdout = project
   .settings(stdoutSettings)
-  .settings(buildInfoPackage := s"com.snowplowanalytics.snowplow.collector.stdout")
   .enablePlugins(JavaAppPackaging, SnowplowDockerPlugin, BuildInfoPlugin)
   .dependsOn(http4s % "test->test;compile->compile")

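A note on the sbt wiring above, since every module repeats it: Defaults.itSettings plus .configs(IntegrationTest) is the stock sbt recipe for a separate integration-test source set, and the dependsOn(Docker / publishLocal) overrides rebuild the collector's Docker image locally before the suite runs. A minimal standalone sketch of the same pattern (the project name and dependency below are invented, not part of this commit):

lazy val example = project
  .configs(IntegrationTest)      // register the it configuration
  .settings(Defaults.itSettings) // adds src/it/scala and the IntegrationTest/test task
  .settings(
    // scoping to IntegrationTest keeps integration-only dependencies off the unit-test classpath
    libraryDependencies += "org.specs2" %% "specs2-core" % "4.20.2" % IntegrationTest
  )

// run with: sbt example/IntegrationTest/test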
@@ -0,0 +1,23 @@
/*
* Copyright (c) 2023-2023 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0, and
* you may not use this file except in compliance with the Apache License
* Version 2.0. You may obtain a copy of the Apache License Version 2.0 at
* http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the Apache License Version 2.0 is distributed on an "AS
* IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the Apache License Version 2.0 for the specific language
* governing permissions and limitations there under.
*/
package com.snowplowanalytics.snowplow.collectors.scalastream.it

import org.testcontainers.containers.GenericContainer

case class CollectorContainer(
  container: GenericContainer[_],
  host: String,
  port: Int
)
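A hedged usage sketch for this helper; the image name and port below are placeholders, not something this commit pins down:

import org.testcontainers.containers.GenericContainer
import org.testcontainers.utility.DockerImageName

val raw: GenericContainer[_] =
  new GenericContainer(DockerImageName.parse("snowplow/scala-stream-collector-pubsub:latest"))
raw.addExposedPort(8080) // placeholder collector port
raw.start()

val collector = CollectorContainer(raw, raw.getHost, raw.getMappedPort(8080))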
@@ -0,0 +1,24 @@
/*
* Copyright (c) 2023-2023 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0, and
* you may not use this file except in compliance with the Apache License
* Version 2.0. You may obtain a copy of the Apache License Version 2.0 at
* http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the Apache License Version 2.0 is distributed on an "AS
* IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the Apache License Version 2.0 for the specific language
* governing permissions and limitations there under.
*/
package com.snowplowanalytics.snowplow.collectors.scalastream.it

import com.snowplowanalytics.snowplow.badrows.BadRow

import com.snowplowanalytics.snowplow.CollectorPayload.thrift.model1.CollectorPayload

case class CollectorOutput(
  good: List[CollectorPayload],
  bad: List[BadRow]
)
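How the two case classes presumably meet in a test: push traffic with EventGenerator (added later in this commit), read back whatever reached the sink, and assert on the good/bad split. readOutput below is a hypothetical stand-in for a sink-specific reader, e.g. one draining a PubSub subscription:

import cats.effect.IO

def assertAllGood(collector: CollectorContainer, readOutput: IO[CollectorOutput]): IO[Unit] =
  for {
    _   <- EventGenerator.sendEvents(collector.host, collector.port, nbGood = 5, nbBad = 0, maxBytes = 100)
    out <- readOutput
    _   <- IO.raiseWhen(out.good.size != 5 || out.bad.nonEmpty)(new RuntimeException("unexpected collector output"))
  } yield ()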
@@ -0,0 +1,62 @@
/*
* Copyright (c) 2022-2023 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0, and
* you may not use this file except in compliance with the Apache License
* Version 2.0. You may obtain a copy of the Apache License Version 2.0 at
* http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the Apache License Version 2.0 is distributed on an "AS
* IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the Apache License Version 2.0 for the specific language
* governing permissions and limitations there under.
*/
package com.snowplowanalytics.snowplow.collectors.scalastream.it

import cats.effect.IO

import org.http4s.{Method, Request, Uri}

object EventGenerator {

  def sendEvents(
    collectorHost: String,
    collectorPort: Int,
    nbGood: Int,
    nbBad: Int,
    maxBytes: Int
  ): IO[Unit] = {
    val requests = generateEvents(collectorHost, collectorPort, nbGood, nbBad, maxBytes)
    Http.statuses(requests)
      .flatMap { responses =>
        responses.collect { case resp if resp.code != 200 => resp.reason } match {
          case Nil => IO.unit
          case errors => IO.raiseError(new RuntimeException(s"${errors.size} requests were not successful. Example error: ${errors.head}"))
        }
      }
  }

  def generateEvents(
    collectorHost: String,
    collectorPort: Int,
    nbGood: Int,
    nbBad: Int,
    maxBytes: Int
  ): List[Request[IO]] = {
    val good = List.fill(nbGood)(mkTp2Event(collectorHost, collectorPort, valid = true, maxBytes))
    val bad = List.fill(nbBad)(mkTp2Event(collectorHost, collectorPort, valid = false, maxBytes))
    good ++ bad
  }

  def mkTp2Event(
    collectorHost: String,
    collectorPort: Int,
    valid: Boolean = true,
    maxBytes: Int = 100
  ): Request[IO] = {
    val uri = Uri.unsafeFromString(s"http://$collectorHost:$collectorPort/com.snowplowanalytics.snowplow/tp2")
    val body = if (valid) "foo" else "a" * (maxBytes + 1)
    Request[IO](Method.POST, uri).withEntity(body)
  }
}
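Worth spelling out: the valid = false branch makes the body one byte larger than maxBytes, so the collector should reject it into a size-violation bad row rather than a good event. A small sketch of what the generator builds, reusing the org.http4s imports above (host and port assumed):

val good = EventGenerator.mkTp2Event("localhost", 8080)                                // 3-byte body "foo"
val bad  = EventGenerator.mkTp2Event("localhost", 8080, valid = false, maxBytes = 100) // 101-byte body

assert(good.method == Method.POST)
assert(good.uri.renderString == "http://localhost:8080/com.snowplowanalytics.snowplow/tp2")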
@@ -0,0 +1,39 @@
/*
* Copyright (c) 2023-2023 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0, and
* you may not use this file except in compliance with the Apache License
* Version 2.0. You may obtain a copy of the Apache License Version 2.0 at
* http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the Apache License Version 2.0 is distributed on an "AS
* IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the Apache License Version 2.0 for the specific language
* governing permissions and limitations there under.
*/
package com.snowplowanalytics.snowplow.collectors.scalastream.it

import cats.effect.{IO, Resource}
import cats.implicits._
import org.http4s.blaze.client.BlazeClientBuilder
import org.http4s.client.Client
import org.http4s.{Request, Response, Status}

object Http {

  def statuses(requests: List[Request[IO]]): IO[List[Status]] =
    mkClient.use { client => requests.traverse(client.status) }

  def status(request: Request[IO]): IO[Status] =
    mkClient.use { client => client.status(request) }

  def response(request: Request[IO]): IO[Response[IO]] =
    mkClient.use(c => c.run(request).use(resp => IO.pure(resp)))

  def responses(requests: List[Request[IO]]): IO[List[Response[IO]]] =
    mkClient.use(c => requests.traverse(r => c.run(r).use(resp => IO.pure(resp))))

  def mkClient: Resource[IO, Client[IO]] =
    BlazeClientBuilder.apply[IO].resource
}
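A quick way to exercise these helpers against a locally running collector (localhost:8080 here is assumed):

import cats.effect.unsafe.implicits.global // CE3 runtime, fine for a test scratchpad

val req    = EventGenerator.mkTp2Event("localhost", 8080)
val status = Http.status(req).unsafeRunSync()
println(status.code) // 200 when the collector accepts the payload

One caveat worth noting: response and responses hand back the Response after its client Resource has already been released, so they are reliable for status and header checks but not for streaming the body afterwards.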
@@ -0,0 +1,139 @@
/*
* Copyright (c) 2022-2023 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0, and
* you may not use this file except in compliance with the Apache License
* Version 2.0. You may obtain a copy of the Apache License Version 2.0 at
* http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the Apache License Version 2.0 is distributed on an "AS
* IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the Apache License Version 2.0 for the specific language
* governing permissions and limitations there under.
*/
package com.snowplowanalytics.snowplow.collectors.scalastream.it

import scala.concurrent.duration._

import org.apache.thrift.TDeserializer

import org.slf4j.LoggerFactory

import org.testcontainers.containers.GenericContainer
import org.testcontainers.containers.output.Slf4jLogConsumer

import io.circe.{Json, parser}

import cats.implicits._

import cats.effect.IO

import retry.syntax.all._
import retry.RetryPolicies

import com.snowplowanalytics.snowplow.badrows.BadRow

import com.snowplowanalytics.iglu.core.SelfDescribingData
import com.snowplowanalytics.iglu.core.circe.implicits._

import com.snowplowanalytics.snowplow.CollectorPayload.thrift.model1.CollectorPayload

object utils {

  def parseCollectorPayload(bytes: Array[Byte]): CollectorPayload = {
    val deserializer = new TDeserializer()
    val target = new CollectorPayload()
    deserializer.deserialize(target, bytes)
    target
  }

  def parseBadRow(bytes: Array[Byte]): BadRow = {
    val str = new String(bytes)
    val parsed = for {
      json <- parser.parse(str).leftMap(_.message)
      sdj <- SelfDescribingData.parse(json).leftMap(_.message("Can't decode JSON as SDJ"))
      br <- sdj.data.as[BadRow].leftMap(_.getMessage())
    } yield br
    parsed match {
      case Right(br) => br
      case Left(err) => throw new RuntimeException(s"Can't parse bad row. Error: $err")
    }
  }

  def printBadRows(testName: String, badRows: List[BadRow]): IO[Unit] = {
    log(testName, "Bad rows:") *>
      badRows.traverse_(br => log(testName, br.compact))
  }

  def log(testName: String, line: String): IO[Unit] =
    IO(println(s"[$testName] $line"))

  def startContainerWithLogs(
    container: GenericContainer[_],
    loggerName: String
  ): GenericContainer[_] = {
    container.start()
    val logger = LoggerFactory.getLogger(loggerName)
    val logs = new Slf4jLogConsumer(logger)
    container.followOutput(logs)
    container
  }

  def waitWhile[A](
    a: A,
    condition: A => Boolean,
    maxDelay: FiniteDuration
  ): IO[Boolean] = {
    val retryPolicy = RetryPolicies.limitRetriesByCumulativeDelay(
      maxDelay,
      RetryPolicies.capDelay[IO](
        2.second,
        RetryPolicies.fullJitter[IO](1.second)
      )
    )

    IO(condition(a)).retryingOnFailures(
      result => IO(!result),
      retryPolicy,
      (_, _) => IO.unit
    )
  }

  /** Return a list of config parameters from a raw JSON string. */
  def getConfigParameters(config: String): List[String] = {
    val parsed: Json = parser.parse(config).valueOr { case failure =>
      throw new IllegalArgumentException("Can't parse JSON", failure.underlying)
    }

    def flatten(value: Json): Option[List[(String, Json)]] =
      value.asObject.map(
        _.toList.flatMap {
          case (k, v) =>
            flatten(v) match {
              case None => List(k -> v)
              case Some(fields) =>
                fields.map {
                  case (innerK, innerV) => s"$k.$innerK" -> innerV
                }
            }
        }
      )

    def withSpaces(s: String): String = if (s.contains(" ")) s""""$s"""" else s

    val fields = flatten(parsed).getOrElse(throw new IllegalArgumentException("Couldn't flatten fields"))

    fields.flatMap {
      case (k, v) if v.isString =>
        List(s"-D$k=${withSpaces(v.asString.get)}")
      case (k, v) if v.isArray =>
        v.asArray.get.toList.zipWithIndex.map {
          case (s, i) if s.isString =>
            s"-D$k.$i=${withSpaces(s.asString.get)}"
          case (other, i) =>
            s"-D$k.$i=${withSpaces(other.toString)}"
        }
      case (k, v) =>
        List(s"-D$k=${withSpaces(v.toString)}")
    }
  }
}
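To make getConfigParameters concrete, a hedged example (the input JSON is invented): nested keys flatten into dotted -D flags, arrays get index suffixes, and values containing spaces are quoted:

val flags = utils.getConfigParameters(
  """{ "collector": { "port": 8080, "paths": ["/a b", "/c"] } }"""
)
// flags == List(
//   "-Dcollector.port=8080",
//   "-Dcollector.paths.0=\"/a b\"", // quoted because the value contains a space
//   "-Dcollector.paths.1=/c"
// )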