Commit b8f8ff8: Wrap sqs sink with effects

spenes committed Sep 8, 2023 (1 parent: 7717492)

Showing 7 changed files with 199 additions and 140 deletions.
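What "wrap with effects" means in practice: the sink's side-effecting constructor, which returns an Either, is suspended in an effect and managed as a cats-effect Resource, so construction failures are raised in F and shutdown() is guaranteed to run on release. A minimal self-contained sketch of that shape (NoisySink and Demo are illustrative stand-ins, not collector code):

```scala
import cats.effect.{IO, Resource, Sync}
import cats.implicits._

// Stand-in for a sink whose constructor can fail and which must be shut down.
final class NoisySink private (val name: String) {
  def shutdown(): Unit = println(s"$name: shutting down")
}

object NoisySink {
  // Legacy shape: a synchronous constructor returning Either.
  def createAndInitialize(name: String): Either[Throwable, NoisySink] =
    Either.catchNonFatal(new NoisySink(name))

  // Effectful wrapper: suspend the side effect, rethrow the Either into F,
  // and tie shutdown() to the Resource's release action.
  def create[F[_]: Sync](name: String): Resource[F, NoisySink] =
    Resource.make(
      Sync[F].delay(createAndInitialize(name)).rethrow
    )(sink => Sync[F].delay(sink.shutdown()))
}

object Demo extends cats.effect.IOApp.Simple {
  def run: IO[Unit] =
    NoisySink.create[IO]("good").use(sink => IO.println(s"using ${sink.name}"))
}
```

SqsSink.create in the diff below follows exactly this pattern via Resource.make(acquire)(release).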
build.sbt (8 changes: 5 additions & 3 deletions)

@@ -188,10 +188,12 @@ lazy val kinesisDistroless = project
   .configs(IntegrationTest)
 
 lazy val sqsSettings =
-  allSettings ++ buildInfoSettings ++ Seq(
+  allSettings ++ buildInfoSettings ++ http4sBuildInfoSettings ++ Seq(
     moduleName := "snowplow-stream-collector-sqs",
     buildInfoPackage := s"com.snowplowanalytics.snowplow.collectors.scalastream",
     Docker / packageName := "scala-stream-collector-sqs",
     libraryDependencies ++= Seq(
+      Dependencies.Libraries.catsRetry,
       Dependencies.Libraries.sqs,
       Dependencies.Libraries.sts,
     )
@@ -200,14 +202,14 @@ lazy val sqsSettings =
 lazy val sqs = project
   .settings(sqsSettings)
   .enablePlugins(JavaAppPackaging, SnowplowDockerPlugin, BuildInfoPlugin)
-  .dependsOn(core % "test->test;compile->compile")
+  .dependsOn(http4s % "test->test;compile->compile")
 
 lazy val sqsDistroless = project
   .in(file("distroless/sqs"))
   .settings(sourceDirectory := (sqs / sourceDirectory).value)
   .settings(sqsSettings)
   .enablePlugins(JavaAppPackaging, SnowplowDistrolessDockerPlugin, BuildInfoPlugin)
-  .dependsOn(core % "test->test;compile->compile")
+  .dependsOn(http4s % "test->test;compile->compile")
 
 lazy val pubsubSettings =
   allSettings ++ buildInfoSettings ++ http4sBuildInfoSettings ++ Defaults.itSettings ++ scalifiedSettings ++ Seq(
examples/config.sqs.extended.hocon (13 changes: 0 additions & 13 deletions)

@@ -189,19 +189,6 @@ collector {
     # Thread pool size for Kinesis and SQS API requests
     threadPoolSize = 10
 
-
-    # The following are used to authenticate for the Amazon Kinesis and SQS sinks.
-    # If both are set to 'default', the default provider chain is used
-    # (see http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html)
-    # If both are set to 'iam', use AWS IAM Roles to provision credentials.
-    # If both are set to 'env', use environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
-    aws {
-      accessKey = iam
-      accessKey = ${?COLLECTOR_STREAMS_SINK_AWS_ACCESS_KEY}
-      secretKey = iam
-      secretKey = ${?COLLECTOR_STREAMS_SINK_AWS_SECRET_KEY}
-    }
-
     # Optional
     backoffPolicy {
       # Minimum backoff period in milliseconds
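With the aws block removed, the sink no longer selects a credentials provider from config: the client is now built with AmazonSQSClientBuilder.standard() (see SqsSink.scala below), which resolves credentials through the AWS SDK's default provider chain (environment variables, system properties, profile files, then instance profile).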
sqs/src/main/resources/application.conf (27 changes: 0 additions & 27 deletions)

@@ -4,11 +4,6 @@ collector {
   enabled = sqs
   threadPoolSize = 10
 
-  aws {
-    accessKey = iam
-    secretKey = iam
-  }
-
   backoffPolicy {
     minBackoff = 500
     maxBackoff = 1500
@@ -27,25 +22,3 @@
     }
   }
 }
-
-akka {
-  loglevel = WARNING
-  loggers = ["akka.event.slf4j.Slf4jLogger"]
-
-  http.server {
-    remote-address-header = on
-    raw-request-uri-header = on
-
-    parsing {
-      max-uri-length = 32768
-      uri-parsing-mode = relaxed
-      illegal-header-warnings = off
-    }
-
-    max-connections = 2048
-  }
-
-  coordinated-shutdown {
-    run-by-jvm-shutdown-hook = off
-  }
-}
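The akka block goes away with the runtime itself: the sqs module now depends on the shared http4s core (see the build.sbt change above), so the Akka HTTP server settings no longer apply.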
SqsCollector.scala

@@ -15,48 +15,32 @@
 package com.snowplowanalytics.snowplow.collectors.scalastream
 
 import java.util.concurrent.ScheduledThreadPoolExecutor
-import cats.syntax.either._
-import com.snowplowanalytics.snowplow.collectors.scalastream.generated.BuildInfo
-import com.snowplowanalytics.snowplow.collectors.scalastream.model._
-import com.snowplowanalytics.snowplow.collectors.scalastream.sinks.SqsSink
-import com.snowplowanalytics.snowplow.collectors.scalastream.telemetry.TelemetryAkkaService
+
+import cats.effect.{IO, Resource}
+
+import com.snowplowanalytics.snowplow.collector.core.model.Sinks
+import com.snowplowanalytics.snowplow.collector.core.{App, Config}
+import com.snowplowanalytics.snowplow.collectors.scalastream.generated.BuildInfo
+import com.snowplowanalytics.snowplow.collectors.scalastream.sinks._
 
-object SqsCollector extends Collector {
-  def appName = BuildInfo.shortName
-  def appVersion = BuildInfo.version
-  def scalaVersion = BuildInfo.scalaVersion
+object SqsCollector extends App[SqsSinkConfig](BuildInfo) {
 
-  def main(args: Array[String]): Unit = {
-    val (collectorConf, akkaConf) = parseConfig(args)
-    val telemetry = TelemetryAkkaService.initWithCollector(collectorConf, BuildInfo.moduleName, appVersion)
-    val sinks: Either[Throwable, CollectorSinks] = for {
-      sqs <- collectorConf.streams.sink match {
-        case sqs: Sqs => sqs.asRight
-        case sink => new IllegalArgumentException(s"Configured sink $sink is not SQS.").asLeft
-      }
-      es = new ScheduledThreadPoolExecutor(sqs.threadPoolSize)
-      goodQueue = collectorConf.streams.good
-      badQueue = collectorConf.streams.bad
-      bufferConf = collectorConf.streams.buffer
-      good <- SqsSink.createAndInitialize(
-        sqs.maxBytes,
-        sqs,
-        bufferConf,
-        goodQueue,
-        es
+  override def mkSinks(config: Config.Streams[SqsSinkConfig]): Resource[IO, Sinks[IO]] = {
+    val threadPoolExecutor = new ScheduledThreadPoolExecutor(config.sink.threadPoolSize)
+    for {
+      good <- SqsSink.create[IO](
+        config.sink.maxBytes,
+        config.sink,
+        config.buffer,
+        config.good,
+        threadPoolExecutor
       )
-      bad <- SqsSink.createAndInitialize(
-        sqs.maxBytes,
-        sqs,
-        bufferConf,
-        badQueue,
-        es
+      bad <- SqsSink.create[IO](
+        config.sink.maxBytes,
+        config.sink,
+        config.buffer,
+        config.bad,
+        threadPoolExecutor
       )
-    } yield CollectorSinks(good, bad)
-
-    sinks match {
-      case Right(s) => run(collectorConf, akkaConf, s, telemetry)
-      case Left(e) => throw e
-    }
+    } yield Sinks(good, bad)
   }
 }
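mkSinks composes the two sinks in a for-comprehension over Resource: both are acquired in order and released in reverse order when the collector shuts down. A small illustration of that release ordering (tracked and ReleaseOrder are hypothetical, not collector code):

```scala
import cats.effect.{IO, Resource}

object ReleaseOrder extends cats.effect.IOApp.Simple {
  // Wrap a named value in a Resource that logs acquisition and release.
  def tracked(name: String): Resource[IO, String] =
    Resource.make(IO.println(s"acquire $name").as(name))(n => IO.println(s"release $n"))

  def run: IO[Unit] =
    (for {
      good <- tracked("good")
      bad  <- tracked("bad")
    } yield (good, bad)).use {
      case (g, b) => IO.println(s"serving with $g and $b")
    }
  // Output: acquire good, acquire bad, serving with good and bad,
  //         release bad, release good
}
```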
SqsSink.scala

@@ -12,6 +12,11 @@
  */
 package com.snowplowanalytics.snowplow.collectors.scalastream.sinks
 
+import cats.effect.{Resource, Sync}
+import cats.implicits.catsSyntaxMonadErrorRethrow
+
+import org.slf4j.LoggerFactory
+
 import java.nio.ByteBuffer
 import java.util.UUID
 import java.util.concurrent.ScheduledExecutorService
@@ -24,29 +29,23 @@ import scala.collection.JavaConverters._
 
 import cats.syntax.either._
 
-import com.amazonaws.auth.{
-  AWSCredentialsProvider,
-  AWSStaticCredentialsProvider,
-  BasicAWSCredentials,
-  DefaultAWSCredentialsProviderChain,
-  EnvironmentVariableCredentialsProvider,
-  InstanceProfileCredentialsProvider
-}
 import com.amazonaws.services.sqs.{AmazonSQS, AmazonSQSClientBuilder}
 import com.amazonaws.services.sqs.model.{MessageAttributeValue, SendMessageBatchRequest, SendMessageBatchRequestEntry}
 
-import com.snowplowanalytics.snowplow.collectors.scalastream.model._
+import com.snowplowanalytics.snowplow.collector.core.{Config, Sink}
 
-class SqsSink private (
+class SqsSink[F[_]: Sync] private (
   val maxBytes: Int,
   client: AmazonSQS,
-  sqsConfig: Sqs,
-  bufferConfig: BufferConfig,
+  sqsConfig: SqsSinkConfig,
+  bufferConfig: Config.Buffer,
   queueName: String,
   executorService: ScheduledExecutorService
-) extends Sink {
+) extends Sink[F] {
   import SqsSink._
 
+  private lazy val log = LoggerFactory.getLogger(getClass())
+
   private val ByteThreshold: Long = bufferConfig.byteLimit
   private val RecordThreshold: Long = bufferConfig.recordLimit
   private val TimeThreshold: Long = bufferConfig.timeLimit
@@ -62,10 +61,10 @@ class SqsSink private (
     concurrent.ExecutionContext.fromExecutorService(executorService)
 
   @volatile private var sqsHealthy: Boolean = false
-  override def isHealthy: Boolean = sqsHealthy
+  override def isHealthy: F[Boolean] = Sync[F].pure(sqsHealthy)
 
-  override def storeRawEvents(events: List[Array[Byte]], key: String): Unit =
-    events.foreach(e => EventStorage.store(e, key))
+  override def storeRawEvents(events: List[Array[Byte]], key: String): F[Unit] =
+    Sync[F].delay(events.foreach(e => EventStorage.store(e, key)))
 
   object EventStorage {
     private val storedEvents = ListBuffer.empty[Events]
@@ -281,22 +280,39 @@ object SqsSink {
   // Details about why messages failed to be written to SQS.
   final case class BatchResultErrorInfo(code: String, message: String)
 
+  def create[F[_]: Sync](
+    maxBytes: Int,
+    sqsConfig: SqsSinkConfig,
+    bufferConfig: Config.Buffer,
+    queueName: String,
+    executorService: ScheduledExecutorService
+  ): Resource[F, SqsSink[F]] = {
+    val acquire =
+      Sync[F]
+        .delay(
+          createAndInitialize(maxBytes, sqsConfig, bufferConfig, queueName, executorService)
+        )
+        .rethrow
+    val release = (sink: SqsSink[F]) => Sync[F].delay(sink.shutdown())
+
+    Resource.make(acquire)(release)
+  }
+
   /**
    * Create an SqsSink and schedule a task to flush its EventStorage.
    * Exists so that no threads can get a reference to the SqsSink
    * during its construction.
    */
-  def createAndInitialize(
+  def createAndInitialize[F[_]: Sync](
     maxBytes: Int,
-    sqsConfig: Sqs,
-    bufferConfig: BufferConfig,
+    sqsConfig: SqsSinkConfig,
+    bufferConfig: Config.Buffer,
     queueName: String,
     executorService: ScheduledExecutorService
-  ): Either[Throwable, SqsSink] = {
-    val client = for {
-      provider <- getProvider(sqsConfig.aws)
-      client <- createSqsClient(provider, sqsConfig.region)
-    } yield client
+  ): Either[Throwable, SqsSink[F]] = {
+    val client = Either.catchNonFatal(
+      AmazonSQSClientBuilder.standard().withRegion(sqsConfig.region).build
+    )
 
     client.map { c =>
       val sqsSink = new SqsSink(maxBytes, c, sqsConfig, bufferConfig, queueName, executorService)
@@ -305,35 +321,4 @@
       sqsSink
     }
   }
-
-  /** Create an aws credentials provider through env variables and iam. */
-  private def getProvider(awsConfig: AWSConfig): Either[Throwable, AWSCredentialsProvider] = {
-    def isDefault(key: String): Boolean = key == "default"
-    def isIam(key: String): Boolean = key == "iam"
-    def isEnv(key: String): Boolean = key == "env"
-
-    ((awsConfig.accessKey, awsConfig.secretKey) match {
-      case (a, s) if isDefault(a) && isDefault(s) =>
-        new DefaultAWSCredentialsProviderChain().asRight
-      case (a, s) if isDefault(a) || isDefault(s) =>
-        "accessKey and secretKey must both be set to 'default' or neither".asLeft
-      case (a, s) if isIam(a) && isIam(s) =>
-        InstanceProfileCredentialsProvider.getInstance().asRight
-      case (a, s) if isIam(a) || isIam(s) =>
-        "accessKey and secretKey must both be set to 'iam' or neither".asLeft
-      case (a, s) if isEnv(a) && isEnv(s) =>
-        new EnvironmentVariableCredentialsProvider().asRight
-      case (a, s) if isEnv(a) || isEnv(s) =>
-        "accessKey and secretKey must both be set to 'env' or neither".asLeft
-      case _ =>
-        new AWSStaticCredentialsProvider(
-          new BasicAWSCredentials(awsConfig.accessKey, awsConfig.secretKey)
-        ).asRight
-    }).leftMap(new IllegalArgumentException(_))
-  }
-
-  private def createSqsClient(provider: AWSCredentialsProvider, region: String): Either[Throwable, AmazonSQS] =
-    Either.catchNonFatal(
-      AmazonSQSClientBuilder.standard().withRegion(region).withCredentials(provider).build
-    )
 }
SqsSinkConfig.scala (new file)

@@ -0,0 +1,22 @@
+package com.snowplowanalytics.snowplow.collectors.scalastream.sinks
+
+import io.circe.Decoder
+import io.circe.generic.semiauto._
+
+import com.snowplowanalytics.snowplow.collector.core.Config
+
+final case class SqsSinkConfig(
+  maxBytes: Int,
+  region: String,
+  backoffPolicy: SqsSinkConfig.BackoffPolicyConfig,
+  threadPoolSize: Int
+) extends Config.Sink
+
+object SqsSinkConfig {
+  final case class AWSConfig(accessKey: String, secretKey: String)
+
+  final case class BackoffPolicyConfig(minBackoff: Long, maxBackoff: Long, maxRetries: Int)
+
+  implicit val configDecoder: Decoder[SqsSinkConfig] = deriveDecoder[SqsSinkConfig]
+  implicit val backoffPolicyDecoder: Decoder[BackoffPolicyConfig] = deriveDecoder[BackoffPolicyConfig]
+}
(1 of the 7 changed files is not shown here.)