diff --git a/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KinesisCollector.scala b/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KinesisCollector.scala index 8d368135c..e9f8de1fe 100644 --- a/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KinesisCollector.scala +++ b/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KinesisCollector.scala @@ -15,16 +15,16 @@ package com.snowplowanalytics.snowplow.collectors.scalastream import cats.effect.{IO, Resource} -import com.snowplowanalytics.snowplow.collector.core.{App, Config} import com.snowplowanalytics.snowplow.collector.core.model.Sinks +import com.snowplowanalytics.snowplow.collector.core.{App, Config} import com.snowplowanalytics.snowplow.collectors.scalastream.sinks.{KinesisSink, KinesisSinkConfig} object KinesisCollector extends App[KinesisSinkConfig](BuildInfo) { override def mkSinks(config: Config.Streams[KinesisSinkConfig]): Resource[IO, Sinks[IO]] = for { - good <- KinesisSink.create[IO](config.sink, config.good) - bad <- KinesisSink.create[IO](config.sink, config.bad) + good <- KinesisSink.create[IO](config.sink, config.buffer, config.good) + bad <- KinesisSink.create[IO](config.sink, config.buffer, config.bad) } yield Sinks(good, bad) } diff --git a/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisClient.scala b/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisClient.scala new file mode 100644 index 000000000..021697fe1 --- /dev/null +++ b/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisClient.scala @@ -0,0 +1,72 @@ +package com.snowplowanalytics.snowplow.collectors.scalastream.sinks + +import cats.Parallel +import cats.effect.{Async, Resource, Sync} +import cats.implicits._ +import com.amazonaws.auth._ +import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration +import com.amazonaws.services.kinesis.{AmazonKinesis, AmazonKinesisClientBuilder} + +object KinesisClient { + def create[F[_]: Async: Parallel]( + config: KinesisSinkConfig, + streamName: String + ): Resource[F, AmazonKinesis] = + Resource.eval[F, AmazonKinesis](mkProducer(config, streamName)) + + private def mkProducer[F[_]: Sync]( + config: KinesisSinkConfig, + streamName: String + ): F[AmazonKinesis] = + for { + kinesis <- Sync[F].delay(createKinesisClient(config)) + _ <- streamExists(kinesis, streamName) + } yield kinesis + + private def streamExists[F[_]: Sync](kinesis: AmazonKinesis, stream: String): F[Unit] = + for { + described <- Sync[F].delay(kinesis.describeStream(stream)) + status = described.getStreamDescription.getStreamStatus + _ <- status match { + case "ACTIVE" | "UPDATING" => + Sync[F].unit + case _ => + Sync[F].raiseError[Unit](new IllegalArgumentException(s"Stream $stream doesn't exist or can't be accessed")) + } + } yield () + + private def createKinesisClient(config: KinesisSinkConfig): AmazonKinesis = + AmazonKinesisClientBuilder + .standard() + .withCredentials(getProvider(config.aws)) + .withEndpointConfiguration(new EndpointConfiguration(config.endpoint, config.region)) + .build() + + private def getProvider(awsConfig: KinesisSinkConfig.AWSConfig): AWSCredentialsProvider = { + def isDefault(key: String): Boolean = key == "default" + + def isIam(key: String): Boolean = key == "iam" + + def isEnv(key: String): Boolean = key == "env" + + ((awsConfig.accessKey, awsConfig.secretKey) match { + case 
(a, s) if isDefault(a) && isDefault(s) =>
+        new DefaultAWSCredentialsProviderChain()
+      case (a, s) if isDefault(a) || isDefault(s) =>
+        throw new IllegalArgumentException("accessKey and secretKey must both be set to 'default' or neither")
+      case (a, s) if isIam(a) && isIam(s) =>
+        InstanceProfileCredentialsProvider.getInstance()
+      case (a, s) if isIam(a) || isIam(s) =>
+        throw new IllegalArgumentException("accessKey and secretKey must both be set to 'iam' or neither")
+      case (a, s) if isEnv(a) && isEnv(s) =>
+        new EnvironmentVariableCredentialsProvider()
+      case (a, s) if isEnv(a) || isEnv(s) =>
+        throw new IllegalArgumentException("accessKey and secretKey must both be set to 'env' or neither")
+      case _ =>
+        new AWSStaticCredentialsProvider(
+          new BasicAWSCredentials(awsConfig.accessKey, awsConfig.secretKey)
+        )
+    })
+  }
+
+}
diff --git a/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSink.scala b/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSink.scala
index 99cca99f5..a6e913e63 100644
--- a/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSink.scala
+++ b/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSink.scala
@@ -1,202 +1,53 @@
 package com.snowplowanalytics.snowplow.collectors.scalastream.sinks
 
-import cats.effect.implicits.genSpawnOps
-import cats.effect.{Async, Ref, Resource, Sync}
+import cats.Parallel
+import cats.effect.kernel.Outcome
+import cats.effect.std.Queue
+import cats.effect.{Async, Resource}
 import cats.implicits._
-import cats.{Monoid, Parallel}
-import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration
-import com.amazonaws.services.kinesis.model.{PutRecordsRequest, PutRecordsRequestEntry, PutRecordsResult}
-import com.amazonaws.services.kinesis.{AmazonKinesis, AmazonKinesisClientBuilder}
-import com.snowplowanalytics.snowplow.collector.core.Sink
-import org.typelevel.log4cats.Logger
-import org.typelevel.log4cats.slf4j.Slf4jLogger
-import retry.syntax.all._
-
-import java.nio.ByteBuffer
-import java.util.UUID
-import scala.collection.JavaConverters._
-
-class KinesisSink[F[_]: Async: Parallel: Logger] private (
-  override val maxBytes: Int,
-  config: KinesisSinkConfig,
-  kinesis: AmazonKinesis,
-  streamName: String
-) extends Sink[F] {
-  override def isHealthy: F[Boolean] = Async[F].pure(true) //TODO
-
-  override def storeRawEvents(events: List[Array[Byte]], key: String): F[Unit] =
-    writeToKinesis(toKinesisRecords(events)).start.void
-
-  private def writeToKinesis(batch: List[PutRecordsRequestEntry]): F[Unit] =
-    for {
-      forNextAttemptBuffer <- Ref.of(batch)
-      failures <- runAndCaptureFailures(forNextAttemptBuffer).retryingOnFailures(
-        policy = Retries.fibonacci[F](config.backoffPolicy),
-        wasSuccessful = failures => Async[F].pure(failures.isEmpty),
-        onFailure = {
-          case (result, retryDetails) =>
-            val msg = failureMessageForThrottling(result, streamName)
-            Logger[F].warn(s"$msg (${retryDetails.retriesSoFar} retries from cats-retry)")
-        }
-      )
-      _ <- if (failures.isEmpty) Sync[F].unit
-      else Sync[F].raiseError(new RuntimeException(failureMessageForThrottling(failures, streamName)))
-    } yield ()
-
-  private def runAndCaptureFailures(
-    forNextAttemptBuffer: Ref[F, List[PutRecordsRequestEntry]]
-  ): F[List[PutRecordsRequestEntry]] =
-    for {
-      batch <- forNextAttemptBuffer.get
-      failures <- tryWriteToKinesis(batch)
-      _ <- forNextAttemptBuffer.set(failures.toList)
-    } yield 
failures.toList - - private def tryWriteToKinesis( - records: List[PutRecordsRequestEntry] - ): F[Vector[PutRecordsRequestEntry]] = - Logger[F].debug(s"Writing ${records.size} records to $streamName") *> - Async[F] - .blocking(putRecords(records)) - .map(TryBatchResult.build(records, _)) - .retryingOnFailuresAndAllErrors( - policy = Retries.fullJitter[F](config.backoffPolicy), - wasSuccessful = r => Async[F].pure(!r.shouldRetrySameBatch), - onFailure = { - case (result, retryDetails) => - val msg = failureMessageForInternalErrors(records, streamName, result) - Logger[F].error(s"$msg (${retryDetails.retriesSoFar} retries from cats-retry)") - }, - onError = (exception, retryDetails) => - Logger[F].error(exception)( - s"Writing ${records.size} records to $streamName errored (${retryDetails.retriesSoFar} retries from cats-retry)" - ) - ) - .flatMap { result => - if (result.shouldRetrySameBatch) - Sync[F].raiseError(new RuntimeException(failureMessageForInternalErrors(records, streamName, result))) - else - result.nextBatchAttempt.pure[F] - } - - private def toKinesisRecords(records: List[Array[Byte]]): List[PutRecordsRequestEntry] = - records.map { r => - val data = ByteBuffer.wrap(r) - val prre = new PutRecordsRequestEntry() - prre.setPartitionKey(UUID.randomUUID().toString) - prre.setData(data) - prre - } - - private case class TryBatchResult( - nextBatchAttempt: Vector[PutRecordsRequestEntry], - hadSuccess: Boolean, - wasThrottled: Boolean, - exampleInternalError: Option[String] - ) { - // Only retry the exact same again if no record was successfully inserted, and all the errors - // were not throughput exceeded exceptions - def shouldRetrySameBatch: Boolean = - !hadSuccess && !wasThrottled - } - - private object TryBatchResult { - - implicit private def tryBatchResultMonoid: Monoid[TryBatchResult] = - new Monoid[TryBatchResult] { - override val empty: TryBatchResult = TryBatchResult(Vector.empty, false, false, None) - - override def combine(x: TryBatchResult, y: TryBatchResult): TryBatchResult = - TryBatchResult( - x.nextBatchAttempt ++ y.nextBatchAttempt, - x.hadSuccess || y.hadSuccess, - x.wasThrottled || y.wasThrottled, - x.exampleInternalError.orElse(y.exampleInternalError) - ) - } - - def build(records: List[PutRecordsRequestEntry], prr: PutRecordsResult): TryBatchResult = - if (prr.getFailedRecordCount.toInt =!= 0) - records.zip(prr.getRecords.asScala).foldMap { - case (orig, recordResult) => - Option(recordResult.getErrorCode) match { - case None => - TryBatchResult(Vector.empty, true, false, None) - case Some("ProvisionedThroughputExceededException") => - TryBatchResult(Vector(orig), false, true, None) - case Some(_) => - TryBatchResult(Vector(orig), false, false, Option(recordResult.getErrorMessage)) - } - } - else - TryBatchResult(Vector.empty, true, false, None) - } - - private def putRecords(records: List[PutRecordsRequestEntry]): PutRecordsResult = { - val putRecordsRequest = { - val prr = new PutRecordsRequest() - prr.setStreamName(streamName) - prr.setRecords(records.asJava) - prr - } - kinesis.putRecords(putRecordsRequest) - } - - private def failureMessageForInternalErrors( - records: List[PutRecordsRequestEntry], - streamName: String, - result: TryBatchResult - ): String = { - val exampleMessage = result.exampleInternalError.getOrElse("none") - s"Writing ${records.size} records to $streamName errored with internal failures. 
Example error message [$exampleMessage]" - } - - private def failureMessageForThrottling( - records: List[PutRecordsRequestEntry], - streamName: String - ): String = - s"Exceeded Kinesis provisioned throughput: ${records.size} records failed writing to $streamName." - -} +import com.snowplowanalytics.snowplow.collector.core.{Config, Sink} object KinesisSink { - implicit private def unsafeLogger[F[_]: Sync]: Logger[F] = - Slf4jLogger.getLogger[F] + type Event = Array[Byte] def create[F[_]: Async: Parallel]( config: KinesisSinkConfig, + buffer: Config.Buffer, streamName: String ): Resource[F, Sink[F]] = for { - producer <- Resource.eval[F, AmazonKinesis](mkProducer(config, streamName)) - } yield new KinesisSink[F](config.maxBytes, config, producer, streamName) + eventsBuffer <- Resource.eval(Queue.unbounded[F, Option[Event]]) + kinesisClient <- KinesisClient.create(config, streamName) + kinesisWriteOutcome <- WritingToKinesisTask.run[F](config, buffer, streamName, eventsBuffer, kinesisClient) + sink <- Resource.make(createSink(config, eventsBuffer))(stopSink(eventsBuffer, kinesisWriteOutcome)) + } yield sink - private def mkProducer[F[_]: Sync]( + private def createSink[F[_]: Async: Parallel]( config: KinesisSinkConfig, - streamName: String - ): F[AmazonKinesis] = - for { - builder <- Sync[F].delay(AmazonKinesisClientBuilder.standard) - withEndpoint <- config.customEndpoint match { - case Some(endpoint) => - Sync[F].delay(builder.withEndpointConfiguration(new EndpointConfiguration(endpoint, config.region))) - case None => - Sync[F].delay(builder.withRegion(config.region)) + eventsBuffer: Queue[F, Option[Event]] + ): F[Sink[F]] = + Async[F].pure { + new Sink[F] { + override def isHealthy: F[Boolean] = Async[F].pure(true) //TODO + + override def storeRawEvents(events: List[Event], key: String): F[Unit] = + events.parTraverse_ { event => + eventsBuffer.offer(Some(event)) + } + + override val maxBytes: Int = config.maxBytes } - kinesis <- Sync[F].delay(withEndpoint.build()) - _ <- streamExists(kinesis, streamName) - } yield kinesis + } - private def streamExists[F[_]: Sync](kinesis: AmazonKinesis, stream: String): F[Unit] = + private def stopSink[F[_]: Async]( + eventsBuffer: Queue[F, Option[Event]], + kinesisWriteOutcome: F[Outcome[F, Throwable, Unit]] + ): Sink[F] => F[Unit] = { _ => for { - described <- Sync[F].delay(kinesis.describeStream(stream)) - status = described.getStreamDescription.getStreamStatus - _ <- status match { - case "ACTIVE" | "UPDATING" => - Sync[F].unit - case _ => - Sync[F].raiseError[Unit](new IllegalArgumentException(s"Stream $stream doesn't exist or can't be accessed")) - } + _ <- eventsBuffer.offer(None) + _ <- kinesisWriteOutcome } yield () + + } } diff --git a/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/WritingToKinesisTask.scala b/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/WritingToKinesisTask.scala new file mode 100644 index 000000000..8a0bcbe86 --- /dev/null +++ b/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/WritingToKinesisTask.scala @@ -0,0 +1,232 @@ +package com.snowplowanalytics.snowplow.collectors.scalastream.sinks + +import cats.effect.implicits.genSpawnOps +import cats.effect.kernel.{Outcome, Resource} +import cats.effect.std.Queue +import cats.effect.{Async, Ref, Sync} +import cats.implicits._ +import cats.{Monoid, Parallel} +import com.amazonaws.services.kinesis.AmazonKinesis +import com.amazonaws.services.kinesis.model.{PutRecordsRequest, 
PutRecordsRequestEntry, PutRecordsResult} +import com.snowplowanalytics.snowplow.collector.core.Config +import com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KinesisSink.Event +import fs2.Chunk +import org.typelevel.log4cats.Logger +import org.typelevel.log4cats.slf4j.Slf4jLogger +import retry.syntax.all._ + +import java.nio.ByteBuffer +import java.util.UUID +import scala.collection.JavaConverters._ + +object WritingToKinesisTask { + + implicit private def unsafeLogger[F[_]: Sync]: Logger[F] = + Slf4jLogger.getLogger[F] + + def run[F[_]: Async: Parallel]( + config: KinesisSinkConfig, + bufferConfig: Config.Buffer, + streamName: String, + eventsBuffer: Queue[F, Option[Event]], + kinesis: AmazonKinesis + ): Resource[F, F[Outcome[F, Throwable, Unit]]] = + fs2 + .Stream + .fromQueueNoneTerminated(eventsBuffer) + .chunks + .parEvalMapUnbounded(events => writeToKinesis(events, config, streamName, kinesis, bufferConfig)) + .compile + .drain + .background + + // Copied from enrich! + private def writeToKinesis[F[_]: Async: Parallel]( + chunk: Chunk[Event], + config: KinesisSinkConfig, + streamName: String, + kinesis: AmazonKinesis, + bufferConfig: Config.Buffer + ): F[Unit] = + for { + forNextAttemptBuffer <- Ref.of(toKinesisRecords(chunk.toList)) + failures <- runAndCaptureFailures(forNextAttemptBuffer, config, kinesis, bufferConfig, streamName) + .retryingOnFailures( + policy = Retries.fibonacci[F](config.backoffPolicy), + wasSuccessful = failures => Async[F].pure(failures.isEmpty), + onFailure = { + case (result, retryDetails) => + val msg = failureMessageForThrottling(result, streamName) + Logger[F].warn(s"$msg (${retryDetails.retriesSoFar} retries from cats-retry)") + } + ) + _ <- if (failures.isEmpty) Sync[F].unit + else Sync[F].raiseError(new RuntimeException(failureMessageForThrottling(failures, streamName))) + } yield () + + private def runAndCaptureFailures[F[_]: Async: Parallel]( + ref: Ref[F, List[PutRecordsRequestEntry]], + config: KinesisSinkConfig, + kinesis: AmazonKinesis, + bufferConfig: Config.Buffer, + streamName: String + ): F[List[PutRecordsRequestEntry]] = + for { + records <- ref.get + failures <- group(records, bufferConfig.recordLimit, bufferConfig.byteLimit, getRecordSize).parTraverse(g => + tryWriteToKinesis(g, config, streamName, kinesis) + ) + flattened = failures.flatten + _ <- ref.set(flattened) + } yield flattened + + private def getRecordSize(record: PutRecordsRequestEntry) = + record.getData.array.length + record.getPartitionKey.getBytes.length + + private def tryWriteToKinesis[F[_]: Async: Parallel]( + records: List[PutRecordsRequestEntry], + config: KinesisSinkConfig, + streamName: String, + kinesis: AmazonKinesis + ): F[Vector[PutRecordsRequestEntry]] = + Logger[F].debug(s"Writing ${records.size} records to $streamName") *> + Async[F] + .blocking(putRecords(records, streamName, kinesis)) + .map(TryBatchResult.build(records, _)) + .retryingOnFailuresAndAllErrors( + policy = Retries.fullJitter[F](config.backoffPolicy), + wasSuccessful = r => Async[F].pure(!r.shouldRetrySameBatch), + onFailure = { + case (result, retryDetails) => + val msg = failureMessageForInternalErrors(records, streamName, result) + Logger[F].error(s"$msg (${retryDetails.retriesSoFar} retries from cats-retry)") + }, + onError = (exception, retryDetails) => + Logger[F].error(exception)( + s"Writing ${records.size} records to $streamName errored (${retryDetails.retriesSoFar} retries from cats-retry)" + ) + ) + .flatMap { result => + if (result.shouldRetrySameBatch) + 
Sync[F].raiseError(new RuntimeException(failureMessageForInternalErrors(records, streamName, result))) + else + result.nextBatchAttempt.pure[F] + } + + private def toKinesisRecords(records: List[Event]): List[PutRecordsRequestEntry] = + records.map { r => + val data = ByteBuffer.wrap(r) + val prre = new PutRecordsRequestEntry() + prre.setPartitionKey(UUID.randomUUID().toString) + prre.setData(data) + prre + } + + /** + * This function takes a list of records and splits it into several lists, + * where each list is as big as possible with respecting the record limit and + * the size limit. + */ + private def group[A]( + records: List[A], + recordLimit: Long, + sizeLimit: Long, + getRecordSize: A => Int + ): List[List[A]] = { + case class Batch( + size: Int, + count: Int, + records: List[A] + ) + + records + .foldLeft(List.empty[Batch]) { + case (acc, record) => + val recordSize = getRecordSize(record) + acc match { + case head :: tail => + if (head.count + 1 > recordLimit || head.size + recordSize > sizeLimit) + List(Batch(recordSize, 1, List(record))) ++ List(head) ++ tail + else + List(Batch(head.size + recordSize, head.count + 1, record :: head.records)) ++ tail + case Nil => + List(Batch(recordSize, 1, List(record))) + } + } + .map(_.records) + } + + private case class TryBatchResult( + nextBatchAttempt: Vector[PutRecordsRequestEntry], + hadSuccess: Boolean, + wasThrottled: Boolean, + exampleInternalError: Option[String] + ) { + // Only retry the exact same again if no record was successfully inserted, and all the errors + // were not throughput exceeded exceptions + def shouldRetrySameBatch: Boolean = + !hadSuccess && !wasThrottled + } + + private object TryBatchResult { + + implicit private def tryBatchResultMonoid: Monoid[TryBatchResult] = + new Monoid[TryBatchResult] { + override val empty: TryBatchResult = TryBatchResult(Vector.empty, false, false, None) + + override def combine(x: TryBatchResult, y: TryBatchResult): TryBatchResult = + TryBatchResult( + x.nextBatchAttempt ++ y.nextBatchAttempt, + x.hadSuccess || y.hadSuccess, + x.wasThrottled || y.wasThrottled, + x.exampleInternalError.orElse(y.exampleInternalError) + ) + } + + def build(records: List[PutRecordsRequestEntry], prr: PutRecordsResult): TryBatchResult = + if (prr.getFailedRecordCount.toInt =!= 0) + records.zip(prr.getRecords.asScala).foldMap { + case (orig, recordResult) => + Option(recordResult.getErrorCode) match { + case None => + TryBatchResult(Vector.empty, true, false, None) + case Some("ProvisionedThroughputExceededException") => + TryBatchResult(Vector(orig), false, true, None) + case Some(_) => + TryBatchResult(Vector(orig), false, false, Option(recordResult.getErrorMessage)) + } + } + else + TryBatchResult(Vector.empty, true, false, None) + } + + private def putRecords( + records: List[PutRecordsRequestEntry], + streamName: String, + kinesis: AmazonKinesis + ): PutRecordsResult = { + val putRecordsRequest = { + val prr = new PutRecordsRequest() + prr.setStreamName(streamName) + prr.setRecords(records.asJava) + prr + } + kinesis.putRecords(putRecordsRequest) + } + + private def failureMessageForInternalErrors( + records: List[PutRecordsRequestEntry], + streamName: String, + result: TryBatchResult + ): String = { + val exampleMessage = result.exampleInternalError.getOrElse("none") + s"Writing ${records.size} records to $streamName errored with internal failures. 
Example error message [$exampleMessage]" + } + + private def failureMessageForThrottling( + records: List[PutRecordsRequestEntry], + streamName: String + ): String = + s"Exceeded Kinesis provisioned throughput: ${records.size} records failed writing to $streamName." + +} diff --git a/kinesis/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/ConfigSpec.scala b/kinesis/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/ConfigSpec.scala index bf2dbe25d..9a29d64e7 100644 --- a/kinesis/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/ConfigSpec.scala +++ b/kinesis/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/ConfigSpec.scala @@ -100,13 +100,13 @@ object ConfigSpec { bad = "bad", useIpAddressAsPartitionKey = false, buffer = Config.Buffer( - byteLimit = 3145728, + byteLimit = 3145728, recordLimit = 500, - timeLimit = 5000 + timeLimit = 5000 ), sink = KinesisSinkConfig( - maxBytes = 1000000, - region = "eu-central-1", + maxBytes = 1000000, + region = "eu-central-1", threadPoolSize = 10, aws = KinesisSinkConfig.AWSConfig( accessKey = "iam", @@ -117,10 +117,10 @@ object ConfigSpec { maxBackoff = 1500, maxRetries = 3 ), - sqsBadBuffer = None, - sqsGoodBuffer = None, - sqsMaxBytes = 192000, - customEndpoint = None, + sqsBadBuffer = None, + sqsGoodBuffer = None, + sqsMaxBytes = 192000, + customEndpoint = None, startupCheckInterval = 1.second ) )
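
Note (reviewer sketch, not part of the patch): the core of this refactor is that storeRawEvents now only enqueues events, while the background fs2 stream started by WritingToKinesisTask.run drains the queue, and a None offered at shutdown terminates the drain before the sink's Resource is released. The self-contained cats-effect sketch below illustrates that handoff pattern using the same libraries (cats-effect std Queue, fs2, .background); QueueHandoffSketch and writeBatch are hypothetical names and the batch write is a stub, not the PR's Kinesis call.

// Standalone sketch (assumptions noted above): queue-based handoff with a background consumer.
import cats.effect.{IO, IOApp, Resource}
import cats.effect.std.Queue
import cats.implicits._
import fs2.Stream

object QueueHandoffSketch extends IOApp.Simple {

  // Stand-in for the real Kinesis write: just report how many records the chunk held.
  private def writeBatch(batch: List[String]): IO[Unit] =
    IO.println(s"writing batch of ${batch.size} records")

  // A queue whose consumer runs in the background for the lifetime of the Resource.
  private def sinkResource: Resource[IO, Queue[IO, Option[String]]] =
    for {
      queue <- Resource.eval(Queue.unbounded[IO, Option[String]])
      outcome <- Stream
        .fromQueueNoneTerminated(queue) // the stream completes once a None is dequeued
        .chunks
        .evalMap(chunk => writeBatch(chunk.toList))
        .compile
        .drain
        .background // Resource[IO, IO[Outcome[IO, Throwable, Unit]]]
      // Release order: offer the terminator first, then wait for the drain to finish.
      _ <- Resource.onFinalize(queue.offer(None) *> outcome.void)
    } yield queue

  def run: IO[Unit] =
    sinkResource.use { queue =>
      // Producers (the collector's storeRawEvents in the PR) only enqueue; writing happens in the background.
      List("e1", "e2", "e3").traverse_(event => queue.offer(Some(event)))
    }
}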