From 6fe5371df360e40fe7b9c4c031fb4e6d7fee1e94 Mon Sep 17 00:00:00 2001 From: markglh Date: Mon, 11 Sep 2017 14:22:39 +0100 Subject: [PATCH] Feature/issue28 test kpl properties (#30) * Upgrading KPL/KCL libraries, replacing deprecated shutdown calls * auto format from compile * restructure the ProducerConf into its own file and spec * further refactoring of Producer to use ProducerConf * tests pass - ish * improve test exception for manager * Fix Intermittent failing test - #10 * refactored producer to remove pointless trait, updated readme and specs * removed unused config * fix scalafmt * address potters comments * address formatting issues * added ConsumerConfig test plus missing consumer fields * improved thread reference docs * removed comment --- README.md | 9 +- .../SimpleKinesisProducer.scala | 4 +- src/main/resources/reference.conf | 37 ++- ...roducerKPL.scala => KinesisProducer.scala} | 138 +++++---- .../producer/KinesisProducerActor.scala | 106 +------ .../kinesis/producer/ProducerConf.scala | 144 ++++++++++ .../ConsumerProcessingManagerSpec.scala | 11 +- .../consumer/KinesisConsumerSpec.scala | 271 ++++++++++++++---- .../producer/KinesisProducerActorSpec.scala | 71 +---- ...PLSpec.scala => KinesisProducerSpec.scala} | 57 ++-- .../kinesis/producer/ProducerConfSpec.scala | 254 ++++++++++++++++ 11 files changed, 776 insertions(+), 326 deletions(-) rename src/main/scala/com/weightwatchers/reactive/kinesis/producer/{KinesisProducerKPL.scala => KinesisProducer.scala} (78%) create mode 100644 src/main/scala/com/weightwatchers/reactive/kinesis/producer/ProducerConf.scala rename src/test/scala/com/weightwatchers/reactive/kinesis/producer/{KinesisProducerKPLSpec.scala => KinesisProducerSpec.scala} (76%) create mode 100644 src/test/scala/com/weightwatchers/reactive/kinesis/producer/ProducerConfSpec.scala diff --git a/README.md b/README.md index 4a6fb07..85fa918 100644 --- a/README.md +++ b/README.md @@ -410,22 +410,21 @@ kpa ! Send(producerEvent) //Send without a callback confirmation ### Pure Scala based implementation (simple wrapper around KPL) -*Note that throttling will be unavailable using this method.* +*Note that future throttling will be unavailable using this method.* ```scala import java.util.UUID import com.amazonaws.services.kinesis.producer.{UserRecordFailedException, UserRecordResult} +import com.weightwatchers.reactive.kinesis.producer.KinesisProducer +import com.weightwatchers.reactive.kinesis.producer.ProducerConf import com.typesafe.config._ import com.weightwatchers.reactive.kinesis.models._ -import com.weightwatchers.reactive.kinesis.producer.KinesisProducerKPL import scala.concurrent.Future import scala.concurrent.ExecutionContext.Implicits.global //Not for production val kinesisConfig: Config = ConfigFactory.load().getConfig("kinesis") -val producerConfig: Config = kinesisConfig.getConfig("some-producer") -val streamName: String = producerConfig.getString("stream-name") -val kpl = KinesisProducerKPL(kinesisConfig.getConfig("kpl"), streamName) +val kpl = KinesisProducer(ProducerConf(kinesisConfig, "some-producer")) val producerEvent = ProducerEvent(UUID.randomUUID.toString, "{Some Payload}") diff --git a/src/it/scala/com/weightwatchers/reactive.kinesis/SimpleKinesisProducer.scala b/src/it/scala/com/weightwatchers/reactive.kinesis/SimpleKinesisProducer.scala index 0b15669..83f9f93 100644 --- a/src/it/scala/com/weightwatchers/reactive.kinesis/SimpleKinesisProducer.scala +++ b/src/it/scala/com/weightwatchers/reactive.kinesis/SimpleKinesisProducer.scala @@ -12,7 +12,7 @@ import com.weightwatchers.reactive.kinesis.producer.KinesisProducerActor.{ SendSuccessful, SendWithCallback } -import com.weightwatchers.reactive.kinesis.producer.{KinesisProducerActor, KinesisProducerKPL} +import com.weightwatchers.reactive.kinesis.producer.{KinesisProducerActor, KinesisProducer} import scala.collection.mutable import scala.concurrent.ExecutionContext.Implicits.global @@ -62,7 +62,7 @@ class SimpleKinesisProducer(kConfig: Config) extends Actor with LazyLogging { //We're creating the producer the hard way to get access to the underlying KPL val kpaProps = KinesisProducerActor.props(kinesisConfig, "testProducer") val kpa = context.actorOf(kpaProps) - val kinesisProducerKPL = kpaProps.args.head.asInstanceOf[KinesisProducerKPL] + val kinesisProducerKPL = kpaProps.args.head.asInstanceOf[KinesisProducer] /* producer without actor: val producerConfig = kinesisConfig.getConfig("testProducer") diff --git a/src/main/resources/reference.conf b/src/main/resources/reference.conf index eee59b2..393f0f5 100644 --- a/src/main/resources/reference.conf +++ b/src/main/resources/reference.conf @@ -166,7 +166,7 @@ kinesis { # Default: 443 # Minimum: 1 # Maximum (inclusive): 65535 - #KinesisPort = 443 + # KinesisPort = 443 # If true, throttled puts are not retried. The records that got throttled # will be failed immediately upon receiving the throttling error. This is @@ -379,12 +379,18 @@ kinesis { # Sets the threading model that the native process will use. # Enum: - # ThreadingModel.PER_REQUEST: Tells the native process to create a thread for each request. - # ThreadingModel.POOLED: Tells the native process to use a thread pool. The size of the pool can be controlled by ThreadPoolSize - # Default = ThreadingModel.PER_REQUEST + # PER_REQUEST: Tells the native process to create a thread for each request. + # Under heavy load this can create a very large number of threads, which may cause resource exhaustion. + # POOLED: Tells the native process to use a thread pool. The size of the pool can be controlled by ThreadPoolSize + # This uses a queue, and thread pool to execute requests to Kinesis. + # This limits the number of threads that the native process may use. + # Under extremely heavy load this can increase latency significantly more than the per request model. + # + # Default = PER_REQUEST # ThreadingModel = # Sets the maximum number of threads that the native process' thread pool will be configured with. + # # Default: 0 # ThreadPoolSize = } @@ -527,11 +533,11 @@ kinesis { # http://developer.amazonwebservices.com/connect/entry.jspa?externalID=3912 # # Default: null - #kinesisEndpoint = https://kinesis + #kinesisEndpoint = "https://kinesis" # DynamoDB endpoint # Default: null - #DynamoDBEndpoint = + #dynamoDBEndpoint = "https://dynamo" # Don't call processRecords() on the record processor for empty record lists. # Enables applications flush/checkpoint (if they have some data "in progress but don't get new data for while) @@ -582,6 +588,13 @@ kinesis { # Default: Operation, ShardId #metricsEnabledDimensions = Operation, ShardId + # Sets the max size of the thread pool that will be used to renew leases. + # Setting this to low may starve the lease renewal process, and cause the worker to lose leases at a higher rate. + # + # Min: 2 + # Default: 20 + #maxLeaseRenewalThreads=20 + # The max number of leases (shards) this worker should process. # This can be useful to avoid overloading (and thrashing) a worker when a host has resource constraints @@ -628,7 +641,17 @@ kinesis { # TableName name of the lease table in DynamoDB # Default = - #TableName = + #tableName = + + # A timeout when dispatching records to the client MultiLang record processor. + # If the record processor doesn't respond within the timeout the parent Java process will be terminated. + # This is a temporary fix to handle cases where the KCL becomes blocked while waiting for a client record processor. + # Setting this can cause the KCL to exit suddenly, + # before using this ensure that you have an automated restart for your application + # + # Default: no timeout + #timeoutInSeconds = + } } } diff --git a/src/main/scala/com/weightwatchers/reactive/kinesis/producer/KinesisProducerKPL.scala b/src/main/scala/com/weightwatchers/reactive/kinesis/producer/KinesisProducer.scala similarity index 78% rename from src/main/scala/com/weightwatchers/reactive/kinesis/producer/KinesisProducerKPL.scala rename to src/main/scala/com/weightwatchers/reactive/kinesis/producer/KinesisProducer.scala index 2432013..f193e8a 100644 --- a/src/main/scala/com/weightwatchers/reactive/kinesis/producer/KinesisProducerKPL.scala +++ b/src/main/scala/com/weightwatchers/reactive/kinesis/producer/KinesisProducer.scala @@ -29,64 +29,7 @@ import com.weightwatchers.reactive.kinesis.utils.{FutureUtils, TypesafeConfigExt import scala.concurrent.{ExecutionContextExecutor, Future} -trait KinesisProducer { - - /** - * Adds a message to the next batch to be sent to the configured stream. - * - * @return On success: Future{UserRecordResult} - * On failure: Future.failed(...): Any Throwable related to put. - * @see Callee `com.amazonaws.services.kinesis.producer.KinesisProducer.addUserRecord` - * @see UserRecordResult - * @see KinesisProducerConfiguration#setRecordTtl(long) - * @see UserRecordFailedException - */ - def addUserRecord(event: ProducerEvent)( - implicit ec: ExecutionContextExecutor - ): Future[UserRecordResult] - - /** - * Get the number of unfinished records currently being processed. The - * records could either be waiting to be sent to the child process, or have - * reached the child process and are being worked on. - * - *

- * This is equal to the number of futures returned from [[addUserRecord]] - * that have not finished. - * - * This is useful for applying backpressure and throttling the number of concurrent Futures. - * - * @return The number of unfinished records currently being processed. - */ - def outstandingRecordsCount(): Int - - /** - * Firstly, blocks whilst all all records are complete (either succeeding or failing). - * - *

- * - * The includes whilst any retries are performed. Depending on - * your configuration of record TTL and request timeout, this can - * potentially take a long time if the library is having trouble delivering - * records to the backend, for example due to network problems. - * - *

- * - * Finally the [[KinesisProducer]] is destroyed, preventing further use. - * - * @throws com.amazonaws.services.kinesis.producer.DaemonException if the child process is dead //TODO - handle this better? - * @see [[AWSKinesisProducer]] - */ - def stop(): Unit - - /** - * @return true if the [[KinesisProducer]] has been stopped & destroyed. - */ - def destroyed(): Boolean - -} - -object KinesisProducerKPL extends LazyLogging { +object KinesisProducer extends LazyLogging { /** * The config passed is expected to contain the AWS KPL properties at the top level. @@ -100,9 +43,11 @@ object KinesisProducerKPL extends LazyLogging { * @param credentialsProvider A specific CredentialsProvider. The KCL defaults to DefaultAWSCredentialsProviderChain. * @return an instantiated [[KinesisProducer]] */ + @deprecated("Use KinesisProducer(producerConf: ProducerConf) instead", "v0.5.7") def apply(kplConfig: Config, streamName: String, credentialsProvider: Option[AWSCredentialsProvider] = None): KinesisProducer = { + import TypesafeConfigExtensions._ // We directly load our properties into the KPL as a Java `Properties` object @@ -117,7 +62,32 @@ object KinesisProducerKPL extends LazyLogging { KinesisProducerConfiguration.fromProperties(kplProps) credentialsProvider.foreach(kplLibConfiguration.setCredentialsProvider) - new KinesisProducerKPL(new AWSKinesisProducer(kplLibConfiguration), streamName) + new KinesisProducer(new AWSKinesisProducer(kplLibConfiguration), streamName) + } + + /** + * The config passed is expected to contain the AWS KPL properties at the top level. + * + * @param producerConf An instance of [[ProducerConf]] which contains all required configuration for the KPL. + * @return an instantiated [[KinesisProducer]] + */ + def apply(producerConf: ProducerConf): KinesisProducer = { + apply(producerConf.kplLibConfiguration, producerConf.streamName) + } + + /** + * The [[KinesisProducerConfiguration]] argument is passed directly to the KPL library. + * This constructor makes no use of the Typesafe config. + * + * @see `src/it/resources/reference.conf` for a more detailed example. + * @param kplConfig An instance of the underlying [[KinesisProducerConfiguration]] to be passed + * directly to the library. + * @param streamName Th name of the Kinesis stream, which must exist. + * @return an instantiated [[KinesisProducer]] + */ + def apply(kplConfig: KinesisProducerConfiguration, streamName: String): KinesisProducer = { + //TODO add logging + new KinesisProducer(new AWSKinesisProducer(kplConfig), streamName) } } @@ -126,9 +96,7 @@ object KinesisProducerKPL extends LazyLogging { * * To create an instance of this class, we recommend using the apply method to instantiate from config. */ -class KinesisProducerKPL(kinesis: AWSKinesisProducer, streamName: String) - extends LazyLogging - with KinesisProducer { +class KinesisProducer(kinesis: AWSKinesisProducer, streamName: String) extends LazyLogging { val underlying = kinesis private var _destroyed = false @@ -137,9 +105,16 @@ class KinesisProducerKPL(kinesis: AWSKinesisProducer, streamName: String) //TODO seems difficult to get access to stream specific operations from producer /** - * @see [[KinesisProducer]].addUserRecord + * Adds a message to the next batch to be sent to the configured stream. + * + * @return On success: Future{UserRecordResult} + * On failure: Future.failed(...): Any Throwable related to put. + * @see Callee `com.amazonaws.services.kinesis.producer.KinesisProducer.addUserRecord` + * @see UserRecordResult + * @see KinesisProducerConfiguration#setRecordTtl(long) + * @see UserRecordFailedException */ - override def addUserRecord( + def addUserRecord( event: ProducerEvent )(implicit ec: ExecutionContextExecutor): Future[UserRecordResult] = { assert(!_destroyed, "Kinesis has been destroyed, no longer accepting messages") //TODO specific exception? @@ -148,24 +123,47 @@ class KinesisProducerKPL(kinesis: AWSKinesisProducer, streamName: String) } /** - * @see [[KinesisProducer]].outstandingRecordsCount() + * Get the number of unfinished records currently being processed. The + * records could either be waiting to be sent to the child process, or have + * reached the child process and are being worked on. + * + *

+ * This is equal to the number of futures returned from [[addUserRecord]] + * that have not finished. + * + * This is useful for applying backpressure and throttling the number of concurrent Futures. + * + * @return The number of unfinished records currently being processed. */ - override def outstandingRecordsCount(): Int = { + def outstandingRecordsCount(): Int = { kinesis.getOutstandingRecordsCount } /** - * @see [[KinesisProducer]].stop()sbt publish + * Firstly, blocks whilst all all records are complete (either succeeding or failing). + * + *

* + * The includes whilst any retries are performed. Depending on + * your configuration of record TTL and request timeout, this can + * potentially take a long time if the library is having trouble delivering + * records to the backend, for example due to network problems. + * + *

+ * + * Finally the [[KinesisProducer]] is destroyed, preventing further use. + * + * @throws com.amazonaws.services.kinesis.producer.DaemonException if the child process is dead //TODO - handle this better? + * @see [[AWSKinesisProducer]] */ - override def stop(): Unit = { + def stop(): Unit = { kinesis.flushSync() //This blocks until all records are flushed kinesis.destroy() _destroyed = true } /** - * @see [[KinesisProducer]]destroyed() + * @return true if the [[KinesisProducer]] has been stopped & destroyed. */ - override def destroyed(): Boolean = _destroyed + def destroyed(): Boolean = _destroyed } diff --git a/src/main/scala/com/weightwatchers/reactive/kinesis/producer/KinesisProducerActor.scala b/src/main/scala/com/weightwatchers/reactive/kinesis/producer/KinesisProducerActor.scala index 2559738..69693aa 100644 --- a/src/main/scala/com/weightwatchers/reactive/kinesis/producer/KinesisProducerActor.scala +++ b/src/main/scala/com/weightwatchers/reactive/kinesis/producer/KinesisProducerActor.scala @@ -26,93 +26,12 @@ import com.typesafe.config.Config import com.typesafe.scalalogging.LazyLogging import com.weightwatchers.reactive.kinesis.models.ProducerEvent import com.weightwatchers.reactive.kinesis.producer.KinesisProducerActor._ +import com.weightwatchers.reactive.kinesis.producer.ProducerConf.ThrottlingConf import scala.concurrent.Future -import scala.concurrent.duration._ object KinesisProducerActor { - /** - * Compainion object for the [[ProducerConf]]. - */ - object ProducerConf { - - /** - * Given the `kinesisConfig`, builds a combined configuration by taking the `producerName` specific configuration - * within, and using the `default-producer` configuration as a fallback for all values. - * - * @param kinesisConfig The top level Kinesis Configuration, containing the specified producer. - * @param producerName The name of the producer, which MUST be contained within the `kinesisConfig` - * @return A [[ProducerConf]] case class used for constructing the [[KinesisProducerActor]] - */ - def apply(kinesisConfig: Config, producerName: String): ProducerConf = { - - val producerConfig = kinesisConfig - .getConfig(producerName) - .withFallback(kinesisConfig.getConfig("default-producer")) - - val streamName = producerConfig.getString("stream-name") - require(!streamName.isEmpty, - "A valid stream name must be provided to start the Kinesis Producer") - - val dispatcher: Option[String] = - if (producerConfig.getIsNull("akka.dispatcher")) - None - else { - val dispatcherProp = producerConfig.getString("akka.dispatcher") - if (dispatcherProp.isEmpty) - None - else - Some(dispatcherProp) - } - - new ProducerConf(streamName, - producerConfig.getConfig("kpl"), - dispatcher, - parseThrottlingConfig(producerConfig)) - } - - private def parseThrottlingConfig(producerConfig: Config): Option[ThrottlingConf] = { - if (!producerConfig.hasPath("akka.max-outstanding-requests") - || producerConfig.getIsNull("akka.max-outstanding-requests")) - None - else { - val maxOutstandingRequests = producerConfig.getInt("akka.max-outstanding-requests") - - if (!producerConfig.hasPath("akka.throttling-retry-millis") - || producerConfig.getIsNull("akka.throttling-retry-millis")) - Some(ThrottlingConf(maxOutstandingRequests)) - else - Some( - ThrottlingConf(maxOutstandingRequests, - producerConfig.getLong("akka.throttling-retry-millis").millis) - ) - } - } - } - - /** - * The collection of configuration values required for constructing a producer. See the companion object. - * - * @param streamName The name of the Kinesis Stream this producer will publish to. - * @param kplConfig The `kpl` section of the kinesis configuration. - * @param dispatcher An optional dispatcher for the producer and kpl. - * @param throttlingConf Configuration which defines whether and how often to throttle. - */ - final case class ProducerConf(streamName: String, - kplConfig: Config, - dispatcher: Option[String] = None, - throttlingConf: Option[ThrottlingConf] = None) - - /** - * Configuration which defines whether and how often to throttle. - * - * @param maxOutstandingRequests The max number of concurrent requests before throttling. None removes throttling completely. - * @param retryDuration The time before retrying after throttling. - */ - protected final case class ThrottlingConf(maxOutstandingRequests: Int, - retryDuration: FiniteDuration = 100.millis) - private val UUID_GENERATOR = Generators.timeBasedGenerator() /** @@ -146,7 +65,7 @@ object KinesisProducerActor { private case object UnThrottle /** - * Create a [[KinesisProducerKPL]] and passes it to a [[KinesisProducerActor]], returning the Props. + * Create a [[KinesisProducer]] and passes it to a [[KinesisProducerActor]], returning the Props. * * This function will attempt to load config (per value) from the `producerName` section within `kinesisConfig`. * @@ -164,20 +83,17 @@ object KinesisProducerActor { def props(kinesisConfig: Config, producerName: String, credentialsProvider: Option[AWSCredentialsProvider] = None): Props = { - val producerConf = ProducerConf(kinesisConfig, producerName) - props(producerConf, credentialsProvider) + props(ProducerConf(kinesisConfig, producerName, credentialsProvider)) } /** - * Create a [[KinesisProducerKPL]] and passes it to a [[KinesisProducerActor]], returning the Props. + * Create a [[KinesisProducer]] and passes it to a [[KinesisProducerActor]], returning the Props. * - * @param producerConf A complete [[ProducerConf]] case class. - * @param credentialsProvider A specific CredentialsProvider. The KCL defaults to DefaultAWSCredentialsProviderChain. + * @param producerConf A complete [[ProducerConf]] case class. */ - def props(producerConf: ProducerConf, - credentialsProvider: Option[AWSCredentialsProvider]): Props = { + def props(producerConf: ProducerConf): Props = { val kinesisProducer = - KinesisProducerKPL(producerConf.kplConfig, producerConf.streamName, credentialsProvider) + KinesisProducer(producerConf) val props = Props(classOf[KinesisProducerActor], kinesisProducer, producerConf.throttlingConf) producerConf.dispatcher.fold(props)(props.withDispatcher) @@ -193,7 +109,7 @@ object KinesisProducerActor { } /** - * This ``Actor`` wraps the [[KinesisProducerKPL]] to provide reliable handling and throttling of requests. + * This ``Actor`` wraps the [[KinesisProducer]] to provide reliable handling and throttling of requests. * * Upon completion of a [[com.weightwatchers.reactive.kinesis.producer.KinesisProducerActor.SendWithCallback]], * a [[com.weightwatchers.reactive.kinesis.producer.KinesisProducerActor.SendSuccessful]] @@ -258,9 +174,9 @@ class KinesisProducerActor(producer: KinesisProducer, throttlingConfig: Option[T //TODO is this too much log output on error? I'm assuming this will be rare! import scala.collection.JavaConverters._ val errorList = ex.getResult.getAttempts.asScala.map(attempt => s""" - |Delay after prev attempt: ${attempt.getDelay} ms, - |Duration: ${attempt.getDuration} ms, Code: ${attempt.getErrorCode}, - |Message: ${attempt.getErrorMessage} + |Delay after prev attempt: ${attempt.getDelay} ms, + |Duration: ${attempt.getDuration} ms, Code: ${attempt.getErrorCode}, + |Message: ${attempt.getErrorMessage} """.stripMargin) logger.warn( s"Record failed to put, partitionKey=${event.partitionKey}, payload=${event.payload}, attempts:$errorList", diff --git a/src/main/scala/com/weightwatchers/reactive/kinesis/producer/ProducerConf.scala b/src/main/scala/com/weightwatchers/reactive/kinesis/producer/ProducerConf.scala new file mode 100644 index 0000000..19a02f5 --- /dev/null +++ b/src/main/scala/com/weightwatchers/reactive/kinesis/producer/ProducerConf.scala @@ -0,0 +1,144 @@ +/* + * Copyright 2017 WeightWatchers + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.weightwatchers.reactive.kinesis.producer + +import com.amazonaws.auth.AWSCredentialsProvider +import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration +import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration.ThreadingModel +import com.typesafe.config.Config +import com.weightwatchers.reactive.kinesis.producer.ProducerConf.ThrottlingConf +import com.weightwatchers.reactive.kinesis.utils.TypesafeConfigExtensions + +import scala.concurrent.duration.{FiniteDuration, _} + +/** + * Companion object for the [[ProducerConf]]. + */ +object ProducerConf { + + /** + * Configuration which defines whether and how often to throttle. + * Only applicable when using the actor interface. + * + * @param maxOutstandingRequests The max number of concurrent requests before throttling. None removes throttling completely. + * @param retryDuration The time before retrying after throttling. + */ + final case class ThrottlingConf(maxOutstandingRequests: Int, + retryDuration: FiniteDuration = 100.millis) + + /** + * Given the top level `kinesis` config block, builds a combined configuration by taking the `producerName` specific configuration + * within, and using the `default-producer` configuration as a fallback for all values. + * + * @see `src/it/resources/reference.conf` for a more detailed example of the KinesisConfig. + * @param kinesisConfig The top level Kinesis Configuration, containing the specified producer. + * @param producerName The name of the producer, which MUST be contained within the `kinesisConfig` + * @param credentialsProvider A specific CredentialsProvider. The KPL defaults to [[com.amazonaws.auth.DefaultAWSCredentialsProviderChain]]. + * @return A [[ProducerConf]] case class used for constructing the [[KinesisProducerActor]] + */ + def apply(kinesisConfig: Config, + producerName: String, + credentialsProvider: Option[AWSCredentialsProvider] = None): ProducerConf = { + + val producerConfig = kinesisConfig + .getConfig(producerName) + .withFallback(kinesisConfig.getConfig("default-producer")) + + val streamName = producerConfig.getString("stream-name") + require( + !streamName.isEmpty, + s"Config field `stream-name` missing, a value must be provided to start the Kinesis Producer!" + ) + + val dispatcher: Option[String] = + if (producerConfig.getIsNull("akka.dispatcher")) + None + else { + val dispatcherProp = producerConfig.getString("akka.dispatcher") + if (dispatcherProp.isEmpty) + None + else + Some(dispatcherProp) + } + + val kplConfig = producerConfig.getConfig("kpl") + val kplLibConfiguration: KinesisProducerConfiguration = + buildKPLConfig(kplConfig, credentialsProvider) + + new ProducerConf(streamName, + kplLibConfiguration, + dispatcher, + parseThrottlingConfig(producerConfig)) + } + + private def buildKPLConfig(kplConfig: Config, + credentialsProvider: Option[AWSCredentialsProvider]) = { + // We directly load our properties into the KPL as a Java `Properties` object + // See http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-config.html + import TypesafeConfigExtensions._ + val kplProps = kplConfig.toProperties + + val kplLibConfiguration: KinesisProducerConfiguration = + KinesisProducerConfiguration.fromProperties(kplProps) + credentialsProvider.foreach(kplLibConfiguration.setCredentialsProvider) + + //TODO, this should be part of the KPL. The KCL would handle enums and ints and let us use props directly. + //TODO can be removed once this is merged: https://github.com/awslabs/amazon-kinesis-producer/pull/134 + if (kplConfig.hasPath("ThreadingModel")) { + kplLibConfiguration.setThreadingModel( + ThreadingModel.valueOf(kplConfig.getString("ThreadingModel")) + ) + } + if (kplConfig.hasPath("ThreadPoolSize")) { + kplLibConfiguration.setThreadPoolSize(kplConfig.getInt("ThreadPoolSize")) + } + + kplLibConfiguration + } + + private def parseThrottlingConfig(producerConfig: Config): Option[ThrottlingConf] = { + if (!producerConfig.hasPath("akka.max-outstanding-requests") + || producerConfig.getIsNull("akka.max-outstanding-requests")) + None + else { + val maxOutstandingRequests = producerConfig.getInt("akka.max-outstanding-requests") + + if (!producerConfig.hasPath("akka.throttling-retry-millis") + || producerConfig.getIsNull("akka.throttling-retry-millis")) + Some(ThrottlingConf(maxOutstandingRequests)) + else + Some( + ThrottlingConf(maxOutstandingRequests, + producerConfig.getLong("akka.throttling-retry-millis").millis) + ) + } + } + +} + +/** + * The collection of configuration values required for constructing a producer. See the companion object. + * + * @param streamName The name of the Kinesis Stream this producer will publish to. + * @param kplLibConfiguration An instance of the underlying [[KinesisProducerConfiguration]] for the KPL library. + * @param dispatcher An optional dispatcher for the producer and kpl. + * @param throttlingConf Configuration which defines whether and how often to throttle. + */ +final case class ProducerConf(streamName: String, + kplLibConfiguration: KinesisProducerConfiguration, + dispatcher: Option[String], + throttlingConf: Option[ThrottlingConf]) diff --git a/src/test/scala/com/weightwatchers/reactive/kinesis/consumer/ConsumerProcessingManagerSpec.scala b/src/test/scala/com/weightwatchers/reactive/kinesis/consumer/ConsumerProcessingManagerSpec.scala index 9f7a4db..f959029 100644 --- a/src/test/scala/com/weightwatchers/reactive/kinesis/consumer/ConsumerProcessingManagerSpec.scala +++ b/src/test/scala/com/weightwatchers/reactive/kinesis/consumer/ConsumerProcessingManagerSpec.scala @@ -68,7 +68,7 @@ class ConsumerProcessingManagerSpec Await.result(system.whenTerminated, 5.seconds) } - "The KinesisRecordProcessingManager" - { + "The ConsumerProcessingManager" - { "Should set the shardId on init" in { val worker = TestProbe() val kcl = mock[Worker] @@ -151,9 +151,9 @@ class ConsumerProcessingManagerSpec "When the response is a failed batch it should shutdown and stop processing" in new ProcessingSetup { - workerResponse.success(ProcessingComplete(false)) //complete with a failed batch - whenReady(processResult) { _ => + workerResponse.success(ProcessingComplete(false)) //complete with a failed batch + processResult.isCompleted should be(true) manager.shuttingDown.get() should be(true) @@ -166,7 +166,10 @@ class ConsumerProcessingManagerSpec "When the response is a failed (exception) future it should shutdown and stop processing" in new ProcessingSetup { - workerResponse.failure(new NullPointerException("TEST")) //complete with an exception + val ex = new Exception("TEST") + ex.setStackTrace(Array.empty[StackTraceElement]) + + workerResponse.failure(ex) //complete with an exception whenReady(processResult) { _ => processResult.isCompleted should be(true) diff --git a/src/test/scala/com/weightwatchers/reactive/kinesis/consumer/KinesisConsumerSpec.scala b/src/test/scala/com/weightwatchers/reactive/kinesis/consumer/KinesisConsumerSpec.scala index 9616d64..b5e918e 100644 --- a/src/test/scala/com/weightwatchers/reactive/kinesis/consumer/KinesisConsumerSpec.scala +++ b/src/test/scala/com/weightwatchers/reactive/kinesis/consumer/KinesisConsumerSpec.scala @@ -59,52 +59,157 @@ class KinesisConsumerSpec val kinesisConfig = ConfigFactory .parseString(""" - |kinesis { - | - | application-name = "TestSpec" - | - | testConsumer-1 { - | stream-name = "test-kinesis-reliability" - | - | worker { - | batchTimeoutSeconds = 1234 - | gracefulShutdownHook = false - | shutdownTimeoutSeconds = 2 - | } - | - | checkpointer { - | backoffMillis = 4321 - | } - | - | kcl { - | AWSCredentialsProvider = EnvironmentVariableCredentialsProvider - | regionName = us-east-1 - | KinesisEndpoint = "CustomKinesisEndpoint" - | DynamoDBEndpoint = "CustomDynamoDBEndpoint" - | SkipShardSyncAtStartupIfLeasesExist = true - | TableName = "TableName" - | } - | } - | - | testConsumer-2 { - | stream-name = "some-other-stream" - | - | worker { - | failedMessageRetries = 3 - | gracefulShutdownHook = false - | } - | - | checkpointer { - | backoffMillis = 111 - | } - | - | kcl { - | AWSCredentialsProvider = DefaultAWSCredentialsProviderChain - | regionName = us-east-2 - | } - | } - |} - """.stripMargin) + |kinesis { + | + | application-name = "TestSpec" + | + | testConsumer-1 { + | stream-name = "test-kinesis-reliability" + | + | worker { + | batchTimeoutSeconds = 1234 + | gracefulShutdownHook = false + | shutdownTimeoutSeconds = 2 + | } + | + | checkpointer { + | backoffMillis = 4321 + | } + | + | kcl { + | AWSCredentialsProvider = EnvironmentVariableCredentialsProvider + | regionName = us-east-1 + | KinesisEndpoint = "CustomKinesisEndpoint" + | DynamoDBEndpoint = "CustomDynamoDBEndpoint" + | SkipShardSyncAtStartupIfLeasesExist = true + | TableName = "TableName" + | } + | } + | + | testConsumer-2 { + | stream-name = "some-other-stream" + | + | worker { + | failedMessageRetries = 3 + | gracefulShutdownHook = false + | } + | + | checkpointer { + | backoffMillis = 111 + | } + | + | kcl { + | AWSCredentialsProvider = DefaultAWSCredentialsProviderChain + | regionName = us-east-2 + | } + | } + | + | testConsumer-3 { + | stream-name = "some-other-stream" + | + | worker { + | batchTimeoutSeconds = 1234 + | gracefulShutdownHook = false + | shutdownTimeoutSeconds = 2 + | } + | + | checkpointer { + | backoffMillis = 4321 + | } + | + | kcl { + | AWSCredentialsProvider = DefaultAWSCredentialsProviderChain + | + | regionName = us-east-2 + | + | # Default: LATEST + | initialPositionInStream = TRIM_HORIZON + | + | # Default = 10000 + | maxRecords = 20000 + | + | # Default = 1000 + | idleTimeBetweenReadsInMillis = 1234 + | + | # Default: 10000 + | failoverTimeMillis = 11000 + | + | # Default: 60000 + | shardSyncIntervalMillis = 70000 + | + | # Default: true + | cleanupLeasesUponShardCompletion = false + | + | # Default: true + | validateSequenceNumberBeforeCheckpointing = false + | + | # Default: null + | kinesisEndpoint = "https://kinesis" + | + | # Default: null + | dynamoDBEndpoint = "https://dynamo" + | + | # Default: false + | callProcessRecordsEvenForEmptyRecordList = true + | + | # Default: 10000 + | parentShardPollIntervalMillis = 40000 + | + | # Default: 500 + | taskBackoffTimeMillis = 600 + | + | # Default: 10000 + | metricsBufferTimeMillis = 10001 + | + | + | # Default: 10000 + | metricsMaxQueueSize = 10009 + | + | + | # Default: DETAILED + | metricsLevel = NONE + | + | + | # Default: Operation, ShardId + | metricsEnabledDimensions = Operation + | + | + | # Default: 2147483647 (Integer.MAX_VALUE) + | maxLeasesForWorker = 11111111 + | + | + | # Default: 1 + | maxLeasesToStealAtOneTime = 2 + | + | + | # Default: 10 + | initialLeaseTableReadCapacity = 15 + | + | + | # Default: 10 + | initialLeaseTableWriteCapacity = 14 + | + | # Default: false + | skipShardSyncAtStartupIfLeasesExist=true + | + | + | # Default: + | userAgent = testy123 + | + | # Default = + | tableName = meh + | + | # Default: 20 + | maxLeaseRenewalThreads=9 + | + | + | # Default: no timeout + | timeoutInSeconds = 10 + | } + | + | } + |} + """.stripMargin) .getConfig("kinesis") .withFallback(defaultKinesisConfig) @@ -120,7 +225,7 @@ class KinesisConsumerSpec .createProcessor() shouldBe a[ConsumerProcessingManager] } - def assertConsumer1(): Assertion = { + def assertConsumer1Config(): Assertion = { val consumerConf = ConsumerConf(kinesisConfig, "testConsumer-1") consumerConf.workerConf.batchTimeout should be(1234.seconds) @@ -157,11 +262,77 @@ class KinesisConsumerSpec } "Should parse the Config into a ConsumerConf for a single consumer" in { - assertConsumer1() + assertConsumer1Config() + } + + "Should parse consumer 3 the Config into a ConsumerConf, setting all properties in the KinesisClientLibConfiguration" in { + //This will fail when fields are added or renamed in the KCL + + // Some setters don't match the field names. + val confToFieldConversions = Map( + "skipShardSyncAtStartupIfLeasesExist" -> "skipShardSyncAtWorkerInitializationIfLeasesExist" + ) + + val fieldsToSkip = List( + "useragent", //this gets nested internally + "streamname", + "timestampatinitialpositioninstream", + "commonclientconfig", + "shardprioritizationstrategy", + "kinesisclientconfig", + "dynamodbclientconfig", + "cloudwatchclientconfig", + "credentialsprovider", //these must be tested individually + "applicationname" + ) + + val consumerConf = ConsumerConf(kinesisConfig, "testConsumer-3") + + consumerConf.workerConf.batchTimeout should be(1234.seconds) + consumerConf.workerConf.failedMessageRetries should be(1) + consumerConf.workerConf.failureTolerancePercentage should be(0.25) + consumerConf.workerConf.shutdownHook should be(false) + consumerConf.workerConf.shutdownTimeout should be(Timeout(2.seconds)) + consumerConf.checkpointerConf.backoff should be(4321.millis) + consumerConf.checkpointerConf.interval should be(2000.millis) //reference default + consumerConf.checkpointerConf.notificationDelay should be(1000.millis) //reference default + consumerConf.dispatcher should be(Some("kinesis.akka.default-dispatcher")) + consumerConf.kclConfiguration.getApplicationName should be( + "TestSpec-some-other-stream" + ) + + val kclConfig = kinesisConfig.getConfig("testConsumer-3.kcl") + val kclLibConfiguration = consumerConf.kclConfiguration + + //We're dealing with Java classes so using Java reflection is cleaner here + //Start with the setters to prevent picking up all the unrelated private fields, stripping the "with" + val configKeys = kclLibConfiguration.getClass.getDeclaredMethods + .filter(_.getName.startsWith("with")) + .map(_.getName.drop(4)) + .map(field => field.head.toLower + field.tail) + .filterNot( + field => fieldsToSkip.contains(field.toLowerCase) + ) + + configKeys foreach { configKey => + val field = + kclLibConfiguration.getClass.getDeclaredField( + confToFieldConversions.getOrElse(configKey, configKey) + ) + field.setAccessible(true) + + withClue( + s"Property `$configKey` was not as expected when asserting the KCL configuration: " + ) { + kclConfig.hasPath(configKey) should be(true) + field.get(kclLibConfiguration).toString should include(kclConfig.getString(configKey)) + } + } + } "Should parse the Config into multiple ConsumerConf objects for multiple consumers" in { - assertConsumer1() + assertConsumer1Config() val consumerConf2 = ConsumerConf(kinesisConfig, "testConsumer-2") @@ -252,7 +423,7 @@ class KinesisConsumerSpec } Given("A Worker which throws an Exception") - val exception = new RuntimeException() + val exception = new RuntimeException("TEST") Mockito.when(worker.run()).thenThrow(exception) When("We start the Consumer") diff --git a/src/test/scala/com/weightwatchers/reactive/kinesis/producer/KinesisProducerActorSpec.scala b/src/test/scala/com/weightwatchers/reactive/kinesis/producer/KinesisProducerActorSpec.scala index f118a5c..6bfea5d 100644 --- a/src/test/scala/com/weightwatchers/reactive/kinesis/producer/KinesisProducerActorSpec.scala +++ b/src/test/scala/com/weightwatchers/reactive/kinesis/producer/KinesisProducerActorSpec.scala @@ -16,21 +16,18 @@ package com.weightwatchers.reactive.kinesis.producer -import java.io.File - import akka.actor.{ActorSystem, PoisonPill} import akka.testkit.{ImplicitSender, TestActorRef, TestKit, TestProbe} import akka.util.Timeout import com.amazonaws.services.kinesis.producer.{UserRecordFailedException, UserRecordResult} -import com.typesafe.config.ConfigFactory import com.weightwatchers.reactive.kinesis.models.ProducerEvent import com.weightwatchers.reactive.kinesis.producer.KinesisProducerActor._ import org.mockito.Mockito._ import org.scalatest.mockito.MockitoSugar import org.scalatest.{BeforeAndAfterAll, FreeSpecLike, Matchers} -import scala.concurrent.{Await, Future} import scala.concurrent.duration._ +import scala.concurrent.{Await, Future} //scalastyle:off magic.number class KinesisProducerActorSpec @@ -41,41 +38,6 @@ class KinesisProducerActorSpec with MockitoSugar with BeforeAndAfterAll { - val defaultKinesisConfig = - ConfigFactory.parseFile(new File("src/main/resources/reference.conf")).getConfig("kinesis") - - val kinesisConfig = ConfigFactory - .parseString(""" - |kinesis { - | - | application-name = "TestSpec" - | - | testProducer { - | stream-name = "core-test-kinesis-producer" - | - | akka { - | max-outstanding-requests = 50000 - | } - | - | kpl { - | Region = us-east-1 - | KinesisEndpoint = "CustomKinesisEndpoint" - | KinesisPort = 1111 - | CredentialsRefreshDelay = 5001 - | CloudwatchEndpoint = "CustomCloudWatchEndpoint" - | CloudwatchPort = 2222 - | EnableCoreDumps = true - | NativeExecutable = "NativeExecutable" - | TempDirectory = "TempDirectory" - | ThreadPoolSize = 1 - | ThreadingModel = "ThreadingModel.POOLED" - | } - | } - |} - """.stripMargin) - .getConfig("kinesis") - .withFallback(defaultKinesisConfig) - implicit val timeout = Timeout(5.seconds) import system.dispatcher @@ -87,33 +49,12 @@ class KinesisProducerActorSpec "The KinesisProducerActor" - { - "Should parse the Config into a ProducerConf" in { - val producerConf = ProducerConf(kinesisConfig, "testProducer") - - producerConf.dispatcher should be(Some("kinesis.akka.default-dispatcher")) - producerConf.kplConfig.getString("Region") should be("us-east-1") //validate an override properly - producerConf.kplConfig.getBoolean("AggregationEnabled") should be(true) //validate a default property - producerConf.kplConfig.getString("KinesisEndpoint") should be("CustomKinesisEndpoint") //validate an override property - producerConf.kplConfig.getLong("KinesisPort") should be(1111) //validate an override property - producerConf.kplConfig.getLong("CredentialsRefreshDelay") should be(5001) //validate an override property - producerConf.kplConfig.getString("CloudwatchEndpoint") should be("CustomCloudWatchEndpoint") //validate an override property - producerConf.kplConfig.getLong("CloudwatchPort") should be(2222) //validate an override property - producerConf.kplConfig.getBoolean("EnableCoreDumps") should be(true) //validate an override property - producerConf.kplConfig.getString("NativeExecutable") should be("NativeExecutable") //validate an override property - producerConf.kplConfig.getString("TempDirectory") should be("TempDirectory") //validate an override property - producerConf.kplConfig.getString("ThreadingModel") should be("ThreadingModel.POOLED") //validate an override property - producerConf.kplConfig.getInt("ThreadPoolSize") should be(1) //validate an override property - producerConf.throttlingConf.get.maxOutstandingRequests should be(50000) - producerConf.throttlingConf.get.retryDuration should be(100.millis) - producerConf.streamName should be("core-test-kinesis-producer") - } - "Should process a message without a response for a Send" in { val probe = TestProbe() val event = ProducerEvent("111", "das payload") val result = mock[UserRecordResult] - val producer = mock[KinesisProducerKPL] + val producer = mock[KinesisProducer] //Given a successful response from the underlying producer when(producer.addUserRecord(event)).thenReturn(Future { @@ -135,7 +76,7 @@ class KinesisProducerActorSpec val event = ProducerEvent("111", "das payload") val result = mock[UserRecordResult] - val producer = mock[KinesisProducerKPL] + val producer = mock[KinesisProducer] //Given a successful response from the underlying producer when(producer.addUserRecord(event)).thenReturn(Future { @@ -161,7 +102,7 @@ class KinesisProducerActorSpec val ex = new UserRecordFailedException(result) //Given an exception from the underlying producer - val producer = mock[KinesisProducerKPL] + val producer = mock[KinesisProducer] when(producer.addUserRecord(event)).thenReturn(Future { throw ex }) @@ -186,7 +127,7 @@ class KinesisProducerActorSpec val result = mock[UserRecordResult] //Given that the number of requests in progress >= maxOutstandingRequests - val producer = mock[KinesisProducerKPL] + val producer = mock[KinesisProducer] when(producer.addUserRecord(event)).thenReturn(Future { result }) @@ -217,7 +158,7 @@ class KinesisProducerActorSpec } "Should gracefully shutdown the underlying producer" in { - val producer = mock[KinesisProducerKPL] + val producer = mock[KinesisProducer] when(producer.outstandingRecordsCount()).thenReturn(0) //When we send a SendWithCallback diff --git a/src/test/scala/com/weightwatchers/reactive/kinesis/producer/KinesisProducerKPLSpec.scala b/src/test/scala/com/weightwatchers/reactive/kinesis/producer/KinesisProducerSpec.scala similarity index 76% rename from src/test/scala/com/weightwatchers/reactive/kinesis/producer/KinesisProducerKPLSpec.scala rename to src/test/scala/com/weightwatchers/reactive/kinesis/producer/KinesisProducerSpec.scala index 42a68f3..cae27de 100644 --- a/src/test/scala/com/weightwatchers/reactive/kinesis/producer/KinesisProducerKPLSpec.scala +++ b/src/test/scala/com/weightwatchers/reactive/kinesis/producer/KinesisProducerSpec.scala @@ -16,6 +16,8 @@ package com.weightwatchers.reactive.kinesis.producer +import java.io.File + import com.amazonaws.services.kinesis.producer.{ UserRecordResult, KinesisProducer => AWSKinesisProducer @@ -31,46 +33,45 @@ import org.scalatest.{BeforeAndAfterAll, FreeSpec, Matchers} import scala.concurrent.Future //scalastyle:off magic.number -class KinesisProducerKPLSpec - extends FreeSpec - with Matchers - with MockitoSugar - with BeforeAndAfterAll { +class KinesisProducerSpec extends FreeSpec with Matchers with MockitoSugar with BeforeAndAfterAll { implicit val ece = scala.concurrent.ExecutionContext.global + val defaultKinesisConfig = + ConfigFactory.parseFile(new File("src/main/resources/reference.conf")).getConfig("kinesis") + val kinesisConfig = ConfigFactory .parseString( """ - |kinesis { - | - | application-name = "TestSpec" - | - | testProducer { - | # The name of the producer stream - | stream-name = "core-test-kinesis-producer" - | - | # Can specify settings here as per default-producer, to override those defaults for this producer. - | - | kpl { - | Region = us-east-1 - | } - | } - |} - """.stripMargin + |kinesis { + | + | application-name = "TestSpec" + | + | testProducer { + | # The name of the producer stream + | stream-name = "core-test-kinesis-producer" + | + | # Can specify settings here as per default-producer, to override those defaults for this producer. + | + | kpl { + | Region = us-east-1 + | } + | } + |} + """.stripMargin ) .getConfig("kinesis") + .withFallback(defaultKinesisConfig) "The KinesisProducer" - { "Should create the KinesisProducerKPL with an underlying AWSKinesisProducer" in { - val producer = KinesisProducerKPL( - kinesisConfig.getConfig("testProducer.kpl"), - kinesisConfig.getString("testProducer.stream-name") - ).asInstanceOf[KinesisProducerKPL] + val producer = KinesisProducer(ProducerConf(kinesisConfig, "testProducer")) producer.underlying should not be (null) // scalastyle:ignore + + producer.underlying.destroy() } "Should Add a Record to the Kinesis Stream, wrapping the response in a scala future" in { @@ -78,7 +79,7 @@ class KinesisProducerKPLSpec val streamName = kinesisConfig.getString("testProducer.stream-name") val awsProducer = mock[AWSKinesisProducer] - val scalaProducer = new KinesisProducerKPL(awsProducer, streamName) + val scalaProducer = new KinesisProducer(awsProducer, streamName) val result = mock[UserRecordResult] val event = ProducerEvent("111", "das payload") @@ -107,7 +108,7 @@ class KinesisProducerKPLSpec val streamName = kinesisConfig.getString("testProducer.stream-name") val awsProducer = mock[AWSKinesisProducer] - val scalaProducer = new KinesisProducerKPL(awsProducer, streamName) + val scalaProducer = new KinesisProducer(awsProducer, streamName) //Given a 5 requests in progress when(awsProducer.getOutstandingRecordsCount()).thenReturn(5) @@ -121,7 +122,7 @@ class KinesisProducerKPLSpec val streamName = kinesisConfig.getString("testProducer.stream-name") val awsProducer = mock[AWSKinesisProducer] - val scalaProducer = new KinesisProducerKPL(awsProducer, streamName) + val scalaProducer = new KinesisProducer(awsProducer, streamName) //Given we call stop scalaProducer.stop() diff --git a/src/test/scala/com/weightwatchers/reactive/kinesis/producer/ProducerConfSpec.scala b/src/test/scala/com/weightwatchers/reactive/kinesis/producer/ProducerConfSpec.scala new file mode 100644 index 0000000..0407fd0 --- /dev/null +++ b/src/test/scala/com/weightwatchers/reactive/kinesis/producer/ProducerConfSpec.scala @@ -0,0 +1,254 @@ +/* + * Copyright 2017 WeightWatchers + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.weightwatchers.reactive.kinesis.producer + +import java.io.File + +import akka.actor.ActorSystem +import akka.testkit.{ImplicitSender, TestKit} +import akka.util.Timeout +import com.typesafe.config.ConfigFactory +import org.scalatest.mockito.MockitoSugar +import org.scalatest.{BeforeAndAfterAll, FreeSpecLike, Matchers} + +import scala.concurrent.Await +import scala.concurrent.duration._ + +//scalastyle:off magic.number +class ProducerConfSpec + extends TestKit(ActorSystem("producer-spec")) + with ImplicitSender + with FreeSpecLike + with Matchers + with MockitoSugar + with BeforeAndAfterAll { + + val defaultKinesisConfig = + ConfigFactory.parseFile(new File("src/main/resources/reference.conf")).getConfig("kinesis") + + val kinesisConfig = ConfigFactory + .parseString( + """ + |kinesis { + | + | application-name = "TestSpec" + | + | testProducer { + | stream-name = "core-test-kinesis-producer" + | + | akka { + | dispatcher = "kinesis.akka.custom-dispatcher" + | + | max-outstanding-requests = 50000 + | + | throttling-retry-millis = 100 + | } + | + | kpl { + | # Default: true + | AggregationEnabled = false + | + | # Default: 4294967295 + | # Minimum: 1 + | # Maximum (inclusive): 9223372036854775807 + | AggregationMaxCount = 5 + | + | # Default: 51200 + | # Minimum: 64 + | # Maximum (inclusive): 1048576 + | AggregationMaxSize = 77 + | + | # Default: 500 + | # Minimum: 1 + | # Maximum (inclusive): 500 + | CollectionMaxCount = 25 + | + | # Default: 5242880 + | # Minimum: 52224 + | # Maximum (inclusive): 9223372036854775807 + | CollectionMaxSize = 55555 + | + | # Default: 6000 + | # Minimum: 100 + | # Maximum (inclusive): 300000 + | ConnectTimeout = 101 + | + | + | # Default: 5000 + | # Minimum: 1 + | # Maximum (inclusive): 300000 + | CredentialsRefreshDelay = 2400 + | + | # Expected pattern: ^([A-Za-z0-9-\\.]+)?$ + | CloudwatchEndpoint = 127.0.0.1 + | + | # Default: 443 + | # Minimum: 1 + | # Maximum (inclusive): 65535 + | CloudwatchPort = 123 + | + | # Default: false + | EnableCoreDumps = true + | + | # Use a custom Kinesis endpoint. + | # + | # Mostly for testing use. Note this does not accept protocols or paths, only + | # host names or ip addresses. There is no way to disable TLS. The KPL always + | # connects with TLS. + | # + | # Expected pattern: ^([A-Za-z0-9-\\.]+)?$ + | KinesisEndpoint = 172.1.1.1 + | + | # Default: 443 + | # Minimum: 1 + | # Maximum (inclusive): 65535 + | KinesisPort = 666 + | + | # Default: false + | FailIfThrottled = true + | + | # Default: info + | # Expected pattern: info|warning|error + | LogLevel = warning + | + | # Default: 24 + | # Minimum: 1 + | # Maximum (inclusive): 256 + | MaxConnections = 5 + | + | # Default: shard + | # Expected pattern: global|stream|shard + | MetricsGranularity = stream + | + | # Default: detailed + | # Expected pattern: none|summary|detailed + | MetricsLevel = none + | + | # Default: KinesisProducerLibrary + | # Expected pattern: (?!AWS/).{1,255} + | MetricsNamespace = SomeNamespace + | + | # Default: 60000 + | # Minimum: 1 + | # Maximum (inclusive): 60000 + | MetricsUploadDelay = 5000 + | + | # Default: 1 + | # Minimum: 1 + | # Maximum (inclusive): 16 + | MinConnections = 3 + | + | #Path to the native KPL binary. Only use this setting if you want to use a custom build of + | #the native code. + | NativeExecutable=/tmp + | + | # Default: 150 + | # Minimum: 1 + | # Maximum (inclusive): 9223372036854775807 + | RateLimit = 99 + | + | # Default: 100 + | # Maximum (inclusive): 9223372036854775807 + | RecordMaxBufferedTime = 88 + | + | # Default: 30000 + | # Minimum: 100 + | # Maximum (inclusive): 9223372036854775807 + | RecordTtl = 25000 + | + | # Expected pattern: ^([a-z]+-[a-z]+-[0-9])?$ + | Region = "us-east-2" + | + | # Default: 6000 + | # Minimum: 100 + | # Maximum (inclusive): 600000 + | RequestTimeout = 3000 + | + | # If not specified, defaults to /tmp in Unix. (Windows TBD) + | TempDirectory = /tmp + | + | # Default: true + | VerifyCertificate = false + | + | # Enum: + | # PER_REQUEST: Tells the native process to create a thread for each request. + | # POOLED: Tells the native process to use a thread pool. The size of the pool can be controlled by ThreadPoolSize + | # Default = PER_REQUEST + | ThreadingModel = POOLED + | + | # Default: 0 + | ThreadPoolSize = 5 + | } + | + | } + |} + """.stripMargin + ) + .getConfig("kinesis") + .withFallback(defaultKinesisConfig) + + implicit val timeout = Timeout(5.seconds) + + override def afterAll(): Unit = { + system.terminate() + Await.result(system.whenTerminated, timeout.duration) + } + + "The ProducerConf" - { + + "Should parse the Config into a ProducerConf, setting all properties in the KinesisProducerConfiguration" in { + val producerConf = ProducerConf(kinesisConfig, "testProducer") + + producerConf.streamName should be("core-test-kinesis-producer") + producerConf.dispatcher should be(Some("kinesis.akka.custom-dispatcher")) + producerConf.throttlingConf.get.maxOutstandingRequests should be(50000) + producerConf.throttlingConf.get.retryDuration should be(100.millis) + + val kplConfig = kinesisConfig.getConfig("testProducer.kpl") + + val kplLibConfiguration = producerConf.kplLibConfiguration + kplLibConfiguration.isAggregationEnabled should be(false) + + //We're dealing with Java classes so using Java reflection is cleaner here + //Start with the setters to prevent picking up all the unrelated private fields, stripping the "set" + val configKeys = kplLibConfiguration.getClass.getDeclaredMethods + .filter(_.getName.startsWith("set")) + .map(_.getName.drop(3)) + // TODO we don't yet support setting of credentials providers via config due to KPL limitations + // TODO see issue #29 : https://github.com/WW-Digital/reactive-kinesis/issues/29 + .filterNot(_.toLowerCase.contains("credentialsprovider")) + + configKeys foreach { configKey => + val field = + kplLibConfiguration.getClass.getDeclaredField(configKey.head.toLower + configKey.tail) + field.setAccessible(true) + + withClue( + s"Property `$configKey` was not as expected when asserting the KPL configuration: " + ) { + kplConfig.hasPath(configKey) should be(true) + field.get(kplLibConfiguration).toString should be(kplConfig.getString(configKey)) + } + } + + } + + } + +} + +//scalastyle:on