diff --git a/build.sbt b/build.sbt
index f908e644c..782591444 100644
--- a/build.sbt
+++ b/build.sbt
@@ -188,10 +188,12 @@ lazy val kinesisDistroless = project
   .configs(IntegrationTest)

 lazy val sqsSettings =
-  allSettings ++ buildInfoSettings ++ Seq(
+  allSettings ++ buildInfoSettings ++ http4sBuildInfoSettings ++ Seq(
     moduleName := "snowplow-stream-collector-sqs",
+    buildInfoPackage := s"com.snowplowanalytics.snowplow.collectors.scalastream",
     Docker / packageName := "scala-stream-collector-sqs",
     libraryDependencies ++= Seq(
+      Dependencies.Libraries.catsRetry,
       Dependencies.Libraries.sqs,
       Dependencies.Libraries.sts,
     )
@@ -200,14 +202,14 @@ lazy val sqsSettings =
 lazy val sqs = project
   .settings(sqsSettings)
   .enablePlugins(JavaAppPackaging, SnowplowDockerPlugin, BuildInfoPlugin)
-  .dependsOn(core % "test->test;compile->compile")
+  .dependsOn(http4s % "test->test;compile->compile")

 lazy val sqsDistroless = project
   .in(file("distroless/sqs"))
   .settings(sourceDirectory := (sqs / sourceDirectory).value)
   .settings(sqsSettings)
   .enablePlugins(JavaAppPackaging, SnowplowDistrolessDockerPlugin, BuildInfoPlugin)
-  .dependsOn(core % "test->test;compile->compile")
+  .dependsOn(http4s % "test->test;compile->compile")

 lazy val pubsubSettings =
   allSettings ++ buildInfoSettings ++ http4sBuildInfoSettings ++ Defaults.itSettings ++ scalifiedSettings ++ Seq(
diff --git a/examples/config.sqs.extended.hocon b/examples/config.sqs.extended.hocon
index 7a93f0841..452c09d2c 100644
--- a/examples/config.sqs.extended.hocon
+++ b/examples/config.sqs.extended.hocon
@@ -189,19 +189,6 @@ collector {
       # Thread pool size for Kinesis and SQS API requests
       threadPoolSize = 10
-
-      # The following are used to authenticate for the Amazon Kinesis and SQS sinks.
-      # If both are set to 'default', the default provider chain is used
-      # (see http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html)
-      # If both are set to 'iam', use AWS IAM Roles to provision credentials.
-      # If both are set to 'env', use environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
-      aws {
-        accessKey = iam
-        accessKey = ${?COLLECTOR_STREAMS_SINK_AWS_ACCESS_KEY}
-        secretKey = iam
-        secretKey = ${?COLLECTOR_STREAMS_SINK_AWS_SECRET_KEY}
-      }
-
       # Optional
       backoffPolicy {
         # Minimum backoff period in milliseconds
diff --git a/sqs/src/main/resources/application.conf b/sqs/src/main/resources/application.conf
index 0c6651bd5..a862f2b43 100644
--- a/sqs/src/main/resources/application.conf
+++ b/sqs/src/main/resources/application.conf
@@ -4,11 +4,6 @@ collector {
     enabled = sqs
     threadPoolSize = 10

-    aws {
-      accessKey = iam
-      secretKey = iam
-    }
-
     backoffPolicy {
       minBackoff = 500
       maxBackoff = 1500
@@ -27,25 +22,3 @@ collector {
     }
   }
 }
-
-akka {
-  loglevel = WARNING
-  loggers = ["akka.event.slf4j.Slf4jLogger"]
-
-  http.server {
-    remote-address-header = on
-    raw-request-uri-header = on
-
-    parsing {
-      max-uri-length = 32768
-      uri-parsing-mode = relaxed
-      illegal-header-warnings = off
-    }
-
-    max-connections = 2048
-  }
-
-  coordinated-shutdown {
-    run-by-jvm-shutdown-hook = off
-  }
-}
diff --git a/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/SqsCollector.scala b/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/SqsCollector.scala
index 53c964c40..806fc93e6 100644
--- a/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/SqsCollector.scala
+++ b/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/SqsCollector.scala
@@ -15,48 +15,32 @@ package com.snowplowanalytics.snowplow.collectors.scalastream

 import java.util.concurrent.ScheduledThreadPoolExecutor

-import cats.syntax.either._
-import com.snowplowanalytics.snowplow.collectors.scalastream.generated.BuildInfo
-import com.snowplowanalytics.snowplow.collectors.scalastream.model._
-import com.snowplowanalytics.snowplow.collectors.scalastream.sinks.SqsSink
-import com.snowplowanalytics.snowplow.collectors.scalastream.telemetry.TelemetryAkkaService
-object SqsCollector extends Collector {
-  def appName      = BuildInfo.shortName
-  def appVersion   = BuildInfo.version
-  def scalaVersion = BuildInfo.scalaVersion
+import cats.effect.{IO, Resource}

-  def main(args: Array[String]): Unit = {
-    val (collectorConf, akkaConf) = parseConfig(args)
-    val telemetry                 = TelemetryAkkaService.initWithCollector(collectorConf, BuildInfo.moduleName, appVersion)
-    val sinks: Either[Throwable, CollectorSinks] = for {
-      sqs <- collectorConf.streams.sink match {
-        case sqs: Sqs => sqs.asRight
-        case sink     => new IllegalArgumentException(s"Configured sink $sink is not SQS.").asLeft
-      }
-      es         = new ScheduledThreadPoolExecutor(sqs.threadPoolSize)
-      goodQueue  = collectorConf.streams.good
-      badQueue   = collectorConf.streams.bad
-      bufferConf = collectorConf.streams.buffer
-      good <- SqsSink.createAndInitialize(
-        sqs.maxBytes,
-        sqs,
-        bufferConf,
-        goodQueue,
-        es
+import com.snowplowanalytics.snowplow.collector.core.model.Sinks
+import com.snowplowanalytics.snowplow.collector.core.{App, Config}
+import com.snowplowanalytics.snowplow.collectors.scalastream.sinks._
+
+object SqsCollector extends App[SqsSinkConfig](BuildInfo) {
+
+  override def mkSinks(config: Config.Streams[SqsSinkConfig]): Resource[IO, Sinks[IO]] = {
+    val threadPoolExecutor = new ScheduledThreadPoolExecutor(config.sink.threadPoolSize)
+    for {
+      good <- SqsSink.create[IO](
+        config.sink.maxBytes,
+        config.sink,
+        config.buffer,
+        config.good,
+        threadPoolExecutor
       )
-      bad <- SqsSink.createAndInitialize(
-        sqs.maxBytes,
-        sqs,
-        bufferConf,
-        badQueue,
-        es
+      bad <- SqsSink.create[IO](
+        config.sink.maxBytes,
+        config.sink,
+        config.buffer,
+        config.bad,
+        threadPoolExecutor
       )
-    } yield CollectorSinks(good, bad)
-
-    sinks match {
-      case Right(s) => run(collectorConf, akkaConf, s, telemetry)
-      case Left(e)  => throw e
-    }
+    } yield Sinks(good, bad)
   }
 }
diff --git a/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/SqsSink.scala b/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/SqsSink.scala
index b3e388ad8..6f40e5eb2 100644
--- a/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/SqsSink.scala
+++ b/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/SqsSink.scala
@@ -12,6 +12,11 @@
  */
 package com.snowplowanalytics.snowplow.collectors.scalastream.sinks

+import cats.effect.{Resource, Sync}
+import cats.implicits.catsSyntaxMonadErrorRethrow
+
+import org.slf4j.LoggerFactory
+
 import java.nio.ByteBuffer
 import java.util.UUID
 import java.util.concurrent.ScheduledExecutorService
@@ -24,29 +29,23 @@ import scala.collection.JavaConverters._

 import cats.syntax.either._

-import com.amazonaws.auth.{
-  AWSCredentialsProvider,
-  AWSStaticCredentialsProvider,
-  BasicAWSCredentials,
-  DefaultAWSCredentialsProviderChain,
-  EnvironmentVariableCredentialsProvider,
-  InstanceProfileCredentialsProvider
-}
 import com.amazonaws.services.sqs.{AmazonSQS, AmazonSQSClientBuilder}
 import com.amazonaws.services.sqs.model.{MessageAttributeValue, SendMessageBatchRequest, SendMessageBatchRequestEntry}

-import com.snowplowanalytics.snowplow.collectors.scalastream.model._
+import com.snowplowanalytics.snowplow.collector.core.{Config, Sink}

-class SqsSink private (
+class SqsSink[F[_]: Sync] private (
   val maxBytes: Int,
   client: AmazonSQS,
-  sqsConfig: Sqs,
-  bufferConfig: BufferConfig,
+  sqsConfig: SqsSinkConfig,
+  bufferConfig: Config.Buffer,
   queueName: String,
   executorService: ScheduledExecutorService
-) extends Sink {
+) extends Sink[F] {
   import SqsSink._

+  private lazy val log = LoggerFactory.getLogger(getClass())
+
   private val ByteThreshold: Long = bufferConfig.byteLimit
   private val RecordThreshold: Long = bufferConfig.recordLimit
   private val TimeThreshold: Long = bufferConfig.timeLimit
@@ -62,10 +61,10 @@ class SqsSink private (
     concurrent.ExecutionContext.fromExecutorService(executorService)

   @volatile private var sqsHealthy: Boolean = false
-  override def isHealthy: Boolean = sqsHealthy
+  override def isHealthy: F[Boolean] = Sync[F].pure(sqsHealthy)

-  override def storeRawEvents(events: List[Array[Byte]], key: String): Unit =
-    events.foreach(e => EventStorage.store(e, key))
+  override def storeRawEvents(events: List[Array[Byte]], key: String): F[Unit] =
+    Sync[F].delay(events.foreach(e => EventStorage.store(e, key)))

   object EventStorage {
     private val storedEvents = ListBuffer.empty[Events]
@@ -281,22 +280,39 @@ object SqsSink {
   // Details about why messages failed to be written to SQS.
   final case class BatchResultErrorInfo(code: String, message: String)

+  def create[F[_]: Sync](
+    maxBytes: Int,
+    sqsConfig: SqsSinkConfig,
+    bufferConfig: Config.Buffer,
+    queueName: String,
+    executorService: ScheduledExecutorService
+  ): Resource[F, SqsSink[F]] = {
+    val acquire =
+      Sync[F]
+        .delay(
+          createAndInitialize(maxBytes, sqsConfig, bufferConfig, queueName, executorService)
+        )
+        .rethrow
+    val release = (sink: SqsSink[F]) => Sync[F].delay(sink.shutdown())
+
+    Resource.make(acquire)(release)
+  }
+
   /**
     * Create an SqsSink and schedule a task to flush its EventStorage.
     * Exists so that no threads can get a reference to the SqsSink
     * during its construction.
     */
-  def createAndInitialize(
+  def createAndInitialize[F[_]: Sync](
     maxBytes: Int,
-    sqsConfig: Sqs,
-    bufferConfig: BufferConfig,
+    sqsConfig: SqsSinkConfig,
+    bufferConfig: Config.Buffer,
     queueName: String,
     executorService: ScheduledExecutorService
-  ): Either[Throwable, SqsSink] = {
-    val client = for {
-      provider <- getProvider(sqsConfig.aws)
-      client   <- createSqsClient(provider, sqsConfig.region)
-    } yield client
+  ): Either[Throwable, SqsSink[F]] = {
+    val client = Either.catchNonFatal(
+      AmazonSQSClientBuilder.standard().withRegion(sqsConfig.region).build
+    )

     client.map { c =>
       val sqsSink = new SqsSink(maxBytes, c, sqsConfig, bufferConfig, queueName, executorService)
@@ -305,35 +321,4 @@ object SqsSink {
       sqsSink
     }
   }
-
-  /** Create an aws credentials provider through env variables and iam. */
-  private def getProvider(awsConfig: AWSConfig): Either[Throwable, AWSCredentialsProvider] = {
-    def isDefault(key: String): Boolean = key == "default"
-    def isIam(key: String): Boolean = key == "iam"
-    def isEnv(key: String): Boolean = key == "env"
-
-    ((awsConfig.accessKey, awsConfig.secretKey) match {
-      case (a, s) if isDefault(a) && isDefault(s) =>
-        new DefaultAWSCredentialsProviderChain().asRight
-      case (a, s) if isDefault(a) || isDefault(s) =>
-        "accessKey and secretKey must both be set to 'default' or neither".asLeft
-      case (a, s) if isIam(a) && isIam(s) =>
-        InstanceProfileCredentialsProvider.getInstance().asRight
-      case (a, s) if isIam(a) || isIam(s) =>
-        "accessKey and secretKey must both be set to 'iam' or neither".asLeft
-      case (a, s) if isEnv(a) && isEnv(s) =>
-        new EnvironmentVariableCredentialsProvider().asRight
-      case (a, s) if isEnv(a) || isEnv(s) =>
-        "accessKey and secretKey must both be set to 'env' or neither".asLeft
-      case _ =>
-        new AWSStaticCredentialsProvider(
-          new BasicAWSCredentials(awsConfig.accessKey, awsConfig.secretKey)
-        ).asRight
-    }).leftMap(new IllegalArgumentException(_))
-  }
-
-  private def createSqsClient(provider: AWSCredentialsProvider, region: String): Either[Throwable, AmazonSQS] =
-    Either.catchNonFatal(
-      AmazonSQSClientBuilder.standard().withRegion(region).withCredentials(provider).build
-    )
 }
diff --git a/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/SqsSinkConfig.scala b/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/SqsSinkConfig.scala
new file mode 100644
index 000000000..7db8b879f
--- /dev/null
+++ b/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/SqsSinkConfig.scala
@@ -0,0 +1,22 @@
+package com.snowplowanalytics.snowplow.collectors.scalastream.sinks
+
+import io.circe.Decoder
+import io.circe.generic.semiauto._
+
+import com.snowplowanalytics.snowplow.collector.core.Config
+
+final case class SqsSinkConfig(
+  maxBytes: Int,
+  region: String,
+  backoffPolicy: SqsSinkConfig.BackoffPolicyConfig,
+  threadPoolSize: Int
+) extends Config.Sink
+
+object SqsSinkConfig {
+  final case class AWSConfig(accessKey: String, secretKey: String)
+
+  final case class BackoffPolicyConfig(minBackoff: Long, maxBackoff: Long, maxRetries: Int)
+
+  implicit val configDecoder: Decoder[SqsSinkConfig] = deriveDecoder[SqsSinkConfig]
+  implicit val backoffPolicyDecoder: Decoder[BackoffPolicyConfig] = deriveDecoder[BackoffPolicyConfig]
+}
diff --git a/sqs/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/SqsConfigSpec.scala b/sqs/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/SqsConfigSpec.scala
index 690c63d44..b178185fa 100644
--- a/sqs/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/SqsConfigSpec.scala
+++ b/sqs/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/SqsConfigSpec.scala
@@ -18,8 +18,114 @@
  */
 package com.snowplowanalytics.snowplow.collectors.scalastream

-import com.snowplowanalytics.snowplow.collectors.scalastream.config.ConfigSpec
+import cats.effect.testing.specs2.CatsEffect
+import cats.effect.{ExitCode, IO}
+import com.snowplowanalytics.snowplow.collector.core.{Config, ConfigParser}
+import com.snowplowanalytics.snowplow.collectors.scalastream.sinks.SqsSinkConfig
+import org.http4s.SameSite
+import org.specs2.mutable.Specification
+
+import java.nio.file.Paths
+import scala.concurrent.duration.DurationInt
+
+class SqsConfigSpec extends Specification with CatsEffect {
+
+  "Config parser" should {
+    "be able to parse extended sqs config" in {
+      assert(
+        resource       = "/config.sqs.extended.hocon",
+        expectedResult = Right(SqsConfigSpec.expectedConfig)
+      )
+    }
+    "be able to parse minimal sqs config" in {
+      assert(
+        resource       = "/config.sqs.minimal.hocon",
+        expectedResult = Right(SqsConfigSpec.expectedConfig)
+      )
+    }
+  }
+
+  private def assert(resource: String, expectedResult: Either[ExitCode, Config[SqsSinkConfig]]) = {
+    val path = Paths.get(getClass.getResource(resource).toURI)
+    ConfigParser.fromPath[IO, SqsSinkConfig](Some(path)).value.map { result =>
+      result must beEqualTo(expectedResult)
+    }
+  }
+}
+
+object SqsConfigSpec {
+
+  private val expectedConfig = Config[SqsSinkConfig](
+    interface = "0.0.0.0",
+    port = 8080,
+    paths = Map.empty[String, String],
+    p3p = Config.P3P(
+      policyRef = "/w3c/p3p.xml",
+      CP = "NOI DSP COR NID PSA OUR IND COM NAV STA"
+    ),
+    crossDomain = Config.CrossDomain(
+      enabled = false,
+      domains = List("*"),
+      secure = true
+    ),
+    cookie = Config.Cookie(
+      enabled = true,
+      expiration = 365.days,
+      name = "sp",
+      domains = List.empty,
+      fallbackDomain = None,
+      secure = true,
+      httpOnly = true,
+      sameSite = Some(SameSite.None)
+    ),
+    doNotTrackCookie = Config.DoNotTrackCookie(
+      enabled = false,
+      name = "",
+      value = ""
+    ),
+    cookieBounce = Config.CookieBounce(
+      enabled = false,
+      name = "n3pc",
+      fallbackNetworkUserId = "00000000-0000-4000-A000-000000000000",
+      forwardedProtocolHeader = None
+    ),
+    redirectMacro = Config.RedirectMacro(
+      enabled = false,
+      placeholder = None
+    ),
+    rootResponse = Config.RootResponse(
+      enabled = false,
+      statusCode = 302,
+      headers = Map.empty[String, String],
+      body = ""
+    ),
+    cors = Config.CORS(1.hour),
+    monitoring =
+      Config.Monitoring(Config.Metrics(Config.Statsd(false, "localhost", 8125, 10.seconds, "snowplow.collector"))),
+    ssl = Config.SSL(enable = false, redirect = false, port = 443),
+    enableDefaultRedirect = false,
+    redirectDomains = Set.empty,
+    preTerminationPeriod = 10.seconds,
+    streams = Config.Streams(
+      good = "good",
+      bad = "bad",
+      useIpAddressAsPartitionKey = false,
+      buffer = Config.Buffer(
+        byteLimit = 3145728,
+        recordLimit = 500,
+        timeLimit = 5000
+      ),
+      sink = SqsSinkConfig(
+        maxBytes = 192000,
+        region = "eu-central-1",
+        backoffPolicy = SqsSinkConfig.BackoffPolicyConfig(
+          minBackoff = 500,
+          maxBackoff = 1500,
+          maxRetries = 3
+        ),
+        threadPoolSize = 10
+      )
+    )
+  )
-
-class SqsConfigSpec extends ConfigSpec {
-  makeConfigTest("sqs", "", "")
 }
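For reviewers, here is a minimal, self-contained sketch of the acquire/release pattern that `SqsSink.create` introduces above (assuming cats-effect 3; `FakeSink` and the printed messages are illustrative stand-ins, not the real sink API). The `Either`-returning constructor is lifted into the effect and rethrown, so a `Left` surfaces as a raised error before the resource is acquired, and `shutdown()` is guaranteed to run when the `Resource` scope closes:

```scala
import cats.effect.{IO, IOApp, Resource}
import cats.syntax.all._

object SqsSinkResourceSketch extends IOApp.Simple {

  // Stand-in for SqsSink: constructed via an Either (as createAndInitialize
  // returns), with a shutdown() that must run on release.
  final class FakeSink {
    def shutdown(): Unit = println("sink flushed and shut down")
  }

  private def create: Resource[IO, FakeSink] = {
    // Mirrors Sync[F].delay(createAndInitialize(...)).rethrow in the diff.
    val acquire: IO[FakeSink] =
      IO.delay((new FakeSink).asRight[Throwable]).rethrow
    val release: FakeSink => IO[Unit] =
      sink => IO.delay(sink.shutdown())
    Resource.make(acquire)(release)
  }

  def run: IO[Unit] =
    create.use(_ => IO.println("sink in use")) // shutdown() runs after use
}
```

Because `mkSinks` builds the good and bad sinks inside a single `for` comprehension over `Resource`, they are released in reverse order of acquisition when the collector shuts down, while sharing the one `ScheduledThreadPoolExecutor`.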