diff --git a/build.sbt b/build.sbt index 6165766b7..35e17b3c7 100644 --- a/build.sbt +++ b/build.sbt @@ -156,16 +156,18 @@ lazy val http4s = project .configs(IntegrationTest) lazy val kinesisSettings = - allSettings ++ buildInfoSettings ++ Defaults.itSettings ++ scalifiedSettings ++ Seq( + allSettings ++ buildInfoSettings ++ http4sBuildInfoSettings ++ Defaults.itSettings ++ scalifiedSettings ++ Seq( moduleName := "snowplow-stream-collector-kinesis", + buildInfoPackage := s"com.snowplowanalytics.snowplow.collectors.scalastream", Docker / packageName := "scala-stream-collector-kinesis", libraryDependencies ++= Seq( + Dependencies.Libraries.catsRetry, Dependencies.Libraries.kinesis, - Dependencies.Libraries.sts, - Dependencies.Libraries.sqs, +// Dependencies.Libraries.sts, +// Dependencies.Libraries.sqs, // integration tests dependencies - Dependencies.Libraries.LegacyIT.specs2, - Dependencies.Libraries.LegacyIT.specs2CE + Dependencies.Libraries.IT.specs2, + Dependencies.Libraries.IT.specs2CE, ), IntegrationTest / test := (IntegrationTest / test).dependsOn(Docker / publishLocal).value, IntegrationTest / testOnly := (IntegrationTest / testOnly).dependsOn(Docker / publishLocal).evaluated @@ -174,7 +176,7 @@ lazy val kinesisSettings = lazy val kinesis = project .settings(kinesisSettings) .enablePlugins(JavaAppPackaging, SnowplowDockerPlugin, BuildInfoPlugin) - .dependsOn(core % "test->test;compile->compile;it->it") + .dependsOn(http4s % "test->test;compile->compile;it->it") .configs(IntegrationTest) lazy val kinesisDistroless = project @@ -182,7 +184,7 @@ lazy val kinesisDistroless = project .settings(sourceDirectory := (kinesis / sourceDirectory).value) .settings(kinesisSettings) .enablePlugins(JavaAppPackaging, SnowplowDistrolessDockerPlugin, BuildInfoPlugin) - .dependsOn(core % "test->test;compile->compile;it->it") + .dependsOn(http4s % "test->test;compile->compile;it->it") .configs(IntegrationTest) lazy val sqsSettings = diff --git a/kinesis/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/core/CookieSpec.scala b/kinesis/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/core/CookieSpec.scala index ac1c1ab27..87e24c0b9 100644 --- a/kinesis/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/core/CookieSpec.scala +++ b/kinesis/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/core/CookieSpec.scala @@ -14,20 +14,17 @@ */ package com.snowplowanalytics.snowplow.collectors.scalastream.it.core -import scala.concurrent.duration._ - -import cats.effect.testing.specs2.CatsIO - +import cats.effect.IO +import cats.effect.testing.specs2.CatsEffect +import com.snowplowanalytics.snowplow.collectors.scalastream.it.{EventGenerator, Http} +import com.snowplowanalytics.snowplow.collectors.scalastream.it.kinesis.containers._ import org.http4s.{Header, SameSite} - import org.specs2.mutable.Specification +import org.typelevel.ci.CIStringSyntax -import com.snowplowanalytics.snowplow.collectors.scalastream.it.Http -import com.snowplowanalytics.snowplow.collectors.scalastream.it.EventGenerator - -import com.snowplowanalytics.snowplow.collectors.scalastream.it.kinesis.containers._ +import scala.concurrent.duration._ -class CookieSpec extends Specification with Localstack with CatsIO { +class CookieSpec extends Specification with Localstack with CatsEffect { override protected val Timeout = 5.minutes @@ -65,14 +62,14 @@ class CookieSpec extends Specification with Localstack with CatsIO { for { resp <- Http.response(request) - now <- 
ioTimer.clock.realTime(SECONDS) + now <- IO.realTime } yield { resp.cookies match { case List(cookie) => cookie.name must beEqualTo(name) cookie.expires match { case Some(expiry) => - expiry.epochSecond should beCloseTo(now + expiration.toSeconds, 10) + expiry.epochSecond should beCloseTo((now + expiration).toSeconds, 10) case None => ko(s"Cookie [$cookie] doesn't contain the expiry date") } @@ -134,7 +131,7 @@ class CookieSpec extends Specification with Localstack with CatsIO { additionalConfig = Some(mkConfig()) ).use { collector => val request = EventGenerator.mkTp2Event(collector.host, collector.port) - .withHeaders(Header("SP-Anonymous", "*")) + .withHeaders(Header.Raw(ci"SP-Anonymous", "*")) for { resp <- Http.response(request) @@ -157,7 +154,7 @@ class CookieSpec extends Specification with Localstack with CatsIO { additionalConfig = Some(mkConfig()) ).use { collector => val request = EventGenerator.mkTp2Event(collector.host, collector.port) - .withHeaders(Header("Origin", "http://my.domain")) + .withHeaders(Header.Raw(ci"Origin", "http://my.domain")) for { resp <- Http.response(request) @@ -187,7 +184,7 @@ class CookieSpec extends Specification with Localstack with CatsIO { )) ).use { collector => val request = EventGenerator.mkTp2Event(collector.host, collector.port) - .withHeaders(Header("Origin", s"http://$subDomain")) + .withHeaders(Header.Raw(ci"Origin", s"http://$subDomain")) for { resp <- Http.response(request) @@ -220,7 +217,7 @@ class CookieSpec extends Specification with Localstack with CatsIO { )) ).use { collector => val request1 = EventGenerator.mkTp2Event(collector.host, collector.port) - .withHeaders(Header("Origin", s"http://other.domain")) + .withHeaders(Header.Raw(ci"Origin", s"http://other.domain")) val request2 = EventGenerator.mkTp2Event(collector.host, collector.port) for { diff --git a/kinesis/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/core/CustomPathsSpec.scala b/kinesis/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/core/CustomPathsSpec.scala index 78610f161..1bcb6ccfb 100644 --- a/kinesis/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/core/CustomPathsSpec.scala +++ b/kinesis/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/core/CustomPathsSpec.scala @@ -14,21 +14,17 @@ */ package com.snowplowanalytics.snowplow.collectors.scalastream.it.core -import scala.concurrent.duration._ - import cats.effect.IO - -import cats.effect.testing.specs2.CatsIO - +import cats.effect.testing.specs2.CatsEffect +import com.snowplowanalytics.snowplow.collectors.scalastream.it.Http +import com.snowplowanalytics.snowplow.collectors.scalastream.it.kinesis.Kinesis +import com.snowplowanalytics.snowplow.collectors.scalastream.it.kinesis.containers._ +import org.http4s.{Method, Request, Uri} import org.specs2.mutable.Specification -import org.http4s.{Request, Method, Uri} - -import com.snowplowanalytics.snowplow.collectors.scalastream.it.kinesis.containers._ -import com.snowplowanalytics.snowplow.collectors.scalastream.it.kinesis.Kinesis -import com.snowplowanalytics.snowplow.collectors.scalastream.it.Http +import scala.concurrent.duration._ -class CustomPathsSpec extends Specification with Localstack with CatsIO { +class CustomPathsSpec extends Specification with Localstack with CatsEffect { override protected val Timeout = 5.minutes diff --git a/kinesis/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/core/DoNotTrackCookieSpec.scala 
b/kinesis/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/core/DoNotTrackCookieSpec.scala index 03bc31fa2..b2161209b 100644 --- a/kinesis/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/core/DoNotTrackCookieSpec.scala +++ b/kinesis/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/core/DoNotTrackCookieSpec.scala @@ -14,22 +14,17 @@ */ package com.snowplowanalytics.snowplow.collectors.scalastream.it.core -import scala.concurrent.duration._ -import scala.collection.JavaConverters._ - import cats.effect.IO - -import cats.effect.testing.specs2.CatsIO - +import cats.effect.testing.specs2.CatsEffect +import com.snowplowanalytics.snowplow.collectors.scalastream.it.{EventGenerator, Http} +import com.snowplowanalytics.snowplow.collectors.scalastream.it.kinesis.Kinesis +import com.snowplowanalytics.snowplow.collectors.scalastream.it.kinesis.containers._ import org.specs2.mutable.Specification -import com.snowplowanalytics.snowplow.collectors.scalastream.it.Http -import com.snowplowanalytics.snowplow.collectors.scalastream.it.EventGenerator - -import com.snowplowanalytics.snowplow.collectors.scalastream.it.kinesis.containers._ -import com.snowplowanalytics.snowplow.collectors.scalastream.it.kinesis.Kinesis +import scala.collection.JavaConverters._ +import scala.concurrent.duration._ -class DoNotTrackCookieSpec extends Specification with Localstack with CatsIO { +class DoNotTrackCookieSpec extends Specification with Localstack with CatsEffect { override protected val Timeout = 5.minutes diff --git a/kinesis/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/core/HealthEndpointSpec.scala b/kinesis/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/core/HealthEndpointSpec.scala index 12b27a48d..a8095cd3b 100644 --- a/kinesis/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/core/HealthEndpointSpec.scala +++ b/kinesis/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/core/HealthEndpointSpec.scala @@ -14,21 +14,17 @@ */ package com.snowplowanalytics.snowplow.collectors.scalastream.it.core -import scala.concurrent.duration._ - import cats.effect.IO - -import cats.effect.testing.specs2.CatsIO - +import cats.effect.testing.specs2.CatsEffect +import com.snowplowanalytics.snowplow.collectors.scalastream.it.Http +import com.snowplowanalytics.snowplow.collectors.scalastream.it.kinesis.Kinesis +import com.snowplowanalytics.snowplow.collectors.scalastream.it.kinesis.containers._ +import org.http4s.{Method, Request, Uri} import org.specs2.mutable.Specification -import org.http4s.{Request, Method, Uri} - -import com.snowplowanalytics.snowplow.collectors.scalastream.it.kinesis.containers._ -import com.snowplowanalytics.snowplow.collectors.scalastream.it.kinesis.Kinesis -import com.snowplowanalytics.snowplow.collectors.scalastream.it.Http +import scala.concurrent.duration._ -class HealthEndpointSpec extends Specification with Localstack with CatsIO { +class HealthEndpointSpec extends Specification with Localstack with CatsEffect { override protected val Timeout = 5.minutes diff --git a/kinesis/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/core/XForwardedForSpec.scala b/kinesis/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/core/XForwardedForSpec.scala index adba7c2fd..5a743151a 100644 --- a/kinesis/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/core/XForwardedForSpec.scala +++ 
b/kinesis/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/core/XForwardedForSpec.scala @@ -14,27 +14,19 @@ */ package com.snowplowanalytics.snowplow.collectors.scalastream.it.core -import java.net.InetAddress - -import scala.concurrent.duration._ - import cats.data.NonEmptyList - import cats.effect.IO - -import cats.effect.testing.specs2.CatsIO - -import org.specs2.mutable.Specification - +import cats.effect.testing.specs2.CatsEffect +import com.comcast.ip4s.IpAddress +import com.snowplowanalytics.snowplow.collectors.scalastream.it.{EventGenerator, Http} +import com.snowplowanalytics.snowplow.collectors.scalastream.it.kinesis.Kinesis +import com.snowplowanalytics.snowplow.collectors.scalastream.it.kinesis.containers._ import org.http4s.headers.`X-Forwarded-For` +import org.specs2.mutable.Specification -import com.snowplowanalytics.snowplow.collectors.scalastream.it.Http -import com.snowplowanalytics.snowplow.collectors.scalastream.it.EventGenerator - -import com.snowplowanalytics.snowplow.collectors.scalastream.it.kinesis.containers._ -import com.snowplowanalytics.snowplow.collectors.scalastream.it.kinesis.Kinesis +import scala.concurrent.duration._ -class XForwardedForSpec extends Specification with Localstack with CatsIO { +class XForwardedForSpec extends Specification with Localstack with CatsEffect { override protected val Timeout = 5.minutes @@ -44,7 +36,7 @@ class XForwardedForSpec extends Specification with Localstack with CatsIO { val streamGood = s"${testName}-raw" val streamBad = s"${testName}-bad-1" - val ip = InetAddress.getByName("123.123.123.123") + val ip = IpAddress.fromString("123.123.123.123") Collector.container( "kinesis/src/it/resources/collector.hocon", @@ -53,7 +45,7 @@ class XForwardedForSpec extends Specification with Localstack with CatsIO { streamBad ).use { collector => val request = EventGenerator.mkTp2Event(collector.host, collector.port) - .withHeaders(`X-Forwarded-For`(NonEmptyList.one(Some(ip)))) + .withHeaders(`X-Forwarded-For`(NonEmptyList.one(ip))) for { _ <- Http.status(request) diff --git a/kinesis/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/kinesis/KinesisCollectorSpec.scala b/kinesis/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/kinesis/KinesisCollectorSpec.scala index af1878555..c0e7382d0 100644 --- a/kinesis/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/kinesis/KinesisCollectorSpec.scala +++ b/kinesis/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/kinesis/KinesisCollectorSpec.scala @@ -14,24 +14,18 @@ */ package com.snowplowanalytics.snowplow.collectors.scalastream.it.kinesis -import scala.concurrent.duration._ - import cats.effect.IO - -import cats.effect.testing.specs2.CatsIO - -import org.http4s.{Request, Method, Uri, Status} - -import org.specs2.mutable.Specification - -import org.testcontainers.containers.GenericContainer - +import cats.effect.testing.specs2.CatsEffect +import com.snowplowanalytics.snowplow.collectors.scalastream.it.kinesis.containers._ import com.snowplowanalytics.snowplow.collectors.scalastream.it.utils._ import com.snowplowanalytics.snowplow.collectors.scalastream.it.{EventGenerator, Http} +import org.http4s.{Method, Request, Status, Uri} +import org.specs2.mutable.Specification +import org.testcontainers.containers.GenericContainer -import com.snowplowanalytics.snowplow.collectors.scalastream.it.kinesis.containers._ +import scala.concurrent.duration._ -class KinesisCollectorSpec extends Specification with 
Localstack with CatsIO { +class KinesisCollectorSpec extends Specification with Localstack with CatsEffect { override protected val Timeout = 5.minutes @@ -46,7 +40,7 @@ class KinesisCollectorSpec extends Specification with Localstack with CatsIO { s"${testName}-raw", s"${testName}-bad-1" ).use { collector => - IO(collector.container.getLogs() must contain(("REST interface bound to"))) + IO(collector.container.getLogs() must contain(("Service bound to address"))) } } @@ -99,7 +93,7 @@ class KinesisCollectorSpec extends Specification with Localstack with CatsIO { _ <- waitWhile[GenericContainer[_]](container, _.isRunning, stopTimeout) } yield { container.isRunning() must beFalse - container.getLogs() must contain("Server terminated") + container.getLogs() must contain("Closing NIO1 channel") } } } diff --git a/kinesis/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/kinesis/containers/Collector.scala b/kinesis/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/kinesis/containers/Collector.scala index 8408dd6d6..68a127f19 100644 --- a/kinesis/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/kinesis/containers/Collector.scala +++ b/kinesis/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/kinesis/containers/Collector.scala @@ -14,17 +14,13 @@ */ package com.snowplowanalytics.snowplow.collectors.scalastream.it.kinesis.containers -import org.testcontainers.containers.BindMode -import org.testcontainers.containers.wait.strategy.Wait - -import com.dimafeng.testcontainers.GenericContainer - import cats.effect.{IO, Resource} - -import com.snowplowanalytics.snowplow.collectors.scalastream.generated.ProjectMetadata - -import com.snowplowanalytics.snowplow.collectors.scalastream.it.utils._ +import com.dimafeng.testcontainers.GenericContainer +import com.snowplowanalytics.snowplow.collectors.scalastream.BuildInfo import com.snowplowanalytics.snowplow.collectors.scalastream.it.CollectorContainer +import com.snowplowanalytics.snowplow.collectors.scalastream.it.utils._ +import org.testcontainers.containers.BindMode +import org.testcontainers.containers.wait.strategy.Wait object Collector { @@ -40,7 +36,7 @@ object Collector { additionalConfig: Option[String] = None ): Resource[IO, CollectorContainer] = { val container = GenericContainer( - dockerImage = s"snowplow/scala-stream-collector-kinesis:${ProjectMetadata.dockerTag}", + dockerImage = BuildInfo.dockerAlias, env = Map( "AWS_ACCESS_KEY_ID" -> "whatever", "AWS_SECRET_ACCESS_KEY" -> "whatever", @@ -50,7 +46,8 @@ object Collector { "REGION" -> Localstack.region, "KINESIS_ENDPOINT" -> Localstack.privateEndpoint, "MAX_BYTES" -> maxBytes.toString, - "JDK_JAVA_OPTIONS" -> "-Dorg.slf4j.simpleLogger.log.com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KinesisSink=warn" + "JDK_JAVA_OPTIONS" -> "-Dorg.slf4j.simpleLogger.log.com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KinesisSink=warn", + "HTTP4S_BACKEND" -> "BLAZE" ) ++ configParameters(additionalConfig), exposedPorts = Seq(port), fileSystemBind = Seq( @@ -64,7 +61,7 @@ object Collector { "--config", "/snowplow/config/collector.hocon" ), - waitStrategy = Wait.forLogMessage(s".*REST interface bound to.*", 1) + waitStrategy = Wait.forLogMessage(s".*Service bound to address.*", 1) ) container.container.withNetwork(Localstack.network) diff --git a/kinesis/src/main/resources/application.conf b/kinesis/src/main/resources/application.conf index 7331140b0..49ee01e22 100644 --- 
a/kinesis/src/main/resources/application.conf +++ b/kinesis/src/main/resources/application.conf @@ -1,4 +1,4 @@ -collector { +{ streams { sink { enabled = kinesis @@ -29,26 +29,3 @@ collector { } } - - -akka { - loglevel = WARNING - loggers = ["akka.event.slf4j.Slf4jLogger"] - - http.server { - remote-address-header = on - raw-request-uri-header = on - - parsing { - max-uri-length = 32768 - uri-parsing-mode = relaxed - illegal-header-warnings = off - } - - max-connections = 2048 - } - - coordinated-shutdown { - run-by-jvm-shutdown-hook = off - } -} diff --git a/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KinesisCollector.scala b/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KinesisCollector.scala index d16d59454..8d368135c 100644 --- a/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KinesisCollector.scala +++ b/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KinesisCollector.scala @@ -14,58 +14,17 @@ */ package com.snowplowanalytics.snowplow.collectors.scalastream -import java.util.concurrent.ScheduledThreadPoolExecutor -import cats.syntax.either._ -import com.snowplowanalytics.snowplow.collectors.scalastream.generated.BuildInfo -import com.snowplowanalytics.snowplow.collectors.scalastream.model._ -import com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KinesisSink -import com.snowplowanalytics.snowplow.collectors.scalastream.telemetry.TelemetryAkkaService +import cats.effect.{IO, Resource} +import com.snowplowanalytics.snowplow.collector.core.{App, Config} +import com.snowplowanalytics.snowplow.collector.core.model.Sinks +import com.snowplowanalytics.snowplow.collectors.scalastream.sinks.{KinesisSink, KinesisSinkConfig} -object KinesisCollector extends Collector { - def appName = BuildInfo.shortName - def appVersion = BuildInfo.version - def scalaVersion = BuildInfo.scalaVersion +object KinesisCollector extends App[KinesisSinkConfig](BuildInfo) { - def main(args: Array[String]): Unit = { - val (collectorConf, akkaConf) = parseConfig(args) - val telemetry = TelemetryAkkaService.initWithCollector(collectorConf, BuildInfo.moduleName, appVersion) - val sinks: Either[Throwable, CollectorSinks] = for { - kc <- collectorConf.streams.sink match { - case kc: Kinesis => kc.asRight - case _ => new IllegalArgumentException("Configured sink is not Kinesis").asLeft - } - es = buildExecutorService(kc) - goodStream = collectorConf.streams.good - badStream = collectorConf.streams.bad - bufferConf = collectorConf.streams.buffer - sqsGood = kc.sqsGoodBuffer - sqsBad = kc.sqsBadBuffer - good <- KinesisSink.createAndInitialize( - kc.maxBytes, - kc, - bufferConf, - goodStream, - sqsGood, - es - ) - bad <- KinesisSink.createAndInitialize( - kc.maxBytes, - kc, - bufferConf, - badStream, - sqsBad, - es - ) - } yield CollectorSinks(good, bad) + override def mkSinks(config: Config.Streams[KinesisSinkConfig]): Resource[IO, Sinks[IO]] = + for { + good <- KinesisSink.create[IO](config.sink, config.good) + bad <- KinesisSink.create[IO](config.sink, config.bad) + } yield Sinks(good, bad) - sinks match { - case Right(s) => run(collectorConf, akkaConf, s, telemetry) - case Left(e) => throw e - } - } - - def buildExecutorService(kc: Kinesis): ScheduledThreadPoolExecutor = { - log.info("Creating thread pool of size " + kc.threadPoolSize) - new ScheduledThreadPoolExecutor(kc.threadPoolSize) - } } diff --git 
a/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSink.scala b/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSink.scala index 8c7f4396b..99cca99f5 100644 --- a/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSink.scala +++ b/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSink.scala @@ -1,554 +1,202 @@ -/* - * Copyright (c) 2013-2023 Snowplow Analytics Ltd. All rights reserved. - * - * This program is licensed to you under the Apache License Version 2.0, - * and you may not use this file except in compliance with the Apache License Version 2.0. - * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the Apache License Version 2.0 is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. - */ -package com.snowplowanalytics.snowplow.collectors.scalastream -package sinks +package com.snowplowanalytics.snowplow.collectors.scalastream.sinks -import java.nio.ByteBuffer -import java.util.concurrent.ScheduledExecutorService -import java.util.UUID - -import scala.collection.JavaConverters._ -import scala.collection.mutable.ListBuffer -import scala.concurrent.{ExecutionContextExecutorService, Future} -import scala.concurrent.duration._ -import scala.util.{Failure, Success, Try} - -import cats.syntax.either._ - -import com.amazonaws.auth._ +import cats.effect.implicits.genSpawnOps +import cats.effect.{Async, Ref, Resource, Sync} +import cats.implicits._ +import cats.{Monoid, Parallel} import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration +import com.amazonaws.services.kinesis.model.{PutRecordsRequest, PutRecordsRequestEntry, PutRecordsResult} import com.amazonaws.services.kinesis.{AmazonKinesis, AmazonKinesisClientBuilder} -import com.amazonaws.services.kinesis.model._ -import com.amazonaws.services.sqs.{AmazonSQS, AmazonSQSClientBuilder} -import com.amazonaws.services.sqs.model.{MessageAttributeValue, SendMessageBatchRequest, SendMessageBatchRequestEntry} - -import com.snowplowanalytics.snowplow.collectors.scalastream.model._ -import com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KinesisSink.SqsClientAndName - -class KinesisSink private ( - val maxBytes: Int, - client: AmazonKinesis, - kinesisConfig: Kinesis, - bufferConfig: BufferConfig, - streamName: String, - executorService: ScheduledExecutorService, - maybeSqs: Option[SqsClientAndName] -) extends Sink { - import KinesisSink._ - - maybeSqs match { - case Some(sqs) => - log.info(s"SQS buffer for Kinesis stream $streamName is defined with name ${sqs.sqsBufferName}") - case None => - log.warn( - s"No SQS buffer for surge protection set up for stream $streamName (consider setting it via the config file)" - ) - } - - private val ByteThreshold = bufferConfig.byteLimit - private val RecordThreshold = bufferConfig.recordLimit - private val TimeThreshold = bufferConfig.timeLimit +import com.snowplowanalytics.snowplow.collector.core.Sink +import org.typelevel.log4cats.Logger +import org.typelevel.log4cats.slf4j.Slf4jLogger +import retry.syntax.all._ - private val maxBackoff = kinesisConfig.backoffPolicy.maxBackoff - private val minBackoff = 
kinesisConfig.backoffPolicy.minBackoff - private val maxRetries = kinesisConfig.backoffPolicy.maxRetries - private val randomGenerator = new java.util.Random() - - private val MaxSqsBatchSizeN = 10 - - implicit lazy val ec: ExecutionContextExecutorService = - concurrent.ExecutionContext.fromExecutorService(executorService) - - @volatile private var kinesisHealthy: Boolean = false - @volatile private var sqsHealthy: Boolean = false - override def isHealthy: Boolean = kinesisHealthy || sqsHealthy - - override def storeRawEvents(events: List[Array[Byte]], key: String): Unit = - events.foreach(e => EventStorage.store(e, key)) - - object EventStorage { - private val storedEvents = ListBuffer.empty[Events] - private var byteCount = 0L - @volatile private var lastFlushedTime = 0L - - def store(event: Array[Byte], key: String): Unit = { - val eventBytes = ByteBuffer.wrap(event) - val eventSize = eventBytes.capacity +import java.nio.ByteBuffer +import java.util.UUID +import scala.collection.JavaConverters._ - synchronized { - if (storedEvents.size + 1 > RecordThreshold || byteCount + eventSize > ByteThreshold) { - flush() +class KinesisSink[F[_]: Async: Parallel: Logger] private ( + override val maxBytes: Int, + config: KinesisSinkConfig, + kinesis: AmazonKinesis, + streamName: String +) extends Sink[F] { + override def isHealthy: F[Boolean] = Async[F].pure(true) //TODO + + override def storeRawEvents(events: List[Array[Byte]], key: String): F[Unit] = + writeToKinesis(toKinesisRecords(events)).start.void + + private def writeToKinesis(batch: List[PutRecordsRequestEntry]): F[Unit] = + for { + forNextAttemptBuffer <- Ref.of(batch) + failures <- runAndCaptureFailures(forNextAttemptBuffer).retryingOnFailures( + policy = Retries.fibonacci[F](config.backoffPolicy), + wasSuccessful = failures => Async[F].pure(failures.isEmpty), + onFailure = { + case (result, retryDetails) => + val msg = failureMessageForThrottling(result, streamName) + Logger[F].warn(s"$msg (${retryDetails.retriesSoFar} retries from cats-retry)") } - storedEvents += Events(eventBytes.array(), key) - byteCount += eventSize - } - } - - def flush(): Unit = { - val eventsToSend = synchronized { - val evts = storedEvents.result - storedEvents.clear() - byteCount = 0 - evts - } - lastFlushedTime = System.currentTimeMillis() - sinkBatch(eventsToSend) - } - - def getLastFlushTime: Long = lastFlushedTime - - /** - * Recursively schedule a task to send everything in EventStorage. - * Even if the incoming event flow dries up, all stored events will eventually get sent. - * Whenever TimeThreshold milliseconds have passed since the last call to flush, call flush. 
- * @param interval When to schedule the next flush - */ - def scheduleFlush(interval: Long = TimeThreshold): Unit = { - executorService.schedule( - new Runnable { - override def run(): Unit = { - val lastFlushed = getLastFlushTime - val currentTime = System.currentTimeMillis() - if (currentTime - lastFlushed >= TimeThreshold) { - flush() - scheduleFlush(TimeThreshold) - } else { - scheduleFlush(TimeThreshold + lastFlushed - currentTime) - } - } - }, - interval, - MILLISECONDS ) - () - } - } - - def sinkBatch(batch: List[Events]): Unit = - if (batch.nonEmpty) maybeSqs match { - // Kinesis healthy - case _ if kinesisHealthy => - writeBatchToKinesisWithRetries(batch, minBackoff, maxRetries) - // No SQS buffer - case None => - writeBatchToKinesisWithRetries(batch, minBackoff, maxRetries) - // Kinesis not healthy and SQS buffer defined - case Some(sqs) => - writeBatchToSqsWithRetries(batch, sqs, minBackoff, maxRetries) - } - - def writeBatchToKinesisWithRetries(batch: List[Events], nextBackoff: Long, retriesLeft: Int): Unit = { - log.info(s"Writing ${batch.size} records to Kinesis stream $streamName") - writeBatchToKinesis(batch).onComplete { - case Success(s) => - kinesisHealthy = true - val results = s.getRecords.asScala.toList - val failurePairs = batch.zip(results).filter(_._2.getErrorMessage != null) - log.info( - s"Successfully wrote ${batch.size - failurePairs.size} out of ${batch.size} records to Kinesis stream $streamName" + _ <- if (failures.isEmpty) Sync[F].unit + else Sync[F].raiseError(new RuntimeException(failureMessageForThrottling(failures, streamName))) + } yield () + + private def runAndCaptureFailures( + forNextAttemptBuffer: Ref[F, List[PutRecordsRequestEntry]] + ): F[List[PutRecordsRequestEntry]] = + for { + batch <- forNextAttemptBuffer.get + failures <- tryWriteToKinesis(batch) + _ <- forNextAttemptBuffer.set(failures.toList) + } yield failures.toList + + private def tryWriteToKinesis( + records: List[PutRecordsRequestEntry] + ): F[Vector[PutRecordsRequestEntry]] = + Logger[F].debug(s"Writing ${records.size} records to $streamName") *> + Async[F] + .blocking(putRecords(records)) + .map(TryBatchResult.build(records, _)) + .retryingOnFailuresAndAllErrors( + policy = Retries.fullJitter[F](config.backoffPolicy), + wasSuccessful = r => Async[F].pure(!r.shouldRetrySameBatch), + onFailure = { + case (result, retryDetails) => + val msg = failureMessageForInternalErrors(records, streamName, result) + Logger[F].error(s"$msg (${retryDetails.retriesSoFar} retries from cats-retry)") + }, + onError = (exception, retryDetails) => + Logger[F].error(exception)( + s"Writing ${records.size} records to $streamName errored (${retryDetails.retriesSoFar} retries from cats-retry)" + ) ) - if (failurePairs.nonEmpty) { - failurePairs.groupBy(_._2.getErrorCode).foreach { - case (errorCode, items) => - val exampleMsg = items.map(_._2.getErrorMessage).find(_.nonEmpty).getOrElse("") - log.error( - s"Writing ${items.size} records to Kinesis stream $streamName failed with error code [$errorCode] and example message: $exampleMsg" - ) - } - val failedRecords = failurePairs.map(_._1) - handleKinesisError(failedRecords, nextBackoff, retriesLeft) + .flatMap { result => + if (result.shouldRetrySameBatch) + Sync[F].raiseError(new RuntimeException(failureMessageForInternalErrors(records, streamName, result))) + else + result.nextBatchAttempt.pure[F] } - case Failure(f) => - log.error(s"Writing ${batch.size} records to Kinesis stream $streamName failed with error: ${f.getMessage()}") - handleKinesisError(batch, 
nextBackoff, retriesLeft) - } - } - def writeBatchToSqsWithRetries( - batch: List[Events], - sqs: SqsClientAndName, - nextBackoff: Long, - retriesLeft: Int - ): Unit = { - log.info(s"Writing ${batch.size} records to SQS buffer ${sqs.sqsBufferName}") - writeBatchToSqs(batch, sqs).onComplete { - case Success(s) => - sqsHealthy = true - log.info( - s"Successfully wrote ${batch.size - s.size} out of ${batch.size} records to SQS buffer ${sqs.sqsBufferName}" - ) - if (s.nonEmpty) { - s.groupBy(_._2.code).foreach { - case (errorCode, items) => - val exampleMsg = items.map(_._2.message).find(_.nonEmpty).getOrElse("") - log.error( - s"Writing ${items.size} records to SQS buffer ${sqs.sqsBufferName} failed with error code [$errorCode] and example message: $exampleMsg" - ) - } - val failedRecords = s.map(_._1) - handleSqsError(failedRecords, sqs, nextBackoff, retriesLeft) - } - case Failure(f) => - log.error( - s"Writing ${batch.size} records to SQS buffer ${sqs.sqsBufferName} failed with error: ${f.getMessage()}" - ) - handleSqsError(batch, sqs, nextBackoff, retriesLeft) + private def toKinesisRecords(records: List[Array[Byte]]): List[PutRecordsRequestEntry] = + records.map { r => + val data = ByteBuffer.wrap(r) + val prre = new PutRecordsRequestEntry() + prre.setPartitionKey(UUID.randomUUID().toString) + prre.setData(data) + prre } - } - def handleKinesisError(failedRecords: List[Events], nextBackoff: Long, retriesLeft: Int): Unit = - if (retriesLeft > 0) { - log.error( - s"$retriesLeft retries left. Retrying to write ${failedRecords.size} records to Kinesis stream $streamName in $nextBackoff milliseconds" - ) - scheduleRetryToKinesis(failedRecords, nextBackoff, retriesLeft - 1) - } else { - kinesisHealthy = false - log.error(s"Maximum number of retries reached for Kinesis stream $streamName for ${failedRecords.size} records") - maybeSqs match { - case Some(sqs) => - log.error( - s"SQS buffer ${sqs.sqsBufferName} defined for stream $streamName. Retrying to send the events to SQS" - ) - writeBatchToSqsWithRetries(failedRecords, sqs, minBackoff, maxRetries) - case None => - log.error(s"No SQS buffer defined for stream $streamName. Retrying to send the events to Kinesis") - writeBatchToKinesisWithRetries(failedRecords, maxBackoff, maxRetries) - } - } - - def handleSqsError(failedRecords: List[Events], sqs: SqsClientAndName, nextBackoff: Long, retriesLeft: Int): Unit = - if (retriesLeft > 0) { - log.error( - s"$retriesLeft retries left. Retrying to write ${failedRecords.size} records to SQS buffer ${sqs.sqsBufferName} in $nextBackoff milliseconds" - ) - scheduleRetryToSqs(failedRecords, sqs, nextBackoff, retriesLeft - 1) - } else { - sqsHealthy = false - log.error( - s"Maximum number of retries reached for SQS buffer ${sqs.sqsBufferName} for ${failedRecords.size} records. Retrying in Kinesis" - ) - writeBatchToKinesisWithRetries(failedRecords, minBackoff, maxRetries) - } - - def writeBatchToKinesis(batch: List[Events]): Future[PutRecordsResult] = - Future { - val putRecordsRequest = { - val prr = new PutRecordsRequest() - prr.setStreamName(streamName) - val putRecordsRequestEntryList = batch.map { event => - val prre = new PutRecordsRequestEntry() - prre.setPartitionKey(event.key) - prre.setData(ByteBuffer.wrap(event.payloads)) - prre - } - prr.setRecords(putRecordsRequestEntryList.asJava) - prr - } - client.putRecords(putRecordsRequest) - } - - /** - * @return Empty list if all events were successfully inserted; - * otherwise a non-empty list of Events to be retried and the reasons why they failed. 
- */ - def writeBatchToSqs(batch: List[Events], sqs: SqsClientAndName): Future[List[(Events, BatchResultErrorInfo)]] = - Future { - val splitBatch = split(batch, getByteSize, MaxSqsBatchSizeN, maxBytes) - splitBatch.map(toSqsMessages).flatMap { msgGroup => - val entries = msgGroup.map(_._2) - val batchRequest = - new SendMessageBatchRequest().withQueueUrl(sqs.sqsBufferName).withEntries(entries.asJava) - val response = sqs.sqsClient.sendMessageBatch(batchRequest) - val failures = response - .getFailed - .asScala - .toList - .map { bree => - (bree.getId, BatchResultErrorInfo(bree.getCode, bree.getMessage)) - } - .toMap - // Events to retry and reasons for failure - msgGroup.collect { - case (e, m) if failures.contains(m.getId) => - (e, failures(m.getId)) - } - } - } - - def toSqsMessages(events: List[Events]): List[(Events, SendMessageBatchRequestEntry)] = - events.map(e => - ( - e, - new SendMessageBatchRequestEntry(UUID.randomUUID.toString, b64Encode(e.payloads)).withMessageAttributes( - Map( - "kinesisKey" -> - new MessageAttributeValue().withDataType("String").withStringValue(e.key) - ).asJava - ) - ) - ) - - def b64Encode(msg: Array[Byte]): String = { - val buffer = java.util.Base64.getEncoder.encode(msg) - new String(buffer) - } - - def scheduleRetryToKinesis(failedRecords: List[Events], currentBackoff: Long, retriesLeft: Int): Unit = { - val nextBackoff = getNextBackoff(currentBackoff) - executorService.schedule( - new Runnable { - override def run(): Unit = writeBatchToKinesisWithRetries(failedRecords, nextBackoff, retriesLeft) - }, - currentBackoff, - MILLISECONDS - ) - () + private case class TryBatchResult( + nextBatchAttempt: Vector[PutRecordsRequestEntry], + hadSuccess: Boolean, + wasThrottled: Boolean, + exampleInternalError: Option[String] + ) { + // Only retry the exact same again if no record was successfully inserted, and all the errors + // were not throughput exceeded exceptions + def shouldRetrySameBatch: Boolean = + !hadSuccess && !wasThrottled } - def scheduleRetryToSqs( - failedRecords: List[Events], - sqs: SqsClientAndName, - currentBackoff: Long, - retriesLeft: Int - ): Unit = { - val nextBackoff = getNextBackoff(currentBackoff) - executorService.schedule( - new Runnable { - override def run(): Unit = writeBatchToSqsWithRetries(failedRecords, sqs, nextBackoff, retriesLeft) - }, - currentBackoff, - MILLISECONDS - ) - () - } + private object TryBatchResult { - /** - * How long to wait before sending the next request - * @param lastBackoff The previous backoff time - * @return Maximum of two-thirds of lastBackoff and a random number between minBackoff and maxBackoff - */ - private def getNextBackoff(lastBackoff: Long): Long = { - val diff = (maxBackoff - minBackoff + 1).toInt - (minBackoff + randomGenerator.nextInt(diff)).max(lastBackoff / 3 * 2) - } + implicit private def tryBatchResultMonoid: Monoid[TryBatchResult] = + new Monoid[TryBatchResult] { + override val empty: TryBatchResult = TryBatchResult(Vector.empty, false, false, None) - def shutdown(): Unit = { - EventStorage.flush() - executorService.shutdown() - executorService.awaitTermination(10000, MILLISECONDS) - () - } - - private def checkKinesisHealth(): Unit = { - val healthRunnable = new Runnable { - override def run() { - while (!kinesisHealthy) { - Try { - val describeRequest = new DescribeStreamSummaryRequest() - describeRequest.setStreamName(streamName) - val describeResult = client.describeStreamSummary(describeRequest) - describeResult.getStreamDescriptionSummary().getStreamStatus() - } match { - case 
Success("ACTIVE") => - log.info(s"Stream $streamName ACTIVE") - kinesisHealthy = true - case Success(other) => - log.warn(s"Stream $streamName not ACTIVE but $other") - Thread.sleep(kinesisConfig.startupCheckInterval.toMillis) - case Failure(err) => - log.error(s"Error while checking status of stream $streamName: ${err.getMessage()}") - Thread.sleep(kinesisConfig.startupCheckInterval.toMillis) - } - } + override def combine(x: TryBatchResult, y: TryBatchResult): TryBatchResult = + TryBatchResult( + x.nextBatchAttempt ++ y.nextBatchAttempt, + x.hadSuccess || y.hadSuccess, + x.wasThrottled || y.wasThrottled, + x.exampleInternalError.orElse(y.exampleInternalError) + ) } - } - executorService.execute(healthRunnable) - } - private def checkSqsHealth(): Unit = maybeSqs.foreach { sqs => - val healthRunnable = new Runnable { - override def run() { - while (!sqsHealthy) { - Try { - sqs.sqsClient.getQueueUrl(sqs.sqsBufferName) - } match { - case Success(_) => - log.info(s"SQS buffer ${sqs.sqsBufferName} exists") - sqsHealthy = true - case Failure(err) => - log.error(s"SQS buffer ${sqs.sqsBufferName} doesn't exist. Error: ${err.getMessage()}") - } - Thread.sleep(kinesisConfig.startupCheckInterval.toMillis) + def build(records: List[PutRecordsRequestEntry], prr: PutRecordsResult): TryBatchResult = + if (prr.getFailedRecordCount.toInt =!= 0) + records.zip(prr.getRecords.asScala).foldMap { + case (orig, recordResult) => + Option(recordResult.getErrorCode) match { + case None => + TryBatchResult(Vector.empty, true, false, None) + case Some("ProvisionedThroughputExceededException") => + TryBatchResult(Vector(orig), false, true, None) + case Some(_) => + TryBatchResult(Vector(orig), false, false, Option(recordResult.getErrorMessage)) + } } - } - } - executorService.execute(healthRunnable) + else + TryBatchResult(Vector.empty, true, false, None) } -} - -/** KinesisSink companion object with factory method */ -object KinesisSink { - - sealed trait Target - final case object Kinesis extends Target - final case class Sqs(sqs: SqsClientAndName) extends Target - - /** - * Events to be written to Kinesis or SQS. - * @param payloads Serialized events extracted from a CollectorPayload. - * The size of this collection is limited by MaxBytes. - * Not to be confused with a 'batch' events to sink. - * @param key Partition key for Kinesis - */ - final case class Events(payloads: Array[Byte], key: String) - // Details about why messages failed to be written to SQS. - final case class BatchResultErrorInfo(code: String, message: String) - - final case class SqsClientAndName(sqsClient: AmazonSQS, sqsBufferName: String) - - /** - * Create a KinesisSink and schedule a task to flush its EventStorage. - * Exists so that no threads can get a reference to the KinesisSink - * during its construction. 
- */ - def createAndInitialize( - kinesisMaxBytes: Int, - kinesisConfig: Kinesis, - bufferConfig: BufferConfig, - streamName: String, - sqsBufferName: Option[String], - executorService: ScheduledExecutorService - ): Either[Throwable, KinesisSink] = { - val clients = for { - provider <- getProvider(kinesisConfig.aws) - kinesisClient <- createKinesisClient(provider, kinesisConfig.endpoint, kinesisConfig.region) - sqsClientAndName <- sqsBuffer(sqsBufferName, provider, kinesisConfig.region) - } yield (kinesisClient, sqsClientAndName) - - clients.map { - case (kinesisClient, sqsClientAndName) => - val maxBytes = - if (sqsClientAndName.isDefined) kinesisConfig.sqsMaxBytes else kinesisMaxBytes - val ks = - new KinesisSink( - maxBytes, - kinesisClient, - kinesisConfig, - bufferConfig, - streamName, - executorService, - sqsClientAndName - ) - ks.checkKinesisHealth() - ks.checkSqsHealth() - ks.EventStorage.scheduleFlush() - ks + private def putRecords(records: List[PutRecordsRequestEntry]): PutRecordsResult = { + val putRecordsRequest = { + val prr = new PutRecordsRequest() + prr.setStreamName(streamName) + prr.setRecords(records.asJava) + prr } + kinesis.putRecords(putRecordsRequest) } - /** Create an aws credentials provider through env variables and iam. */ - private def getProvider(awsConfig: AWSConfig): Either[Throwable, AWSCredentialsProvider] = { - def isDefault(key: String): Boolean = key == "default" - def isIam(key: String): Boolean = key == "iam" - def isEnv(key: String): Boolean = key == "env" - - ((awsConfig.accessKey, awsConfig.secretKey) match { - case (a, s) if isDefault(a) && isDefault(s) => - new DefaultAWSCredentialsProviderChain().asRight - case (a, s) if isDefault(a) || isDefault(s) => - "accessKey and secretKey must both be set to 'default' or neither".asLeft - case (a, s) if isIam(a) && isIam(s) => - InstanceProfileCredentialsProvider.getInstance().asRight - case (a, s) if isIam(a) && isIam(s) => - "accessKey and secretKey must both be set to 'iam' or neither".asLeft - case (a, s) if isEnv(a) && isEnv(s) => - new EnvironmentVariableCredentialsProvider().asRight - case (a, s) if isEnv(a) || isEnv(s) => - "accessKey and secretKey must both be set to 'env' or neither".asLeft - case _ => - new AWSStaticCredentialsProvider( - new BasicAWSCredentials(awsConfig.accessKey, awsConfig.secretKey) - ).asRight - }).leftMap(new IllegalArgumentException(_)) + private def failureMessageForInternalErrors( + records: List[PutRecordsRequestEntry], + streamName: String, + result: TryBatchResult + ): String = { + val exampleMessage = result.exampleInternalError.getOrElse("none") + s"Writing ${records.size} records to $streamName errored with internal failures. Example error message [$exampleMessage]" } - /** - * Creates a new Kinesis client. - * @param provider aws credentials provider - * @param endpoint kinesis endpoint where the stream resides - * @param region aws region where the stream resides - * @return the initialized AmazonKinesisClient - */ - private def createKinesisClient( - provider: AWSCredentialsProvider, - endpoint: String, - region: String - ): Either[Throwable, AmazonKinesis] = - Either.catchNonFatal( - AmazonKinesisClientBuilder - .standard() - .withCredentials(provider) - .withEndpointConfiguration(new EndpointConfiguration(endpoint, region)) - .build() - ) + private def failureMessageForThrottling( + records: List[PutRecordsRequestEntry], + streamName: String + ): String = + s"Exceeded Kinesis provisioned throughput: ${records.size} records failed writing to $streamName." 
- private def sqsBuffer( - sqsBufferName: Option[String], - provider: AWSCredentialsProvider, - region: String - ): Either[Throwable, Option[SqsClientAndName]] = - sqsBufferName match { - case Some(name) => - createSqsClient(provider, region).map(amazonSqs => Some(SqsClientAndName(amazonSqs, name))) - case None => None.asRight - } +} - private def createSqsClient(provider: AWSCredentialsProvider, region: String): Either[Throwable, AmazonSQS] = - Either.catchNonFatal( - AmazonSQSClientBuilder.standard().withRegion(region).withCredentials(provider).build - ) +object KinesisSink { - /** - * Splits a Kinesis-sized batch of `Events` into smaller batches that meet the SQS limit. - * @param batch A batch of up to `KinesisLimit` that must be split into smaller batches. - * @param getByteSize How to get the size of a batch. - * @param maxRecords Max records for the smaller batches. - * @param maxBytes Max byte size for the smaller batches. - * @return A batch of smaller batches, each one of which meets the limits. - */ - def split( - batch: List[Events], - getByteSize: Events => Int, - maxRecords: Int, - maxBytes: Int - ): List[List[Events]] = { - var bytes = 0L - @scala.annotation.tailrec - def go(originalBatch: List[Events], tmpBatch: List[Events], newBatch: List[List[Events]]): List[List[Events]] = - (originalBatch, tmpBatch) match { - case (Nil, Nil) => newBatch - case (Nil, acc) => acc :: newBatch - case (h :: t, acc) if acc.size + 1 > maxRecords || getByteSize(h) + bytes > maxBytes => - bytes = getByteSize(h).toLong - go(t, h :: Nil, acc :: newBatch) - case (h :: t, acc) => - bytes += getByteSize(h) - go(t, h :: acc, newBatch) + implicit private def unsafeLogger[F[_]: Sync]: Logger[F] = + Slf4jLogger.getLogger[F] + + def create[F[_]: Async: Parallel]( + config: KinesisSinkConfig, + streamName: String + ): Resource[F, Sink[F]] = + for { + producer <- Resource.eval[F, AmazonKinesis](mkProducer(config, streamName)) + } yield new KinesisSink[F](config.maxBytes, config, producer, streamName) + + private def mkProducer[F[_]: Sync]( + config: KinesisSinkConfig, + streamName: String + ): F[AmazonKinesis] = + for { + builder <- Sync[F].delay(AmazonKinesisClientBuilder.standard) + withEndpoint <- config.customEndpoint match { + case Some(endpoint) => + Sync[F].delay(builder.withEndpointConfiguration(new EndpointConfiguration(endpoint, config.region))) + case None => + Sync[F].delay(builder.withRegion(config.region)) } - go(batch, Nil, Nil).map(_.reverse).reverse.filter(_.nonEmpty) - } - - def getByteSize(events: Events): Int = ByteBuffer.wrap(events.payloads).capacity + kinesis <- Sync[F].delay(withEndpoint.build()) + _ <- streamExists(kinesis, streamName) + } yield kinesis + + private def streamExists[F[_]: Sync](kinesis: AmazonKinesis, stream: String): F[Unit] = + for { + described <- Sync[F].delay(kinesis.describeStream(stream)) + status = described.getStreamDescription.getStreamStatus + _ <- status match { + case "ACTIVE" | "UPDATING" => + Sync[F].unit + case _ => + Sync[F].raiseError[Unit](new IllegalArgumentException(s"Stream $stream doesn't exist or can't be accessed")) + } + } yield () } diff --git a/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSinkConfig.scala b/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSinkConfig.scala new file mode 100644 index 000000000..9942b0768 --- /dev/null +++ b/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSinkConfig.scala @@ -0,0 
+1,37 @@ +package com.snowplowanalytics.snowplow.collectors.scalastream.sinks + +import com.snowplowanalytics.snowplow.collector.core.Config +import io.circe.Decoder +import io.circe.generic.semiauto._ +import io.circe.config.syntax.durationDecoder + +import scala.concurrent.duration.FiniteDuration + +final case class KinesisSinkConfig( + maxBytes: Int, + region: String, + threadPoolSize: Int, + aws: KinesisSinkConfig.AWSConfig, + backoffPolicy: KinesisSinkConfig.BackoffPolicy, + customEndpoint: Option[String], + sqsGoodBuffer: Option[String], + sqsBadBuffer: Option[String], + sqsMaxBytes: Int, + startupCheckInterval: FiniteDuration +) extends Config.Sink { + val endpoint = customEndpoint.getOrElse(region match { + case cn @ "cn-north-1" => s"https://kinesis.$cn.amazonaws.com.cn" + case cn @ "cn-northwest-1" => s"https://kinesis.$cn.amazonaws.com.cn" + case _ => s"https://kinesis.$region.amazonaws.com" + }) +} + +object KinesisSinkConfig { + final case class AWSConfig(accessKey: String, secretKey: String) + + final case class BackoffPolicy(minBackoff: Long, maxBackoff: Long, maxRetries: Int) + implicit val configDecoder: Decoder[KinesisSinkConfig] = deriveDecoder[KinesisSinkConfig] + implicit val awsConfigDecoder: Decoder[AWSConfig] = deriveDecoder[AWSConfig] + implicit val backoffPolicyConfigDecoder: Decoder[BackoffPolicy] = + deriveDecoder[BackoffPolicy] +} diff --git a/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/Retries.scala b/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/Retries.scala new file mode 100644 index 000000000..1ade2df94 --- /dev/null +++ b/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/Retries.scala @@ -0,0 +1,27 @@ +package com.snowplowanalytics.snowplow.collectors.scalastream.sinks + +import cats.Applicative +import com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KinesisSinkConfig.BackoffPolicy +import retry.{RetryPolicies, RetryPolicy} + +import java.util.concurrent.TimeUnit +import scala.concurrent.duration.FiniteDuration + +object Retries { + + def fullJitter[F[_]: Applicative](config: BackoffPolicy): RetryPolicy[F] = + capBackoffAndRetries(config, RetryPolicies.fullJitter[F](FiniteDuration(config.minBackoff, TimeUnit.MILLISECONDS))) + + def fibonacci[F[_]: Applicative](config: BackoffPolicy): RetryPolicy[F] = + capBackoffAndRetries( + config, + RetryPolicies.fibonacciBackoff[F](FiniteDuration(config.minBackoff, TimeUnit.MILLISECONDS)) + ) + + private def capBackoffAndRetries[F[_]: Applicative](config: BackoffPolicy, policy: RetryPolicy[F]): RetryPolicy[F] = { + val capped = RetryPolicies.capDelay[F](FiniteDuration(config.maxBackoff, TimeUnit.MILLISECONDS), policy) + val max = RetryPolicies.limitRetries(config.maxRetries) + capped.join(max) + } + +} diff --git a/kinesis/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/ConfigSpec.scala b/kinesis/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/ConfigSpec.scala new file mode 100644 index 000000000..bf2dbe25d --- /dev/null +++ b/kinesis/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/ConfigSpec.scala @@ -0,0 +1,129 @@ +/* + * Copyright (c) 2012-present Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Snowplow Community License Version 1.0, + * and you may not use this file except in compliance with the Snowplow Community License Version 1.0. 
+ * You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0 + */ +package com.snowplowanalytics.snowplow.collectors.scalastream + +import cats.effect.testing.specs2.CatsEffect +import cats.effect.{ExitCode, IO} +import com.snowplowanalytics.snowplow.collector.core.{Config, ConfigParser} +import com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KinesisSinkConfig +import org.http4s.SameSite +import org.specs2.mutable.Specification + +import java.nio.file.Paths +import scala.concurrent.duration.DurationInt + +class ConfigSpec extends Specification with CatsEffect { + + "Config parser" should { + "be able to parse extended kinesis config" in { + assert( + resource = "/config.kinesis.extended.hocon", + expectedResult = Right(ConfigSpec.expectedConfig) + ) + } + "be able to parse minimal kinesis config" in { + assert( + resource = "/config.kinesis.minimal.hocon", + expectedResult = Right(ConfigSpec.expectedConfig) + ) + } + } + + private def assert(resource: String, expectedResult: Either[ExitCode, Config[KinesisSinkConfig]]) = { + val path = Paths.get(getClass.getResource(resource).toURI) + ConfigParser.fromPath[IO, KinesisSinkConfig](Some(path)).value.map { result => + result must beEqualTo(expectedResult) + } + } +} + +object ConfigSpec { + + private val expectedConfig = Config[KinesisSinkConfig]( + interface = "0.0.0.0", + port = 8080, + paths = Map.empty[String, String], + p3p = Config.P3P( + policyRef = "/w3c/p3p.xml", + CP = "NOI DSP COR NID PSA OUR IND COM NAV STA" + ), + crossDomain = Config.CrossDomain( + enabled = false, + domains = List("*"), + secure = true + ), + cookie = Config.Cookie( + enabled = true, + expiration = 365.days, + name = "sp", + domains = List.empty, + fallbackDomain = None, + secure = true, + httpOnly = true, + sameSite = Some(SameSite.None) + ), + doNotTrackCookie = Config.DoNotTrackCookie( + enabled = false, + name = "", + value = "" + ), + cookieBounce = Config.CookieBounce( + enabled = false, + name = "n3pc", + fallbackNetworkUserId = "00000000-0000-4000-A000-000000000000", + forwardedProtocolHeader = None + ), + redirectMacro = Config.RedirectMacro( + enabled = false, + placeholder = None + ), + rootResponse = Config.RootResponse( + enabled = false, + statusCode = 302, + headers = Map.empty[String, String], + body = "" + ), + cors = Config.CORS(1.hour), + monitoring = + Config.Monitoring(Config.Metrics(Config.Statsd(false, "localhost", 8125, 10.seconds, "snowplow.collector"))), + ssl = Config.SSL(enable = false, redirect = false, port = 443), + enableDefaultRedirect = false, + redirectDomains = Set.empty, + preTerminationPeriod = 10.seconds, + streams = Config.Streams( + good = "good", + bad = "bad", + useIpAddressAsPartitionKey = false, + buffer = Config.Buffer( + byteLimit = 3145728, + recordLimit = 500, + timeLimit = 5000 + ), + sink = KinesisSinkConfig( + maxBytes = 1000000, + region = "eu-central-1", + threadPoolSize = 10, + aws = KinesisSinkConfig.AWSConfig( + accessKey = "iam", + secretKey = "iam" + ), + backoffPolicy = KinesisSinkConfig.BackoffPolicy( + minBackoff = 500, + maxBackoff = 1500, + maxRetries = 3 + ), + sqsBadBuffer = None, + sqsGoodBuffer = None, + sqsMaxBytes = 192000, + customEndpoint = None, + startupCheckInterval = 1.second + ) + ) + ) + +} diff --git a/kinesis/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisConfigSpec.scala 
b/kinesis/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisConfigSpec.scala deleted file mode 100644 index 03f8e0be3..000000000 --- a/kinesis/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisConfigSpec.scala +++ /dev/null @@ -1,25 +0,0 @@ -/** - * Copyright (c) 2014-2022 Snowplow Analytics Ltd. - * All rights reserved. - * - * This program is licensed to you under the Apache License Version 2.0, - * and you may not use this file except in compliance with the Apache - * License Version 2.0. - * You may obtain a copy of the Apache License Version 2.0 at - * http://www.apache.org/licenses/LICENSE-2.0. - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the Apache License Version 2.0 is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. - * - * See the Apache License Version 2.0 for the specific language - * governing permissions and limitations there under. - */ -package com.snowplowanalytics.snowplow.collectors.scalastream.sinks - -import com.snowplowanalytics.snowplow.collectors.scalastream.config.ConfigSpec - -class KinesisConfigSpec extends ConfigSpec { - makeConfigTest("kinesis", "", "") -} diff --git a/kinesis/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSinkSpec.scala b/kinesis/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSinkSpec.scala deleted file mode 100644 index 05cbc016a..000000000 --- a/kinesis/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSinkSpec.scala +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Copyright (c) 2013-2022 Snowplow Analytics Ltd. All rights reserved. - * - * This program is licensed to you under the Apache License Version 2.0, and - * you may not use this file except in compliance with the Apache License - * Version 2.0. You may obtain a copy of the Apache License Version 2.0 at - * http://www.apache.org/licenses/LICENSE-2.0. - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the Apache License Version 2.0 is distributed on an "AS - * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the Apache License Version 2.0 for the specific language - * governing permissions and limitations there under. - */ -package com.snowplowanalytics.snowplow.collectors.scalastream -package sinks - -import com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KinesisSink._ - -import org.specs2.mutable.Specification - -class KinesisSinkSpec extends Specification { - val event = Events("a".getBytes, "b") - - "KinesisSink.split" should { - "return empty list if given an empty batch" in { - val emptyBatch = List.empty[Events] - - split(emptyBatch, getByteSize, 1, 10) mustEqual List.empty - split(emptyBatch, getByteSize, 10, 1) mustEqual List.empty - // Edge case that we shouldn't hit. The test simply confirms the behaviour. - split(emptyBatch, getByteSize, 0, 0) mustEqual List.empty - } - - "correctly split batches, according to maxRecords setting" in { - val batch1 = List.fill(10)(event) - val batch2 = List.fill(1)(event) - - val res1 = split(batch1, getByteSize, 3, 1000) - val res2 = split(batch2, getByteSize, 3, 1000) - // Edge case that we shouldn't hit. The test simply confirms the behaviour. 
- val res3 = split(batch1, getByteSize, 0, 1000) - - res1.length mustEqual 4 - res2.length mustEqual 1 - (res3.length mustEqual 10).and(res3.forall(_ must not be empty)) - } - - "correctly split batches, according to maxBytes setting" in { - val batch1 = List.fill(10)(event) - val batch2 = List.fill(1)(event) - - val res1 = split(batch1, getByteSize, 1000, 3) - val res2 = split(batch2, getByteSize, 1000, 3) - // Edge case that we shouldn't hit. The test simply confirms the behaviour. - val res3 = split(batch1, getByteSize, 1000, 0) - - res1.length mustEqual 4 - res2.length mustEqual 1 - (res3.length mustEqual 10).and(res3.forall(_ must not be empty)) - } - } -}
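
Below is a minimal, hypothetical usage sketch, not taken from this patch, showing how the new cats-effect KinesisSink.create API introduced above could be wired into an IOApp. The config values mirror the ConfigSpec fixture; the object name, stream name, payload and partition key are illustrative assumptions only.

// Hypothetical wiring sketch (assumptions noted above): acquire the new
// cats-effect KinesisSink as a Resource and store one payload.
import cats.effect.{ExitCode, IO, IOApp}
import com.snowplowanalytics.snowplow.collectors.scalastream.sinks.{KinesisSink, KinesisSinkConfig}
import scala.concurrent.duration.DurationInt

object KinesisSinkDemo extends IOApp {

  // Assumed values, mirroring kinesis/src/test/.../ConfigSpec.scala
  private val sinkConfig = KinesisSinkConfig(
    maxBytes             = 1000000,
    region               = "eu-central-1",
    threadPoolSize       = 10,
    aws                  = KinesisSinkConfig.AWSConfig(accessKey = "iam", secretKey = "iam"),
    backoffPolicy        = KinesisSinkConfig.BackoffPolicy(minBackoff = 500, maxBackoff = 1500, maxRetries = 3),
    customEndpoint       = None,
    sqsGoodBuffer        = None,
    sqsBadBuffer         = None,
    sqsMaxBytes          = 192000,
    startupCheckInterval = 1.second
  )

  def run(args: List[String]): IO[ExitCode] =
    // KinesisSink.create verifies the stream exists (describeStream) before
    // yielding the Sink[IO]; storeRawEvents forks the PutRecords call.
    KinesisSink.create[IO](sinkConfig, streamName = "good").use { sink =>
      sink.storeRawEvents(List("example-payload".getBytes("UTF-8")), key = "partition-key")
    }.as(ExitCode.Success)
}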